airbyte_cdk.sources.concurrent_source.concurrent_read_processor
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import logging
import os
from typing import Dict, Iterable, List, Optional, Set

from airbyte_cdk.exception_handler import generate_failed_streams_error_message
from airbyte_cdk.models import AirbyteMessage, AirbyteStreamStatus, FailureType, StreamDescriptor
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel import (
    PartitionGenerationCompletedSentinel,
)
from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException
from airbyte_cdk.sources.concurrent_source.thread_pool_manager import ThreadPoolManager
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.concurrent.partition_enqueuer import PartitionEnqueuer
from airbyte_cdk.sources.streams.concurrent.partition_reader import PartitionReader
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.types import PartitionCompleteSentinel
from airbyte_cdk.sources.types import Record
from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message
from airbyte_cdk.sources.utils.slice_logger import SliceLogger
from airbyte_cdk.utils import AirbyteTracedException
from airbyte_cdk.utils.stream_status_utils import (
    as_airbyte_message as stream_status_as_airbyte_message,
)


class ConcurrentReadProcessor:
    def __init__(
        self,
        stream_instances_to_read_from: List[AbstractStream],
        partition_enqueuer: PartitionEnqueuer,
        thread_pool_manager: ThreadPoolManager,
        logger: logging.Logger,
        slice_logger: SliceLogger,
        message_repository: MessageRepository,
        partition_reader: PartitionReader,
    ):
        """
        This class is responsible for handling items from a concurrent stream read process.
        :param stream_instances_to_read_from: List of streams to read from. The list is copied, so
            starting partition generation never mutates the caller's list.
        :param partition_enqueuer: PartitionEnqueuer instance
        :param thread_pool_manager: ThreadPoolManager instance
        :param logger: Logger instance
        :param slice_logger: SliceLogger instance
        :param message_repository: MessageRepository instance
        :param partition_reader: PartitionReader instance
        """
        self._stream_name_to_instance = {s.name: s for s in stream_instances_to_read_from}
        self._record_counter: Dict[str, int] = {}
        self._streams_to_running_partitions: Dict[str, Set[Partition]] = {}
        for stream in stream_instances_to_read_from:
            self._streams_to_running_partitions[stream.name] = set()
            self._record_counter[stream.name] = 0
        self._thread_pool_manager = thread_pool_manager
        self._partition_enqueuer = partition_enqueuer
        # Copy the list: start_next_partition_generator pops from it, and aliasing the argument
        # would silently empty the list owned by the caller.
        self._stream_instances_to_start_partition_generation = list(stream_instances_to_read_from)
        self._streams_currently_generating_partitions: List[str] = []
        self._logger = logger
        self._slice_logger = slice_logger
        self._message_repository = message_repository
        self._partition_reader = partition_reader
        self._streams_done: Set[str] = set()
        self._exceptions_per_stream_name: Dict[str, List[Exception]] = {}

    def on_partition_generation_completed(
        self, sentinel: PartitionGenerationCompletedSentinel
    ) -> Iterable[AirbyteMessage]:
        """
        This method is called when partition generation for a stream is completed.
        1. Remove the stream from the list of streams currently generating partitions
        2. If the stream has no partitions left to process (and is not already done), mark it as
           done and yield its final messages
        3. If there are more streams to read from, start the next partition generator and yield
           its STARTED stream status message
        """
        stream_name = sentinel.stream.name
        self._streams_currently_generating_partitions.remove(stream_name)
        # While the stream was generating partitions, on_partition_complete_sentinel could not mark
        # it as done. If no partitions were generated, or all of them already completed, the stream
        # is done now. Streams already marked done are skipped so their final status is not emitted twice.
        if (
            not self._is_stream_done(stream_name)
            and not self._streams_to_running_partitions[stream_name]
        ):
            yield from self._on_stream_is_done(stream_name)
        if self._stream_instances_to_start_partition_generation:
            status_message = self.start_next_partition_generator()
            if status_message is not None:
                yield status_message

    def on_partition(self, partition: Partition) -> None:
        """
        This method is called when a partition is generated.
        1. Add the partition to the set of partitions for the stream
        2. Log the slice if necessary
        3. Submit the partition to the thread pool manager
        """
        stream_name = partition.stream_name()
        self._streams_to_running_partitions[stream_name].add(partition)
        cursor = self._stream_name_to_instance[stream_name].cursor
        if self._slice_logger.should_log_slice_message(self._logger):
            self._message_repository.emit_message(
                self._slice_logger.create_slice_log_message(partition.to_slice())
            )
        self._thread_pool_manager.submit(
            self._partition_reader.process_partition, partition, cursor
        )

    def on_partition_complete_sentinel(
        self, sentinel: PartitionCompleteSentinel
    ) -> Iterable[AirbyteMessage]:
        """
        This method is called when a partition is completed.
        1. Close the partition
        2. If the stream is done, mark it as such and yield its stream status message
        3. Yield messages that were added to the message repository
        """
        partition = sentinel.partition

        partitions_running = self._streams_to_running_partitions[partition.stream_name()]
        if partition in partitions_running:
            partitions_running.remove(partition)
            # If all partitions were generated and this was the last one, the stream is done
            if (
                partition.stream_name() not in self._streams_currently_generating_partitions
                and len(partitions_running) == 0
            ):
                yield from self._on_stream_is_done(partition.stream_name())
        yield from self._message_repository.consume_queue()

    def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
        """
        This method is called when a record is read from a partition.
        1. Convert the record to an AirbyteMessage
        2. If this is the first RECORD message for the stream, yield a RUNNING stream status message
        3. Increment the record counter for the stream (RECORD messages only)
        4. Yield the message (whatever its type)
        5. Yield messages that were added to the message repository
        The cursor is not notified here; this method only counts records and emits messages.
        """
        # Do not pass a transformer or a schema
        # AbstractStreams are expected to return data as they are expected.
        # Any transformation on the data should be done before reaching this point
        message = stream_data_to_airbyte_message(
            stream_name=record.stream_name,
            data_or_message=record.data,
            file_reference=record.file_reference,
        )
        stream = self._stream_name_to_instance[record.stream_name]

        if message.type == MessageType.RECORD:
            if self._record_counter[stream.name] == 0:
                self._logger.info(f"Marking stream {stream.name} as RUNNING")
                yield stream_status_as_airbyte_message(
                    stream.as_airbyte_stream(), AirbyteStreamStatus.RUNNING
                )
            self._record_counter[stream.name] += 1
        yield message
        yield from self._message_repository.consume_queue()

    def on_exception(self, exception: StreamThreadException) -> Iterable[AirbyteMessage]:
        """
        This method is called with a StreamThreadException, which pairs an exception with the
        name of the stream it was raised for.
        1. Record the exception for that stream. The stream will later be reported as INCOMPLETE,
           and is_done raises once every stream is done.
        2. Log the exception
        3. Yield an AirbyteMessage describing the error; the exception is not re-raised here
        """
        self._flag_exception(exception.stream_name, exception.exception)
        self._logger.exception(
            f"Exception while syncing stream {exception.stream_name}", exc_info=exception.exception
        )

        stream_descriptor = StreamDescriptor(name=exception.stream_name)
        if isinstance(exception.exception, AirbyteTracedException):
            yield exception.exception.as_airbyte_message(stream_descriptor=stream_descriptor)
        else:
            yield AirbyteTracedException.from_exception(
                exception.exception,
                stream_descriptor=stream_descriptor,
                message=f"An unexpected error occurred in stream {exception.stream_name}: {type(exception.exception).__name__}",
            ).as_airbyte_message()

    def _flag_exception(self, stream_name: str, exception: Exception) -> None:
        """Remember an exception raised for ``stream_name``."""
        self._exceptions_per_stream_name.setdefault(stream_name, []).append(exception)

    def start_next_partition_generator(self) -> Optional[AirbyteMessage]:
        """
        Start the next partition generator.
        1. Pop the next stream to read from
        2. Submit the partition generator to the thread pool manager
        3. Add the stream to the list of streams currently generating partitions
        4. Return a STARTED stream status message, or None if no stream is left to start
        """
        if self._stream_instances_to_start_partition_generation:
            stream = self._stream_instances_to_start_partition_generation.pop(0)
            self._thread_pool_manager.submit(self._partition_enqueuer.generate_partitions, stream)
            self._streams_currently_generating_partitions.append(stream.name)
            self._logger.info(f"Marking stream {stream.name} as STARTED")
            self._logger.info(f"Syncing stream: {stream.name} ")
            return stream_status_as_airbyte_message(
                stream.as_airbyte_stream(),
                AirbyteStreamStatus.STARTED,
            )
        else:
            return None

    def is_done(self) -> bool:
        """
        This method is called to check if the sync is done.
        The sync is done when every stream has been marked done, which requires:
        1. There are no more streams generating partitions
        2. There are no more streams to read from
        3. All partitions for all streams are closed
        :raises AirbyteTracedException: (config_error) if the sync is done and at least one
            stream raised an exception
        """
        # Generator instead of a list so all() stops at the first stream that is not done.
        is_done = all(
            self._is_stream_done(stream_name) for stream_name in self._stream_name_to_instance
        )
        if is_done and self._exceptions_per_stream_name:
            error_message = generate_failed_streams_error_message(self._exceptions_per_stream_name)
            self._logger.info(error_message)
            # We still raise at least one exception when a stream raises an exception because the platform currently relies
            # on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error
            # type because this combined error isn't actionable, but rather the previously emitted individual errors.
            raise AirbyteTracedException(
                message=error_message,
                internal_message="Concurrent read failure",
                failure_type=FailureType.config_error,
            )
        return is_done

    def _is_stream_done(self, stream_name: str) -> bool:
        """Return True if ``stream_name`` has already been marked done."""
        return stream_name in self._streams_done

    def _on_stream_is_done(self, stream_name: str) -> Iterable[AirbyteMessage]:
        """
        Mark a stream as done. Yield any queued repository messages, then a COMPLETE status
        message, or an INCOMPLETE one if an exception was flagged for the stream.
        """
        self._logger.info(
            f"Read {self._record_counter[stream_name]} records from {stream_name} stream"
        )
        self._logger.info(f"Marking stream {stream_name} as STOPPED")
        stream = self._stream_name_to_instance[stream_name]
        stream.cursor.ensure_at_least_one_state_emitted()
        yield from self._message_repository.consume_queue()
        self._logger.info(f"Finished syncing {stream.name}")
        self._streams_done.add(stream_name)
        stream_status = (
            AirbyteStreamStatus.INCOMPLETE
            if self._exceptions_per_stream_name.get(stream_name, [])
            else AirbyteStreamStatus.COMPLETE
        )
        yield stream_status_as_airbyte_message(stream.as_airbyte_stream(), stream_status)
class
ConcurrentReadProcessor:
class ConcurrentReadProcessor:
    def __init__(
        self,
        stream_instances_to_read_from: List[AbstractStream],
        partition_enqueuer: PartitionEnqueuer,
        thread_pool_manager: ThreadPoolManager,
        logger: logging.Logger,
        slice_logger: SliceLogger,
        message_repository: MessageRepository,
        partition_reader: PartitionReader,
    ):
        """
        This class is responsible for handling items from a concurrent stream read process.
        :param stream_instances_to_read_from: List of streams to read from. The list is copied, so
            starting partition generation never mutates the caller's list.
        :param partition_enqueuer: PartitionEnqueuer instance
        :param thread_pool_manager: ThreadPoolManager instance
        :param logger: Logger instance
        :param slice_logger: SliceLogger instance
        :param message_repository: MessageRepository instance
        :param partition_reader: PartitionReader instance
        """
        self._stream_name_to_instance = {s.name: s for s in stream_instances_to_read_from}
        self._record_counter: Dict[str, int] = {}
        self._streams_to_running_partitions: Dict[str, Set[Partition]] = {}
        for stream in stream_instances_to_read_from:
            self._streams_to_running_partitions[stream.name] = set()
            self._record_counter[stream.name] = 0
        self._thread_pool_manager = thread_pool_manager
        self._partition_enqueuer = partition_enqueuer
        # Copy the list: start_next_partition_generator pops from it, and aliasing the argument
        # would silently empty the list owned by the caller.
        self._stream_instances_to_start_partition_generation = list(stream_instances_to_read_from)
        self._streams_currently_generating_partitions: List[str] = []
        self._logger = logger
        self._slice_logger = slice_logger
        self._message_repository = message_repository
        self._partition_reader = partition_reader
        self._streams_done: Set[str] = set()
        self._exceptions_per_stream_name: Dict[str, List[Exception]] = {}

    def on_partition_generation_completed(
        self, sentinel: PartitionGenerationCompletedSentinel
    ) -> Iterable[AirbyteMessage]:
        """
        This method is called when partition generation for a stream is completed.
        1. Remove the stream from the list of streams currently generating partitions
        2. If the stream has no partitions left to process (and is not already done), mark it as
           done and yield its final messages
        3. If there are more streams to read from, start the next partition generator and yield
           its STARTED stream status message
        """
        stream_name = sentinel.stream.name
        self._streams_currently_generating_partitions.remove(stream_name)
        # While the stream was generating partitions, on_partition_complete_sentinel could not mark
        # it as done. If no partitions were generated, or all of them already completed, the stream
        # is done now. Streams already marked done are skipped so their final status is not emitted twice.
        if (
            not self._is_stream_done(stream_name)
            and not self._streams_to_running_partitions[stream_name]
        ):
            yield from self._on_stream_is_done(stream_name)
        if self._stream_instances_to_start_partition_generation:
            status_message = self.start_next_partition_generator()
            if status_message is not None:
                yield status_message

    def on_partition(self, partition: Partition) -> None:
        """
        This method is called when a partition is generated.
        1. Add the partition to the set of partitions for the stream
        2. Log the slice if necessary
        3. Submit the partition to the thread pool manager
        """
        stream_name = partition.stream_name()
        self._streams_to_running_partitions[stream_name].add(partition)
        cursor = self._stream_name_to_instance[stream_name].cursor
        if self._slice_logger.should_log_slice_message(self._logger):
            self._message_repository.emit_message(
                self._slice_logger.create_slice_log_message(partition.to_slice())
            )
        self._thread_pool_manager.submit(
            self._partition_reader.process_partition, partition, cursor
        )

    def on_partition_complete_sentinel(
        self, sentinel: PartitionCompleteSentinel
    ) -> Iterable[AirbyteMessage]:
        """
        This method is called when a partition is completed.
        1. Close the partition
        2. If the stream is done, mark it as such and yield its stream status message
        3. Yield messages that were added to the message repository
        """
        partition = sentinel.partition

        partitions_running = self._streams_to_running_partitions[partition.stream_name()]
        if partition in partitions_running:
            partitions_running.remove(partition)
            # If all partitions were generated and this was the last one, the stream is done
            if (
                partition.stream_name() not in self._streams_currently_generating_partitions
                and len(partitions_running) == 0
            ):
                yield from self._on_stream_is_done(partition.stream_name())
        yield from self._message_repository.consume_queue()

    def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
        """
        This method is called when a record is read from a partition.
        1. Convert the record to an AirbyteMessage
        2. If this is the first RECORD message for the stream, yield a RUNNING stream status message
        3. Increment the record counter for the stream (RECORD messages only)
        4. Yield the message (whatever its type)
        5. Yield messages that were added to the message repository
        The cursor is not notified here; this method only counts records and emits messages.
        """
        # Do not pass a transformer or a schema
        # AbstractStreams are expected to return data as they are expected.
        # Any transformation on the data should be done before reaching this point
        message = stream_data_to_airbyte_message(
            stream_name=record.stream_name,
            data_or_message=record.data,
            file_reference=record.file_reference,
        )
        stream = self._stream_name_to_instance[record.stream_name]

        if message.type == MessageType.RECORD:
            if self._record_counter[stream.name] == 0:
                self._logger.info(f"Marking stream {stream.name} as RUNNING")
                yield stream_status_as_airbyte_message(
                    stream.as_airbyte_stream(), AirbyteStreamStatus.RUNNING
                )
            self._record_counter[stream.name] += 1
        yield message
        yield from self._message_repository.consume_queue()

    def on_exception(self, exception: StreamThreadException) -> Iterable[AirbyteMessage]:
        """
        This method is called with a StreamThreadException, which pairs an exception with the
        name of the stream it was raised for.
        1. Record the exception for that stream. The stream will later be reported as INCOMPLETE,
           and is_done raises once every stream is done.
        2. Log the exception
        3. Yield an AirbyteMessage describing the error; the exception is not re-raised here
        """
        self._flag_exception(exception.stream_name, exception.exception)
        self._logger.exception(
            f"Exception while syncing stream {exception.stream_name}", exc_info=exception.exception
        )

        stream_descriptor = StreamDescriptor(name=exception.stream_name)
        if isinstance(exception.exception, AirbyteTracedException):
            yield exception.exception.as_airbyte_message(stream_descriptor=stream_descriptor)
        else:
            yield AirbyteTracedException.from_exception(
                exception.exception,
                stream_descriptor=stream_descriptor,
                message=f"An unexpected error occurred in stream {exception.stream_name}: {type(exception.exception).__name__}",
            ).as_airbyte_message()

    def _flag_exception(self, stream_name: str, exception: Exception) -> None:
        """Remember an exception raised for ``stream_name``."""
        self._exceptions_per_stream_name.setdefault(stream_name, []).append(exception)

    def start_next_partition_generator(self) -> Optional[AirbyteMessage]:
        """
        Start the next partition generator.
        1. Pop the next stream to read from
        2. Submit the partition generator to the thread pool manager
        3. Add the stream to the list of streams currently generating partitions
        4. Return a STARTED stream status message, or None if no stream is left to start
        """
        if self._stream_instances_to_start_partition_generation:
            stream = self._stream_instances_to_start_partition_generation.pop(0)
            self._thread_pool_manager.submit(self._partition_enqueuer.generate_partitions, stream)
            self._streams_currently_generating_partitions.append(stream.name)
            self._logger.info(f"Marking stream {stream.name} as STARTED")
            self._logger.info(f"Syncing stream: {stream.name} ")
            return stream_status_as_airbyte_message(
                stream.as_airbyte_stream(),
                AirbyteStreamStatus.STARTED,
            )
        else:
            return None

    def is_done(self) -> bool:
        """
        This method is called to check if the sync is done.
        The sync is done when every stream has been marked done, which requires:
        1. There are no more streams generating partitions
        2. There are no more streams to read from
        3. All partitions for all streams are closed
        :raises AirbyteTracedException: (config_error) if the sync is done and at least one
            stream raised an exception
        """
        # Generator instead of a list so all() stops at the first stream that is not done.
        is_done = all(
            self._is_stream_done(stream_name) for stream_name in self._stream_name_to_instance
        )
        if is_done and self._exceptions_per_stream_name:
            error_message = generate_failed_streams_error_message(self._exceptions_per_stream_name)
            self._logger.info(error_message)
            # We still raise at least one exception when a stream raises an exception because the platform currently relies
            # on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error
            # type because this combined error isn't actionable, but rather the previously emitted individual errors.
            raise AirbyteTracedException(
                message=error_message,
                internal_message="Concurrent read failure",
                failure_type=FailureType.config_error,
            )
        return is_done

    def _is_stream_done(self, stream_name: str) -> bool:
        """Return True if ``stream_name`` has already been marked done."""
        return stream_name in self._streams_done

    def _on_stream_is_done(self, stream_name: str) -> Iterable[AirbyteMessage]:
        """
        Mark a stream as done. Yield any queued repository messages, then a COMPLETE status
        message, or an INCOMPLETE one if an exception was flagged for the stream.
        """
        self._logger.info(
            f"Read {self._record_counter[stream_name]} records from {stream_name} stream"
        )
        self._logger.info(f"Marking stream {stream_name} as STOPPED")
        stream = self._stream_name_to_instance[stream_name]
        stream.cursor.ensure_at_least_one_state_emitted()
        yield from self._message_repository.consume_queue()
        self._logger.info(f"Finished syncing {stream.name}")
        self._streams_done.add(stream_name)
        stream_status = (
            AirbyteStreamStatus.INCOMPLETE
            if self._exceptions_per_stream_name.get(stream_name, [])
            else AirbyteStreamStatus.COMPLETE
        )
        yield stream_status_as_airbyte_message(stream.as_airbyte_stream(), stream_status)
ConcurrentReadProcessor( stream_instances_to_read_from: List[airbyte_cdk.sources.streams.concurrent.abstract_stream.AbstractStream], partition_enqueuer: airbyte_cdk.sources.streams.concurrent.partition_enqueuer.PartitionEnqueuer, thread_pool_manager: airbyte_cdk.sources.concurrent_source.thread_pool_manager.ThreadPoolManager, logger: logging.Logger, slice_logger: airbyte_cdk.sources.utils.slice_logger.SliceLogger, message_repository: airbyte_cdk.MessageRepository, partition_reader: airbyte_cdk.sources.streams.concurrent.partition_reader.PartitionReader)
def __init__(
    self,
    stream_instances_to_read_from: List[AbstractStream],
    partition_enqueuer: PartitionEnqueuer,
    thread_pool_manager: ThreadPoolManager,
    logger: logging.Logger,
    slice_logger: SliceLogger,
    message_repository: MessageRepository,
    partition_reader: PartitionReader,
):
    """
    This class is responsible for handling items from a concurrent stream read process.
    :param stream_instances_to_read_from: List of streams to read from. The list is copied, so
        starting partition generation never mutates the caller's list.
    :param partition_enqueuer: PartitionEnqueuer instance
    :param thread_pool_manager: ThreadPoolManager instance
    :param logger: Logger instance
    :param slice_logger: SliceLogger instance
    :param message_repository: MessageRepository instance
    :param partition_reader: PartitionReader instance
    """
    self._stream_name_to_instance = {s.name: s for s in stream_instances_to_read_from}
    self._record_counter: Dict[str, int] = {}
    self._streams_to_running_partitions: Dict[str, Set[Partition]] = {}
    for stream in stream_instances_to_read_from:
        self._streams_to_running_partitions[stream.name] = set()
        self._record_counter[stream.name] = 0
    self._thread_pool_manager = thread_pool_manager
    self._partition_enqueuer = partition_enqueuer
    # Copy the list: start_next_partition_generator pops from it, and aliasing the argument
    # would silently empty the list owned by the caller.
    self._stream_instances_to_start_partition_generation = list(stream_instances_to_read_from)
    self._streams_currently_generating_partitions: List[str] = []
    self._logger = logger
    self._slice_logger = slice_logger
    self._message_repository = message_repository
    self._partition_reader = partition_reader
    self._streams_done: Set[str] = set()
    self._exceptions_per_stream_name: Dict[str, List[Exception]] = {}
This class is responsible for handling items from a concurrent stream read process.
Parameters
- stream_instances_to_read_from: List of streams to read from
- partition_enqueuer: PartitionEnqueuer instance
- thread_pool_manager: ThreadPoolManager instance
- logger: Logger instance
- slice_logger: SliceLogger instance
- message_repository: MessageRepository instance
- partition_reader: PartitionReader instance
def
on_partition_generation_completed( self, sentinel: airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel.PartitionGenerationCompletedSentinel) -> Iterable[airbyte_cdk.AirbyteMessage]:
def on_partition_generation_completed(
    self, sentinel: PartitionGenerationCompletedSentinel
) -> Iterable[AirbyteMessage]:
    """
    This method is called when partition generation for a stream is completed.
    1. Remove the stream from the list of streams currently generating partitions
    2. If the stream has no partitions left to process (and is not already done), mark it as
       done and yield its final messages
    3. If there are more streams to read from, start the next partition generator and yield
       its STARTED stream status message (None is never yielded)
    """
    stream_name = sentinel.stream.name
    self._streams_currently_generating_partitions.remove(stream_name)
    # While the stream was generating partitions, on_partition_complete_sentinel could not mark
    # it as done. If no partitions were generated, or all of them already completed, the stream
    # is done now. Streams already marked done are skipped so their final status is not emitted twice.
    if (
        not self._is_stream_done(stream_name)
        and not self._streams_to_running_partitions[stream_name]
    ):
        yield from self._on_stream_is_done(stream_name)
    if self._stream_instances_to_start_partition_generation:
        status_message = self.start_next_partition_generator()
        if status_message is not None:
            yield status_message
This method is called when a partition generation is completed.
- Remove the stream from the list of streams currently generating partitions
- If the stream is done, mark it as such and return a stream status message
- If there are more streams to read from, start the next partition generator
def
on_partition( self, partition: airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition) -> None:
def on_partition(self, partition: Partition) -> None:
    """
    Register a freshly generated partition and schedule it for reading.

    The partition is tracked as running for its stream. When the slice logger asks for it, a
    slice log message is emitted through the message repository. Finally the partition is
    handed to the thread pool together with its stream's cursor.
    """
    name = partition.stream_name()
    self._streams_to_running_partitions[name].add(partition)
    stream_cursor = self._stream_name_to_instance[name].cursor
    should_log = self._slice_logger.should_log_slice_message(self._logger)
    if should_log:
        slice_message = self._slice_logger.create_slice_log_message(partition.to_slice())
        self._message_repository.emit_message(slice_message)
    submit = self._thread_pool_manager.submit
    submit(self._partition_reader.process_partition, partition, stream_cursor)
This method is called when a partition is generated.
- Add the partition to the set of partitions for the stream
- Log the slice if necessary
- Submit the partition to the thread pool manager
def
on_partition_complete_sentinel( self, sentinel: airbyte_cdk.sources.streams.concurrent.partitions.types.PartitionCompleteSentinel) -> Iterable[airbyte_cdk.AirbyteMessage]:
def on_partition_complete_sentinel(
    self, sentinel: PartitionCompleteSentinel
) -> Iterable[AirbyteMessage]:
    """
    Handle the end of a partition.

    A partition that is still tracked as running is removed from its stream's running set.
    If that was the stream's last running partition and the stream has finished generating
    partitions, the stream's final messages are yielded. Pending message-repository
    messages are always yielded last.
    """
    finished_partition = sentinel.partition
    stream_name = finished_partition.stream_name()
    running = self._streams_to_running_partitions[stream_name]
    if finished_partition in running:
        running.remove(finished_partition)
        still_generating = stream_name in self._streams_currently_generating_partitions
        if not still_generating and not running:
            yield from self._on_stream_is_done(stream_name)
    yield from self._message_repository.consume_queue()
This method is called when a partition is completed.
- Close the partition
- If the stream is done, mark it as such and return a stream status message
- Emit messages that were added to the message repository
def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
    """
    This method is called when a record is read from a partition.
    1. Convert the record to an AirbyteMessage
    2. If this is the first RECORD message for the stream, yield a RUNNING stream status message
    3. Increment the record counter for the stream (RECORD messages only)
    4. Yield the message (whatever its type)
    5. Yield messages that were added to the message repository
    The cursor is not notified here; this method only counts records and emits messages.
    """
    # Do not pass a transformer or a schema
    # AbstractStreams are expected to return data as they are expected.
    # Any transformation on the data should be done before reaching this point
    message = stream_data_to_airbyte_message(
        stream_name=record.stream_name,
        data_or_message=record.data,
        file_reference=record.file_reference,
    )
    stream = self._stream_name_to_instance[record.stream_name]

    if message.type == MessageType.RECORD:
        if self._record_counter[stream.name] == 0:
            self._logger.info(f"Marking stream {stream.name} as RUNNING")
            yield stream_status_as_airbyte_message(
                stream.as_airbyte_stream(), AirbyteStreamStatus.RUNNING
            )
        self._record_counter[stream.name] += 1
    yield message
    yield from self._message_repository.consume_queue()
This method is called when a record is read from a partition.
- Convert the record to an AirbyteMessage
- If this is the first record for the stream, mark the stream as RUNNING
- Increment the record counter for the stream
- Note: the cursor is not notified here; this method only counts RECORD messages and emits messages
- Emit the message
- Emit messages that were added to the message repository
def
on_exception( self, exception: airbyte_cdk.sources.concurrent_source.stream_thread_exception.StreamThreadException) -> Iterable[airbyte_cdk.AirbyteMessage]:
def on_exception(self, exception: StreamThreadException) -> Iterable[AirbyteMessage]:
    """
    This method is called with a StreamThreadException, which pairs an exception with the
    name of the stream it was raised for.
    1. Record the exception for that stream. The stream will later be reported as INCOMPLETE,
       and is_done raises once every stream is done.
    2. Log the exception
    3. Yield an AirbyteMessage describing the error; the exception is not re-raised here
    """
    self._flag_exception(exception.stream_name, exception.exception)
    self._logger.exception(
        f"Exception while syncing stream {exception.stream_name}", exc_info=exception.exception
    )

    stream_descriptor = StreamDescriptor(name=exception.stream_name)
    if isinstance(exception.exception, AirbyteTracedException):
        # Already a traced exception: keep its own message/failure type, just attach the stream.
        yield exception.exception.as_airbyte_message(stream_descriptor=stream_descriptor)
    else:
        yield AirbyteTracedException.from_exception(
            exception.exception,
            stream_descriptor=stream_descriptor,
            message=f"An unexpected error occurred in stream {exception.stream_name}: {type(exception.exception).__name__}",
        ).as_airbyte_message()
This method is called when an exception is raised.
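- Record the exception for its stream and log it
- Yield an AirbyteMessage describing the error (the exception is not re-raised here; the stream is later reported as INCOMPLETE)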
def start_next_partition_generator(self) -> Optional[AirbyteMessage]:
    """
    Kick off partition generation for the next pending stream.

    Returns None when no stream is waiting. Otherwise the first waiting stream is removed
    from the pending list, its partition generation is submitted to the thread pool, and
    it is recorded as currently generating. A STARTED stream status message is returned
    for it.
    """
    pending = self._stream_instances_to_start_partition_generation
    if not pending:
        return None
    next_stream = pending.pop(0)
    self._thread_pool_manager.submit(self._partition_enqueuer.generate_partitions, next_stream)
    self._streams_currently_generating_partitions.append(next_stream.name)
    self._logger.info(f"Marking stream {next_stream.name} as STARTED")
    self._logger.info(f"Syncing stream: {next_stream.name} ")
    return stream_status_as_airbyte_message(
        next_stream.as_airbyte_stream(),
        AirbyteStreamStatus.STARTED,
    )
Start the next partition generator.
- Pop the next stream to read from
- Submit the partition generator to the thread pool manager
- Add the stream to the list of streams currently generating partitions
- Return a stream status message
def
is_done(self) -> bool:
def is_done(self) -> bool:
    """
    This method is called to check if the sync is done.
    The sync is done when every stream has been marked done, which requires:
    1. There are no more streams generating partitions
    2. There are no more streams to read from
    3. All partitions for all streams are closed
    :raises AirbyteTracedException: (config_error) if the sync is done and at least one
        stream raised an exception
    """
    # Generator instead of a list so all() stops at the first stream that is not done.
    is_done = all(
        self._is_stream_done(stream_name) for stream_name in self._stream_name_to_instance
    )
    if is_done and self._exceptions_per_stream_name:
        error_message = generate_failed_streams_error_message(self._exceptions_per_stream_name)
        self._logger.info(error_message)
        # We still raise at least one exception when a stream raises an exception because the platform currently relies
        # on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error
        # type because this combined error isn't actionable, but rather the previously emitted individual errors.
        raise AirbyteTracedException(
            message=error_message,
            internal_message="Concurrent read failure",
            failure_type=FailureType.config_error,
        )
    return is_done
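This method is called to check if the sync is done. The sync is done when every stream has been marked done, which requires:
- There are no more streams generating partitions
- There are no more streams to read from
- All partitions for all streams are closed
If the sync is done and any stream raised an exception, an AirbyteTracedException (config_error) is raised instead of returning.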