airbyte_cdk.sources.concurrent_source.concurrent_read_processor
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import logging
from typing import Dict, Iterable, List, Optional, Set

from airbyte_cdk.exception_handler import generate_failed_streams_error_message
from airbyte_cdk.models import AirbyteMessage, AirbyteStreamStatus, FailureType, StreamDescriptor
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel import (
    PartitionGenerationCompletedSentinel,
)
from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException
from airbyte_cdk.sources.concurrent_source.thread_pool_manager import ThreadPoolManager
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.concurrent.partition_enqueuer import PartitionEnqueuer
from airbyte_cdk.sources.streams.concurrent.partition_reader import PartitionReader
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.types import PartitionCompleteSentinel
from airbyte_cdk.sources.types import Record
from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message
from airbyte_cdk.sources.utils.slice_logger import SliceLogger
from airbyte_cdk.utils import AirbyteTracedException
from airbyte_cdk.utils.stream_status_utils import (
    as_airbyte_message as stream_status_as_airbyte_message,
)


class ConcurrentReadProcessor:
    def __init__(
        self,
        stream_instances_to_read_from: List[AbstractStream],
        partition_enqueuer: PartitionEnqueuer,
        thread_pool_manager: ThreadPoolManager,
        logger: logging.Logger,
        slice_logger: SliceLogger,
        message_repository: MessageRepository,
        partition_reader: PartitionReader,
    ):
        """
        This class is responsible for handling items from a concurrent stream read process.
        :param stream_instances_to_read_from: List of streams to read from
        :param partition_enqueuer: PartitionEnqueuer instance
        :param thread_pool_manager: ThreadPoolManager instance
        :param logger: Logger instance
        :param slice_logger: SliceLogger instance
        :param message_repository: MessageRepository instance
        :param partition_reader: PartitionReader instance
        """
        self._stream_name_to_instance = {s.name: s for s in stream_instances_to_read_from}
        # Number of RECORD messages emitted so far, per stream name
        self._record_counter: Dict[str, int] = {}
        # Partitions that were handed to the thread pool and have not completed yet, per stream name
        self._streams_to_running_partitions: Dict[str, Set[Partition]] = {}
        for stream in stream_instances_to_read_from:
            self._streams_to_running_partitions[stream.name] = set()
            self._record_counter[stream.name] = 0
        self._thread_pool_manager = thread_pool_manager
        self._partition_enqueuer = partition_enqueuer
        # Keep a private copy: start_next_partition_generator() pops from this list, and draining
        # the list object owned by the caller would be a surprising side effect.
        self._stream_instances_to_start_partition_generation = list(stream_instances_to_read_from)
        self._streams_currently_generating_partitions: List[str] = []
        self._logger = logger
        self._slice_logger = slice_logger
        self._message_repository = message_repository
        self._partition_reader = partition_reader
        self._streams_done: Set[str] = set()
        self._exceptions_per_stream_name: Dict[str, List[Exception]] = {}

    def on_partition_generation_completed(
        self, sentinel: PartitionGenerationCompletedSentinel
    ) -> Iterable[AirbyteMessage]:
        """
        This method is called when a partition generation is completed.
        1. Remove the stream from the list of streams currently generating partitions
        2. If the stream is done, mark it as such and return a stream status message
        3. If there are more streams to read from, start the next partition generator
        """
        stream_name = sentinel.stream.name
        self._streams_currently_generating_partitions.remove(stream_name)
        # It is possible for the stream to already be done if no partitions were generated
        # If the partition generation process was completed and there are no partitions left to process, the stream is done
        if self._is_stream_done(stream_name) or not self._streams_to_running_partitions[stream_name]:
            yield from self._on_stream_is_done(stream_name)
        if self._stream_instances_to_start_partition_generation:
            yield self.start_next_partition_generator()  # type:ignore # None may be yielded

    def on_partition(self, partition: Partition) -> None:
        """
        This method is called when a partition is generated.
        1. Add the partition to the set of partitions for the stream
        2. Log the slice if necessary
        3. Submit the partition to the thread pool manager
        """
        stream_name = partition.stream_name()
        self._streams_to_running_partitions[stream_name].add(partition)
        if self._slice_logger.should_log_slice_message(self._logger):
            self._message_repository.emit_message(
                self._slice_logger.create_slice_log_message(partition.to_slice())
            )
        self._thread_pool_manager.submit(self._partition_reader.process_partition, partition)

    def on_partition_complete_sentinel(
        self, sentinel: PartitionCompleteSentinel
    ) -> Iterable[AirbyteMessage]:
        """
        This method is called when a partition is completed.
        1. Close the partition
        2. If the stream is done, mark it as such and return a stream status message
        3. Emit messages that were added to the message repository
        """
        partition = sentinel.partition

        try:
            # The cursor is only told to close partitions that were read successfully
            if sentinel.is_successful:
                stream = self._stream_name_to_instance[partition.stream_name()]
                stream.cursor.close_partition(partition)
        except Exception as exception:
            # A failure while closing the partition is recorded against the stream and reported
            # as a message instead of being propagated to the caller
            self._flag_exception(partition.stream_name(), exception)
            yield AirbyteTracedException.from_exception(
                exception, stream_descriptor=StreamDescriptor(name=partition.stream_name())
            ).as_sanitized_airbyte_message()
        finally:
            partitions_running = self._streams_to_running_partitions[partition.stream_name()]
            if partition in partitions_running:
                partitions_running.remove(partition)
                # If all partitions were generated and this was the last one, the stream is done
                if (
                    partition.stream_name() not in self._streams_currently_generating_partitions
                    and not partitions_running
                ):
                    yield from self._on_stream_is_done(partition.stream_name())
            yield from self._message_repository.consume_queue()

    def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
        """
        This method is called when a record is read from a partition.
        1. Convert the record to an AirbyteMessage
        2. If this is the first record for the stream, mark the stream as RUNNING
        3. Increment the record counter for the stream
        4. Let the cursor observe the record
        5. Emit the message
        6. Emit messages that were added to the message repository
        """
        # Do not pass a transformer or a schema
        # AbstractStreams are expected to return data as they are expected.
        # Any transformation on the data should be done before reaching this point
        message = stream_data_to_airbyte_message(
            stream_name=record.stream_name,
            data_or_message=record.data,
            is_file_transfer_message=record.is_file_transfer_message,
        )
        stream = self._stream_name_to_instance[record.stream_name]

        if message.type == MessageType.RECORD:
            if self._record_counter[stream.name] == 0:
                self._logger.info(f"Marking stream {stream.name} as RUNNING")
                yield stream_status_as_airbyte_message(
                    stream.as_airbyte_stream(), AirbyteStreamStatus.RUNNING
                )
            self._record_counter[stream.name] += 1
            stream.cursor.observe(record)
        yield message
        yield from self._message_repository.consume_queue()

    def on_exception(self, exception: StreamThreadException) -> Iterable[AirbyteMessage]:
        """
        This method is called when an exception is raised while syncing a stream.
        1. Record the exception against the stream it came from
        2. Log the exception
        3. Yield an AirbyteMessage describing the exception (nothing is raised here)
        """
        self._flag_exception(exception.stream_name, exception.exception)
        self._logger.exception(
            f"Exception while syncing stream {exception.stream_name}", exc_info=exception.exception
        )

        stream_descriptor = StreamDescriptor(name=exception.stream_name)
        if isinstance(exception.exception, AirbyteTracedException):
            yield exception.exception.as_airbyte_message(stream_descriptor=stream_descriptor)
        else:
            # NOTE(review): the StreamThreadException wrapper, not the wrapped exception, is passed
            # to from_exception - presumably intentional; confirm before changing.
            yield AirbyteTracedException.from_exception(
                exception, stream_descriptor=stream_descriptor
            ).as_airbyte_message()

    def _flag_exception(self, stream_name: str, exception: Exception) -> None:
        """Append the exception to the list of exceptions recorded for the stream."""
        self._exceptions_per_stream_name.setdefault(stream_name, []).append(exception)

    def start_next_partition_generator(self) -> Optional[AirbyteMessage]:
        """
        Start the next partition generator.
        1. Pop the next stream to read from
        2. Submit the partition generator to the thread pool manager
        3. Add the stream to the list of streams currently generating partitions
        4. Return a stream status message

        Returns None when there is no stream left to start.
        """
        if self._stream_instances_to_start_partition_generation:
            stream = self._stream_instances_to_start_partition_generation.pop(0)
            self._thread_pool_manager.submit(self._partition_enqueuer.generate_partitions, stream)
            self._streams_currently_generating_partitions.append(stream.name)
            self._logger.info(f"Marking stream {stream.name} as STARTED")
            self._logger.info(f"Syncing stream: {stream.name} ")
            return stream_status_as_airbyte_message(
                stream.as_airbyte_stream(),
                AirbyteStreamStatus.STARTED,
            )
        else:
            return None

    def is_done(self) -> bool:
        """
        This method is called to check if the sync is done.
        The sync is done when every stream to read from has been marked as done.

        :return: True if all streams are done, False otherwise
        :raises AirbyteTracedException: if all streams are done and at least one exception was recorded
        """
        is_done = all(
            self._is_stream_done(stream_name) for stream_name in self._stream_name_to_instance
        )
        if is_done and self._exceptions_per_stream_name:
            error_message = generate_failed_streams_error_message(self._exceptions_per_stream_name)
            self._logger.info(error_message)
            # We still raise at least one exception when a stream raises an exception because the platform currently relies
            # on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error
            # type because this combined error isn't actionable, but rather the previously emitted individual errors.
            raise AirbyteTracedException(
                message=error_message,
                internal_message="Concurrent read failure",
                failure_type=FailureType.config_error,
            )
        return is_done

    def _is_stream_done(self, stream_name: str) -> bool:
        """Return True if the stream has already been marked as done."""
        return stream_name in self._streams_done

    def _on_stream_is_done(self, stream_name: str) -> Iterable[AirbyteMessage]:
        """
        Close out a stream: ask its cursor to emit at least one state, flush the message
        repository, mark the stream as done and yield its final status message.

        The final status is INCOMPLETE if any exception was recorded for the stream, COMPLETE otherwise.
        """
        self._logger.info(
            f"Read {self._record_counter[stream_name]} records from {stream_name} stream"
        )
        self._logger.info(f"Marking stream {stream_name} as STOPPED")
        stream = self._stream_name_to_instance[stream_name]
        stream.cursor.ensure_at_least_one_state_emitted()
        yield from self._message_repository.consume_queue()
        self._logger.info(f"Finished syncing {stream.name}")
        self._streams_done.add(stream_name)
        stream_status = (
            AirbyteStreamStatus.INCOMPLETE
            if self._exceptions_per_stream_name.get(stream_name, [])
            else AirbyteStreamStatus.COMPLETE
        )
        yield stream_status_as_airbyte_message(stream.as_airbyte_stream(), stream_status)
class
ConcurrentReadProcessor:
class ConcurrentReadProcessor:
    def __init__(
        self,
        stream_instances_to_read_from: List[AbstractStream],
        partition_enqueuer: PartitionEnqueuer,
        thread_pool_manager: ThreadPoolManager,
        logger: logging.Logger,
        slice_logger: SliceLogger,
        message_repository: MessageRepository,
        partition_reader: PartitionReader,
    ):
        """
        This class is responsible for handling items from a concurrent stream read process.
        :param stream_instances_to_read_from: List of streams to read from
        :param partition_enqueuer: PartitionEnqueuer instance
        :param thread_pool_manager: ThreadPoolManager instance
        :param logger: Logger instance
        :param slice_logger: SliceLogger instance
        :param message_repository: MessageRepository instance
        :param partition_reader: PartitionReader instance
        """
        self._stream_name_to_instance = {s.name: s for s in stream_instances_to_read_from}
        # Number of RECORD messages emitted so far, per stream name
        self._record_counter: Dict[str, int] = {}
        # Partitions that were handed to the thread pool and have not completed yet, per stream name
        self._streams_to_running_partitions: Dict[str, Set[Partition]] = {}
        for stream in stream_instances_to_read_from:
            self._streams_to_running_partitions[stream.name] = set()
            self._record_counter[stream.name] = 0
        self._thread_pool_manager = thread_pool_manager
        self._partition_enqueuer = partition_enqueuer
        # Keep a private copy: start_next_partition_generator() pops from this list, and draining
        # the list object owned by the caller would be a surprising side effect.
        self._stream_instances_to_start_partition_generation = list(stream_instances_to_read_from)
        self._streams_currently_generating_partitions: List[str] = []
        self._logger = logger
        self._slice_logger = slice_logger
        self._message_repository = message_repository
        self._partition_reader = partition_reader
        self._streams_done: Set[str] = set()
        self._exceptions_per_stream_name: Dict[str, List[Exception]] = {}

    def on_partition_generation_completed(
        self, sentinel: PartitionGenerationCompletedSentinel
    ) -> Iterable[AirbyteMessage]:
        """
        This method is called when a partition generation is completed.
        1. Remove the stream from the list of streams currently generating partitions
        2. If the stream is done, mark it as such and return a stream status message
        3. If there are more streams to read from, start the next partition generator
        """
        stream_name = sentinel.stream.name
        self._streams_currently_generating_partitions.remove(stream_name)
        # It is possible for the stream to already be done if no partitions were generated
        # If the partition generation process was completed and there are no partitions left to process, the stream is done
        if self._is_stream_done(stream_name) or not self._streams_to_running_partitions[stream_name]:
            yield from self._on_stream_is_done(stream_name)
        if self._stream_instances_to_start_partition_generation:
            yield self.start_next_partition_generator()  # type:ignore # None may be yielded

    def on_partition(self, partition: Partition) -> None:
        """
        This method is called when a partition is generated.
        1. Add the partition to the set of partitions for the stream
        2. Log the slice if necessary
        3. Submit the partition to the thread pool manager
        """
        stream_name = partition.stream_name()
        self._streams_to_running_partitions[stream_name].add(partition)
        if self._slice_logger.should_log_slice_message(self._logger):
            self._message_repository.emit_message(
                self._slice_logger.create_slice_log_message(partition.to_slice())
            )
        self._thread_pool_manager.submit(self._partition_reader.process_partition, partition)

    def on_partition_complete_sentinel(
        self, sentinel: PartitionCompleteSentinel
    ) -> Iterable[AirbyteMessage]:
        """
        This method is called when a partition is completed.
        1. Close the partition
        2. If the stream is done, mark it as such and return a stream status message
        3. Emit messages that were added to the message repository
        """
        partition = sentinel.partition

        try:
            # The cursor is only told to close partitions that were read successfully
            if sentinel.is_successful:
                stream = self._stream_name_to_instance[partition.stream_name()]
                stream.cursor.close_partition(partition)
        except Exception as exception:
            # A failure while closing the partition is recorded against the stream and reported
            # as a message instead of being propagated to the caller
            self._flag_exception(partition.stream_name(), exception)
            yield AirbyteTracedException.from_exception(
                exception, stream_descriptor=StreamDescriptor(name=partition.stream_name())
            ).as_sanitized_airbyte_message()
        finally:
            partitions_running = self._streams_to_running_partitions[partition.stream_name()]
            if partition in partitions_running:
                partitions_running.remove(partition)
                # If all partitions were generated and this was the last one, the stream is done
                if (
                    partition.stream_name() not in self._streams_currently_generating_partitions
                    and not partitions_running
                ):
                    yield from self._on_stream_is_done(partition.stream_name())
            yield from self._message_repository.consume_queue()

    def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
        """
        This method is called when a record is read from a partition.
        1. Convert the record to an AirbyteMessage
        2. If this is the first record for the stream, mark the stream as RUNNING
        3. Increment the record counter for the stream
        4. Let the cursor observe the record
        5. Emit the message
        6. Emit messages that were added to the message repository
        """
        # Do not pass a transformer or a schema
        # AbstractStreams are expected to return data as they are expected.
        # Any transformation on the data should be done before reaching this point
        message = stream_data_to_airbyte_message(
            stream_name=record.stream_name,
            data_or_message=record.data,
            is_file_transfer_message=record.is_file_transfer_message,
        )
        stream = self._stream_name_to_instance[record.stream_name]

        if message.type == MessageType.RECORD:
            if self._record_counter[stream.name] == 0:
                self._logger.info(f"Marking stream {stream.name} as RUNNING")
                yield stream_status_as_airbyte_message(
                    stream.as_airbyte_stream(), AirbyteStreamStatus.RUNNING
                )
            self._record_counter[stream.name] += 1
            stream.cursor.observe(record)
        yield message
        yield from self._message_repository.consume_queue()

    def on_exception(self, exception: StreamThreadException) -> Iterable[AirbyteMessage]:
        """
        This method is called when an exception is raised while syncing a stream.
        1. Record the exception against the stream it came from
        2. Log the exception
        3. Yield an AirbyteMessage describing the exception (nothing is raised here)
        """
        self._flag_exception(exception.stream_name, exception.exception)
        self._logger.exception(
            f"Exception while syncing stream {exception.stream_name}", exc_info=exception.exception
        )

        stream_descriptor = StreamDescriptor(name=exception.stream_name)
        if isinstance(exception.exception, AirbyteTracedException):
            yield exception.exception.as_airbyte_message(stream_descriptor=stream_descriptor)
        else:
            # NOTE(review): the StreamThreadException wrapper, not the wrapped exception, is passed
            # to from_exception - presumably intentional; confirm before changing.
            yield AirbyteTracedException.from_exception(
                exception, stream_descriptor=stream_descriptor
            ).as_airbyte_message()

    def _flag_exception(self, stream_name: str, exception: Exception) -> None:
        """Append the exception to the list of exceptions recorded for the stream."""
        self._exceptions_per_stream_name.setdefault(stream_name, []).append(exception)

    def start_next_partition_generator(self) -> Optional[AirbyteMessage]:
        """
        Start the next partition generator.
        1. Pop the next stream to read from
        2. Submit the partition generator to the thread pool manager
        3. Add the stream to the list of streams currently generating partitions
        4. Return a stream status message

        Returns None when there is no stream left to start.
        """
        if self._stream_instances_to_start_partition_generation:
            stream = self._stream_instances_to_start_partition_generation.pop(0)
            self._thread_pool_manager.submit(self._partition_enqueuer.generate_partitions, stream)
            self._streams_currently_generating_partitions.append(stream.name)
            self._logger.info(f"Marking stream {stream.name} as STARTED")
            self._logger.info(f"Syncing stream: {stream.name} ")
            return stream_status_as_airbyte_message(
                stream.as_airbyte_stream(),
                AirbyteStreamStatus.STARTED,
            )
        else:
            return None

    def is_done(self) -> bool:
        """
        This method is called to check if the sync is done.
        The sync is done when every stream to read from has been marked as done.

        :return: True if all streams are done, False otherwise
        :raises AirbyteTracedException: if all streams are done and at least one exception was recorded
        """
        is_done = all(
            self._is_stream_done(stream_name) for stream_name in self._stream_name_to_instance
        )
        if is_done and self._exceptions_per_stream_name:
            error_message = generate_failed_streams_error_message(self._exceptions_per_stream_name)
            self._logger.info(error_message)
            # We still raise at least one exception when a stream raises an exception because the platform currently relies
            # on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error
            # type because this combined error isn't actionable, but rather the previously emitted individual errors.
            raise AirbyteTracedException(
                message=error_message,
                internal_message="Concurrent read failure",
                failure_type=FailureType.config_error,
            )
        return is_done

    def _is_stream_done(self, stream_name: str) -> bool:
        """Return True if the stream has already been marked as done."""
        return stream_name in self._streams_done

    def _on_stream_is_done(self, stream_name: str) -> Iterable[AirbyteMessage]:
        """
        Close out a stream: ask its cursor to emit at least one state, flush the message
        repository, mark the stream as done and yield its final status message.

        The final status is INCOMPLETE if any exception was recorded for the stream, COMPLETE otherwise.
        """
        self._logger.info(
            f"Read {self._record_counter[stream_name]} records from {stream_name} stream"
        )
        self._logger.info(f"Marking stream {stream_name} as STOPPED")
        stream = self._stream_name_to_instance[stream_name]
        stream.cursor.ensure_at_least_one_state_emitted()
        yield from self._message_repository.consume_queue()
        self._logger.info(f"Finished syncing {stream.name}")
        self._streams_done.add(stream_name)
        stream_status = (
            AirbyteStreamStatus.INCOMPLETE
            if self._exceptions_per_stream_name.get(stream_name, [])
            else AirbyteStreamStatus.COMPLETE
        )
        yield stream_status_as_airbyte_message(stream.as_airbyte_stream(), stream_status)
ConcurrentReadProcessor( stream_instances_to_read_from: List[airbyte_cdk.sources.streams.concurrent.abstract_stream.AbstractStream], partition_enqueuer: airbyte_cdk.sources.streams.concurrent.partition_enqueuer.PartitionEnqueuer, thread_pool_manager: airbyte_cdk.sources.concurrent_source.thread_pool_manager.ThreadPoolManager, logger: logging.Logger, slice_logger: airbyte_cdk.sources.utils.slice_logger.SliceLogger, message_repository: airbyte_cdk.MessageRepository, partition_reader: airbyte_cdk.sources.streams.concurrent.partition_reader.PartitionReader)
def __init__(
    self,
    stream_instances_to_read_from: List[AbstractStream],
    partition_enqueuer: PartitionEnqueuer,
    thread_pool_manager: ThreadPoolManager,
    logger: logging.Logger,
    slice_logger: SliceLogger,
    message_repository: MessageRepository,
    partition_reader: PartitionReader,
):
    """
    This class is responsible for handling items from a concurrent stream read process.
    :param stream_instances_to_read_from: List of streams to read from
    :param partition_enqueuer: PartitionEnqueuer instance
    :param thread_pool_manager: ThreadPoolManager instance
    :param logger: Logger instance
    :param slice_logger: SliceLogger instance
    :param message_repository: MessageRepository instance
    :param partition_reader: PartitionReader instance
    """
    # Take one private copy of the input. The pending-streams list below is drained with
    # pop(0) by start_next_partition_generator(); without the copy that would silently
    # empty the list object owned by the caller.
    pending_streams = list(stream_instances_to_read_from)

    self._stream_name_to_instance = {s.name: s for s in pending_streams}
    # Number of RECORD messages emitted so far, per stream name
    self._record_counter: Dict[str, int] = {s.name: 0 for s in pending_streams}
    # Partitions that were handed to the thread pool and have not completed yet, per stream name
    self._streams_to_running_partitions: Dict[str, Set[Partition]] = {
        s.name: set() for s in pending_streams
    }
    self._thread_pool_manager = thread_pool_manager
    self._partition_enqueuer = partition_enqueuer
    self._stream_instances_to_start_partition_generation = pending_streams
    self._streams_currently_generating_partitions: List[str] = []
    self._logger = logger
    self._slice_logger = slice_logger
    self._message_repository = message_repository
    self._partition_reader = partition_reader
    self._streams_done: Set[str] = set()
    self._exceptions_per_stream_name: Dict[str, List[Exception]] = {}
This class is responsible for handling items from a concurrent stream read process.
Parameters
- stream_instances_to_read_from: List of streams to read from
- partition_enqueuer: PartitionEnqueuer instance
- thread_pool_manager: ThreadPoolManager instance
- logger: Logger instance
- slice_logger: SliceLogger instance
- message_repository: MessageRepository instance
- partition_reader: PartitionReader instance
def
on_partition_generation_completed( self, sentinel: airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel.PartitionGenerationCompletedSentinel) -> Iterable[airbyte_cdk.AirbyteMessage]:
def on_partition_generation_completed(
    self, sentinel: PartitionGenerationCompletedSentinel
) -> Iterable[AirbyteMessage]:
    """
    Handle the signal that a stream has finished generating its partitions.

    1. Drop the stream from the list of streams currently generating partitions
    2. Close out the stream when it is already flagged as done or has no running partition left
    3. Start partition generation for the next pending stream, if there is one
    """
    name = sentinel.stream.name
    self._streams_currently_generating_partitions.remove(name)

    # A stream may have produced no partition at all; in that case nothing is left to wait for
    # and the stream can be closed out right away.
    nothing_left_to_process = self._is_stream_done(name) or not (
        self._streams_to_running_partitions[name]
    )
    if nothing_left_to_process:
        yield from self._on_stream_is_done(name)

    if self._stream_instances_to_start_partition_generation:
        yield self.start_next_partition_generator()  # type:ignore # None may be yielded
This method is called when a partition generation is completed.
- Remove the stream from the list of streams currently generating partitions
- If the stream is done, mark it as such and return a stream status message
- If there are more streams to read from, start the next partition generator
def
on_partition( self, partition: airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition) -> None:
def on_partition(self, partition: Partition) -> None:
    """
    Handle a freshly generated partition.

    1. Register the partition as running for its stream
    2. Emit a slice log message if the slice logger asks for one
    3. Hand the partition to the thread pool so the partition reader processes it
    """
    running_partitions = self._streams_to_running_partitions[partition.stream_name()]
    running_partitions.add(partition)

    if self._slice_logger.should_log_slice_message(self._logger):
        slice_log_message = self._slice_logger.create_slice_log_message(partition.to_slice())
        self._message_repository.emit_message(slice_log_message)

    self._thread_pool_manager.submit(self._partition_reader.process_partition, partition)
This method is called when a partition is generated.
- Add the partition to the set of partitions for the stream
- Log the slice if necessary
- Submit the partition to the thread pool manager
def
on_partition_complete_sentinel( self, sentinel: airbyte_cdk.sources.streams.concurrent.partitions.types.PartitionCompleteSentinel) -> Iterable[airbyte_cdk.AirbyteMessage]:
def on_partition_complete_sentinel(
    self, sentinel: PartitionCompleteSentinel
) -> Iterable[AirbyteMessage]:
    """
    Handle the signal that a partition has finished being processed.

    1. Ask the stream's cursor to close the partition when it was read successfully
    2. Stop tracking the partition; if it was the stream's last one and partition
       generation is over, close out the stream and yield its status message
    3. Yield the messages waiting in the message repository
    """
    partition = sentinel.partition
    stream_name = partition.stream_name()

    try:
        if sentinel.is_successful:
            self._stream_name_to_instance[stream_name].cursor.close_partition(partition)
    except Exception as error:
        # Failing to close the partition is recorded against the stream and reported as a
        # message; it is not propagated to the caller.
        self._flag_exception(stream_name, error)
        traced = AirbyteTracedException.from_exception(
            error, stream_descriptor=StreamDescriptor(name=stream_name)
        )
        yield traced.as_sanitized_airbyte_message()
    finally:
        still_running = self._streams_to_running_partitions[stream_name]
        if partition in still_running:
            still_running.remove(partition)
            # If all partitions were generated and this was the last one, the stream is done
            generation_is_over = (
                stream_name not in self._streams_currently_generating_partitions
            )
            if generation_is_over and not still_running:
                yield from self._on_stream_is_done(stream_name)
        yield from self._message_repository.consume_queue()
This method is called when a partition is completed.
- Close the partition
- If the stream is done, mark it as such and return a stream status message
- Emit messages that were added to the message repository
def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
    """
    Handle a record read from a partition.

    1. Turn the record into an AirbyteMessage
    2. On the stream's first RECORD message, yield a RUNNING stream status first
    3. Bump the stream's record counter
    4. Let the stream's cursor observe the record
    5. Yield the message
    6. Yield the messages waiting in the message repository
    """
    # No transformer and no schema are passed on purpose: AbstractStreams are expected to
    # return data already in its final form, so any transformation happens before this point.
    message = stream_data_to_airbyte_message(
        stream_name=record.stream_name,
        data_or_message=record.data,
        is_file_transfer_message=record.is_file_transfer_message,
    )
    stream = self._stream_name_to_instance[record.stream_name]
    is_record_message = message.type == MessageType.RECORD

    if is_record_message:
        is_first_record = self._record_counter[stream.name] == 0
        if is_first_record:
            self._logger.info(f"Marking stream {stream.name} as RUNNING")
            yield stream_status_as_airbyte_message(
                stream.as_airbyte_stream(), AirbyteStreamStatus.RUNNING
            )
        self._record_counter[stream.name] += 1
        stream.cursor.observe(record)

    yield message
    yield from self._message_repository.consume_queue()
This method is called when a record is read from a partition.
- Convert the record to an AirbyteMessage
- If this is the first record for the stream, mark the stream as RUNNING
- Increment the record counter for the stream
- Ensures the cursor knows the record has been successfully emitted
- Emit the message
- Emit messages that were added to the message repository
def
on_exception( self, exception: airbyte_cdk.sources.concurrent_source.stream_thread_exception.StreamThreadException) -> Iterable[airbyte_cdk.AirbyteMessage]:
def on_exception(self, exception: StreamThreadException) -> Iterable[AirbyteMessage]:
    """
    Handle an exception raised while syncing a stream.

    1. Record the wrapped exception against the stream it came from
    2. Log the exception
    3. Yield an AirbyteMessage describing the exception (nothing is raised here)
    """
    failed_stream = exception.stream_name
    cause = exception.exception

    self._flag_exception(failed_stream, cause)
    self._logger.exception(f"Exception while syncing stream {failed_stream}", exc_info=cause)

    descriptor = StreamDescriptor(name=failed_stream)
    if isinstance(cause, AirbyteTracedException):
        error_message = cause.as_airbyte_message(stream_descriptor=descriptor)
    else:
        # NOTE(review): the StreamThreadException wrapper, not the wrapped exception, is what
        # gets converted here - presumably intentional; confirm before changing.
        converted = AirbyteTracedException.from_exception(
            exception, stream_descriptor=descriptor
        )
        error_message = converted.as_airbyte_message()
    yield error_message
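This method is called when an exception is raised while syncing a stream.
- Record the exception against the stream it came from and log it
- Yield an AirbyteMessage describing the exception (the exception is reported, not re-raised)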
def start_next_partition_generator(self) -> Optional[AirbyteMessage]:
    """
    Start partition generation for the next pending stream.

    1. Take the first stream off the list of streams still to be started
    2. Submit its partition generation to the thread pool manager
    3. Register the stream as currently generating partitions
    4. Return a STARTED stream status message

    Returns None when no stream is left to start.
    """
    pending = self._stream_instances_to_start_partition_generation
    if not pending:
        return None

    next_stream = pending.pop(0)
    self._thread_pool_manager.submit(self._partition_enqueuer.generate_partitions, next_stream)
    self._streams_currently_generating_partitions.append(next_stream.name)
    self._logger.info(f"Marking stream {next_stream.name} as STARTED")
    self._logger.info(f"Syncing stream: {next_stream.name} ")
    return stream_status_as_airbyte_message(
        next_stream.as_airbyte_stream(),
        AirbyteStreamStatus.STARTED,
    )
Start the next partition generator.
- Pop the next stream to read from
- Submit the partition generator to the thread pool manager
- Add the stream to the list of streams currently generating partitions
- Return a stream status message
def
is_done(self) -> bool:
def is_done(self) -> bool:
    """
    Tell whether the sync is over, i.e. whether every stream to read from has been marked as done.

    :return: True if all streams are done, False otherwise
    :raises AirbyteTracedException: if all streams are done and at least one exception was recorded
    """
    all_streams_done = all(
        self._is_stream_done(name) for name in self._stream_name_to_instance
    )
    if not (all_streams_done and self._exceptions_per_stream_name):
        return all_streams_done

    error_message = generate_failed_streams_error_message(self._exceptions_per_stream_name)
    self._logger.info(error_message)
    # We still raise at least one exception when a stream raises an exception because the platform currently relies
    # on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error
    # type because this combined error isn't actionable, but rather the previously emitted individual errors.
    raise AirbyteTracedException(
        message=error_message,
        internal_message="Concurrent read failure",
        failure_type=FailureType.config_error,
    )
This method is called to check if the sync is done. The sync is done when:
- There are no more streams generating partitions
- There are no more streams to read from
- All partitions for all streams are closed