airbyte_cdk.sources.concurrent_source.concurrent_read_processor
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import logging
import os
from typing import Dict, Iterable, List, Optional, Set

from airbyte_cdk.exception_handler import generate_failed_streams_error_message
from airbyte_cdk.models import AirbyteMessage, AirbyteStreamStatus, FailureType, StreamDescriptor
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel import (
    PartitionGenerationCompletedSentinel,
)
from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException
from airbyte_cdk.sources.concurrent_source.thread_pool_manager import ThreadPoolManager
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.concurrent.partition_enqueuer import PartitionEnqueuer
from airbyte_cdk.sources.streams.concurrent.partition_reader import PartitionReader
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.types import PartitionCompleteSentinel
from airbyte_cdk.sources.types import Record
from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message
from airbyte_cdk.sources.utils.slice_logger import SliceLogger
from airbyte_cdk.utils import AirbyteTracedException
from airbyte_cdk.utils.stream_status_utils import (
    as_airbyte_message as stream_status_as_airbyte_message,
)


class ConcurrentReadProcessor:
    """
    Coordinates the items produced while streams are read concurrently.

    Streams are started in the order they are given (see `start_next_partition_generator`).
    Generated partitions, records, completion sentinels and exceptions coming back from worker
    threads are handled by the `on_*` methods, which yield the AirbyteMessages to emit.
    `is_done` reports when every stream has finished.
    """

    def __init__(
        self,
        stream_instances_to_read_from: List[AbstractStream],
        partition_enqueuer: PartitionEnqueuer,
        thread_pool_manager: ThreadPoolManager,
        logger: logging.Logger,
        slice_logger: SliceLogger,
        message_repository: MessageRepository,
        partition_reader: PartitionReader,
    ):
        """
        This class is responsible for handling items from a concurrent stream read process.
        :param stream_instances_to_read_from: List of streams to read from. The list is copied, so
            starting streams does not consume the caller's list.
        :param partition_enqueuer: PartitionEnqueuer instance; its `generate_partitions` is submitted once per stream
        :param thread_pool_manager: ThreadPoolManager instance running partition generation and partition reads
        :param logger: Logger instance
        :param slice_logger: SliceLogger instance deciding whether slice log messages are emitted
        :param message_repository: MessageRepository instance; its queued messages are consumed and yielded by the handlers
        :param partition_reader: PartitionReader instance; its `process_partition` is submitted once per partition
        """
        self._stream_name_to_instance = {s.name: s for s in stream_instances_to_read_from}
        # Number of RECORD messages emitted per stream; 0 means RUNNING has not been emitted yet.
        self._record_counter: Dict[str, int] = {}
        self._streams_to_running_partitions: Dict[str, Set[Partition]] = {}
        for stream in stream_instances_to_read_from:
            self._streams_to_running_partitions[stream.name] = set()
            self._record_counter[stream.name] = 0
        self._thread_pool_manager = thread_pool_manager
        self._partition_enqueuer = partition_enqueuer
        # Copy the list: start_next_partition_generator pops from it, which would otherwise
        # silently empty the list owned by the caller.
        self._stream_instances_to_start_partition_generation = list(stream_instances_to_read_from)
        self._streams_currently_generating_partitions: List[str] = []
        self._logger = logger
        self._slice_logger = slice_logger
        self._message_repository = message_repository
        self._partition_reader = partition_reader
        self._streams_done: Set[str] = set()
        self._exceptions_per_stream_name: dict[str, List[Exception]] = {}

    def on_partition_generation_completed(
        self, sentinel: PartitionGenerationCompletedSentinel
    ) -> Iterable[AirbyteMessage]:
        """
        This method is called when a partition generation is completed.
        1. Remove the stream from the list of streams currently generating partitions
        2. If the stream is done, mark it as such and yield its stream status messages
        3. If there are more streams to read from, start the next partition generator and yield
           its STARTED status message
        """
        stream_name = sentinel.stream.name
        self._streams_currently_generating_partitions.remove(stream_name)
        # It is possible for the stream to already be done if no partitions were generated
        # If the partition generation process was completed and there are no partitions left to process, the stream is done
        if (
            self._is_stream_done(stream_name)
            or len(self._streams_to_running_partitions[stream_name]) == 0
        ):
            yield from self._on_stream_is_done(stream_name)
        # start_next_partition_generator returns None when no stream is left to start;
        # only yield real messages.
        next_stream_status = self.start_next_partition_generator()
        if next_stream_status is not None:
            yield next_stream_status

    def on_partition(self, partition: Partition) -> None:
        """
        This method is called when a partition is generated.
        1. Add the partition to the set of partitions for the stream
        2. Log the slice if necessary
        3. Submit the partition, together with the stream's cursor, to the thread pool manager
        """
        stream_name = partition.stream_name()
        self._streams_to_running_partitions[stream_name].add(partition)
        cursor = self._stream_name_to_instance[stream_name].cursor
        if self._slice_logger.should_log_slice_message(self._logger):
            self._message_repository.emit_message(
                self._slice_logger.create_slice_log_message(partition.to_slice())
            )
        self._thread_pool_manager.submit(
            self._partition_reader.process_partition, partition, cursor
        )

    def on_partition_complete_sentinel(
        self, sentinel: PartitionCompleteSentinel
    ) -> Iterable[AirbyteMessage]:
        """
        This method is called when a partition is completed.
        1. Close the partition
        2. If this was the stream's last running partition and its partition generation has
           completed, mark the stream as done and yield its stream status messages
        3. Emit messages that were added to the message repository
        """
        partition = sentinel.partition

        partitions_running = self._streams_to_running_partitions[partition.stream_name()]
        if partition in partitions_running:
            partitions_running.remove(partition)
            # If all partitions were generated and this was the last one, the stream is done
            if (
                partition.stream_name() not in self._streams_currently_generating_partitions
                and len(partitions_running) == 0
            ):
                yield from self._on_stream_is_done(partition.stream_name())
        yield from self._message_repository.consume_queue()

    def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
        """
        This method is called when a record is read from a partition.
        1. Convert the record to an AirbyteMessage
        2. If the message is a RECORD and the first one for the stream, mark the stream as RUNNING
        3. If the message is a RECORD, increment the record counter for the stream
        4. Emit the message (non-RECORD messages are forwarded unchanged)
        5. Emit messages that were added to the message repository
        """
        # Do not pass a transformer or a schema
        # AbstractStreams are expected to return data as they are expected.
        # Any transformation on the data should be done before reaching this point
        message = stream_data_to_airbyte_message(
            stream_name=record.stream_name,
            data_or_message=record.data,
            file_reference=record.file_reference,
        )
        stream = self._stream_name_to_instance[record.stream_name]

        if message.type == MessageType.RECORD:
            if self._record_counter[stream.name] == 0:
                self._logger.info(f"Marking stream {stream.name} as RUNNING")
                yield stream_status_as_airbyte_message(
                    stream.as_airbyte_stream(), AirbyteStreamStatus.RUNNING
                )
            self._record_counter[stream.name] += 1
        yield message
        yield from self._message_repository.consume_queue()

    def on_exception(self, exception: StreamThreadException) -> Iterable[AirbyteMessage]:
        """
        This method is called when a worker thread reports an exception for a stream.
        1. Record the exception against the stream (the stream will later be reported as
           INCOMPLETE and `is_done` will raise)
        2. Log the exception
        3. Yield an error trace message for the stream
        This method neither stops other streams nor re-raises the exception.
        """
        self._flag_exception(exception.stream_name, exception.exception)
        self._logger.exception(
            f"Exception while syncing stream {exception.stream_name}", exc_info=exception.exception
        )

        stream_descriptor = StreamDescriptor(name=exception.stream_name)
        if isinstance(exception.exception, AirbyteTracedException):
            yield exception.exception.as_airbyte_message(stream_descriptor=stream_descriptor)
        else:
            # NOTE(review): the StreamThreadException wrapper (not exception.exception) is
            # converted here; confirm its message / stack trace is what should be reported.
            yield AirbyteTracedException.from_exception(
                exception, stream_descriptor=stream_descriptor
            ).as_airbyte_message()

    def _flag_exception(self, stream_name: str, exception: Exception) -> None:
        """Remember `exception` as one of the failures of `stream_name`."""
        self._exceptions_per_stream_name.setdefault(stream_name, []).append(exception)

    def start_next_partition_generator(self) -> Optional[AirbyteMessage]:
        """
        Start the next partition generator.
        1. Pop the next stream to read from
        2. Submit the partition generator to the thread pool manager
        3. Add the stream to the list of streams currently generating partitions
        4. Return a STARTED stream status message, or None if no stream is left to start
        """
        if self._stream_instances_to_start_partition_generation:
            stream = self._stream_instances_to_start_partition_generation.pop(0)
            self._thread_pool_manager.submit(self._partition_enqueuer.generate_partitions, stream)
            self._streams_currently_generating_partitions.append(stream.name)
            self._logger.info(f"Marking stream {stream.name} as STARTED")
            self._logger.info(f"Syncing stream: {stream.name} ")
            return stream_status_as_airbyte_message(
                stream.as_airbyte_stream(),
                AirbyteStreamStatus.STARTED,
            )
        else:
            return None

    def is_done(self) -> bool:
        """
        This method is called to check if the sync is done.
        The sync is done when every stream has been marked as done, i.e. for each stream:
        1. Its partition generation has completed
        2. All of its partitions have been closed
        :return: True when every stream is done
        :raises AirbyteTracedException: once every stream is done, if any stream reported an exception
        """
        is_done = all(
            self._is_stream_done(stream_name) for stream_name in self._stream_name_to_instance
        )
        if is_done and self._exceptions_per_stream_name:
            error_message = generate_failed_streams_error_message(self._exceptions_per_stream_name)
            self._logger.info(error_message)
            # We still raise at least one exception when a stream raises an exception because the platform currently relies
            # on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error
            # type because this combined error isn't actionable, but rather the previously emitted individual errors.
            raise AirbyteTracedException(
                message=error_message,
                internal_message="Concurrent read failure",
                failure_type=FailureType.config_error,
            )
        return is_done

    def _is_stream_done(self, stream_name: str) -> bool:
        """Return whether `stream_name` has already been marked as done."""
        return stream_name in self._streams_done

    def _on_stream_is_done(self, stream_name: str) -> Iterable[AirbyteMessage]:
        """
        Mark `stream_name` as done.
        Logs the record count, makes the cursor emit at least one state, flushes the message
        repository, then yields an INCOMPLETE status if the stream reported exceptions and a
        COMPLETE status otherwise.
        """
        self._logger.info(
            f"Read {self._record_counter[stream_name]} records from {stream_name} stream"
        )
        self._logger.info(f"Marking stream {stream_name} as STOPPED")
        stream = self._stream_name_to_instance[stream_name]
        stream.cursor.ensure_at_least_one_state_emitted()
        yield from self._message_repository.consume_queue()
        self._logger.info(f"Finished syncing {stream.name}")
        self._streams_done.add(stream_name)
        stream_status = (
            AirbyteStreamStatus.INCOMPLETE
            if self._exceptions_per_stream_name.get(stream_name, [])
            else AirbyteStreamStatus.COMPLETE
        )
        yield stream_status_as_airbyte_message(stream.as_airbyte_stream(), stream_status)
class
ConcurrentReadProcessor:
class ConcurrentReadProcessor:
    """
    Coordinates the items produced while streams are read concurrently.

    Streams are started in the order they are given (see `start_next_partition_generator`).
    Generated partitions, records, completion sentinels and exceptions coming back from worker
    threads are handled by the `on_*` methods, which yield the AirbyteMessages to emit.
    `is_done` reports when every stream has finished.
    """

    def __init__(
        self,
        stream_instances_to_read_from: List[AbstractStream],
        partition_enqueuer: PartitionEnqueuer,
        thread_pool_manager: ThreadPoolManager,
        logger: logging.Logger,
        slice_logger: SliceLogger,
        message_repository: MessageRepository,
        partition_reader: PartitionReader,
    ):
        """
        This class is responsible for handling items from a concurrent stream read process.
        :param stream_instances_to_read_from: List of streams to read from. The list is copied, so
            starting streams does not consume the caller's list.
        :param partition_enqueuer: PartitionEnqueuer instance; its `generate_partitions` is submitted once per stream
        :param thread_pool_manager: ThreadPoolManager instance running partition generation and partition reads
        :param logger: Logger instance
        :param slice_logger: SliceLogger instance deciding whether slice log messages are emitted
        :param message_repository: MessageRepository instance; its queued messages are consumed and yielded by the handlers
        :param partition_reader: PartitionReader instance; its `process_partition` is submitted once per partition
        """
        self._stream_name_to_instance = {s.name: s for s in stream_instances_to_read_from}
        # Number of RECORD messages emitted per stream; 0 means RUNNING has not been emitted yet.
        self._record_counter: Dict[str, int] = {}
        self._streams_to_running_partitions: Dict[str, Set[Partition]] = {}
        for stream in stream_instances_to_read_from:
            self._streams_to_running_partitions[stream.name] = set()
            self._record_counter[stream.name] = 0
        self._thread_pool_manager = thread_pool_manager
        self._partition_enqueuer = partition_enqueuer
        # Copy the list: start_next_partition_generator pops from it, which would otherwise
        # silently empty the list owned by the caller.
        self._stream_instances_to_start_partition_generation = list(stream_instances_to_read_from)
        self._streams_currently_generating_partitions: List[str] = []
        self._logger = logger
        self._slice_logger = slice_logger
        self._message_repository = message_repository
        self._partition_reader = partition_reader
        self._streams_done: Set[str] = set()
        self._exceptions_per_stream_name: dict[str, List[Exception]] = {}

    def on_partition_generation_completed(
        self, sentinel: PartitionGenerationCompletedSentinel
    ) -> Iterable[AirbyteMessage]:
        """
        This method is called when a partition generation is completed.
        1. Remove the stream from the list of streams currently generating partitions
        2. If the stream is done, mark it as such and yield its stream status messages
        3. If there are more streams to read from, start the next partition generator and yield
           its STARTED status message
        """
        stream_name = sentinel.stream.name
        self._streams_currently_generating_partitions.remove(stream_name)
        # It is possible for the stream to already be done if no partitions were generated
        # If the partition generation process was completed and there are no partitions left to process, the stream is done
        if (
            self._is_stream_done(stream_name)
            or len(self._streams_to_running_partitions[stream_name]) == 0
        ):
            yield from self._on_stream_is_done(stream_name)
        # start_next_partition_generator returns None when no stream is left to start;
        # only yield real messages.
        next_stream_status = self.start_next_partition_generator()
        if next_stream_status is not None:
            yield next_stream_status

    def on_partition(self, partition: Partition) -> None:
        """
        This method is called when a partition is generated.
        1. Add the partition to the set of partitions for the stream
        2. Log the slice if necessary
        3. Submit the partition, together with the stream's cursor, to the thread pool manager
        """
        stream_name = partition.stream_name()
        self._streams_to_running_partitions[stream_name].add(partition)
        cursor = self._stream_name_to_instance[stream_name].cursor
        if self._slice_logger.should_log_slice_message(self._logger):
            self._message_repository.emit_message(
                self._slice_logger.create_slice_log_message(partition.to_slice())
            )
        self._thread_pool_manager.submit(
            self._partition_reader.process_partition, partition, cursor
        )

    def on_partition_complete_sentinel(
        self, sentinel: PartitionCompleteSentinel
    ) -> Iterable[AirbyteMessage]:
        """
        This method is called when a partition is completed.
        1. Close the partition
        2. If this was the stream's last running partition and its partition generation has
           completed, mark the stream as done and yield its stream status messages
        3. Emit messages that were added to the message repository
        """
        partition = sentinel.partition

        partitions_running = self._streams_to_running_partitions[partition.stream_name()]
        if partition in partitions_running:
            partitions_running.remove(partition)
            # If all partitions were generated and this was the last one, the stream is done
            if (
                partition.stream_name() not in self._streams_currently_generating_partitions
                and len(partitions_running) == 0
            ):
                yield from self._on_stream_is_done(partition.stream_name())
        yield from self._message_repository.consume_queue()

    def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
        """
        This method is called when a record is read from a partition.
        1. Convert the record to an AirbyteMessage
        2. If the message is a RECORD and the first one for the stream, mark the stream as RUNNING
        3. If the message is a RECORD, increment the record counter for the stream
        4. Emit the message (non-RECORD messages are forwarded unchanged)
        5. Emit messages that were added to the message repository
        """
        # Do not pass a transformer or a schema
        # AbstractStreams are expected to return data as they are expected.
        # Any transformation on the data should be done before reaching this point
        message = stream_data_to_airbyte_message(
            stream_name=record.stream_name,
            data_or_message=record.data,
            file_reference=record.file_reference,
        )
        stream = self._stream_name_to_instance[record.stream_name]

        if message.type == MessageType.RECORD:
            if self._record_counter[stream.name] == 0:
                self._logger.info(f"Marking stream {stream.name} as RUNNING")
                yield stream_status_as_airbyte_message(
                    stream.as_airbyte_stream(), AirbyteStreamStatus.RUNNING
                )
            self._record_counter[stream.name] += 1
        yield message
        yield from self._message_repository.consume_queue()

    def on_exception(self, exception: StreamThreadException) -> Iterable[AirbyteMessage]:
        """
        This method is called when a worker thread reports an exception for a stream.
        1. Record the exception against the stream (the stream will later be reported as
           INCOMPLETE and `is_done` will raise)
        2. Log the exception
        3. Yield an error trace message for the stream
        This method neither stops other streams nor re-raises the exception.
        """
        self._flag_exception(exception.stream_name, exception.exception)
        self._logger.exception(
            f"Exception while syncing stream {exception.stream_name}", exc_info=exception.exception
        )

        stream_descriptor = StreamDescriptor(name=exception.stream_name)
        if isinstance(exception.exception, AirbyteTracedException):
            yield exception.exception.as_airbyte_message(stream_descriptor=stream_descriptor)
        else:
            # NOTE(review): the StreamThreadException wrapper (not exception.exception) is
            # converted here; confirm its message / stack trace is what should be reported.
            yield AirbyteTracedException.from_exception(
                exception, stream_descriptor=stream_descriptor
            ).as_airbyte_message()

    def _flag_exception(self, stream_name: str, exception: Exception) -> None:
        """Remember `exception` as one of the failures of `stream_name`."""
        self._exceptions_per_stream_name.setdefault(stream_name, []).append(exception)

    def start_next_partition_generator(self) -> Optional[AirbyteMessage]:
        """
        Start the next partition generator.
        1. Pop the next stream to read from
        2. Submit the partition generator to the thread pool manager
        3. Add the stream to the list of streams currently generating partitions
        4. Return a STARTED stream status message, or None if no stream is left to start
        """
        if self._stream_instances_to_start_partition_generation:
            stream = self._stream_instances_to_start_partition_generation.pop(0)
            self._thread_pool_manager.submit(self._partition_enqueuer.generate_partitions, stream)
            self._streams_currently_generating_partitions.append(stream.name)
            self._logger.info(f"Marking stream {stream.name} as STARTED")
            self._logger.info(f"Syncing stream: {stream.name} ")
            return stream_status_as_airbyte_message(
                stream.as_airbyte_stream(),
                AirbyteStreamStatus.STARTED,
            )
        else:
            return None

    def is_done(self) -> bool:
        """
        This method is called to check if the sync is done.
        The sync is done when every stream has been marked as done, i.e. for each stream:
        1. Its partition generation has completed
        2. All of its partitions have been closed
        :return: True when every stream is done
        :raises AirbyteTracedException: once every stream is done, if any stream reported an exception
        """
        is_done = all(
            self._is_stream_done(stream_name) for stream_name in self._stream_name_to_instance
        )
        if is_done and self._exceptions_per_stream_name:
            error_message = generate_failed_streams_error_message(self._exceptions_per_stream_name)
            self._logger.info(error_message)
            # We still raise at least one exception when a stream raises an exception because the platform currently relies
            # on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error
            # type because this combined error isn't actionable, but rather the previously emitted individual errors.
            raise AirbyteTracedException(
                message=error_message,
                internal_message="Concurrent read failure",
                failure_type=FailureType.config_error,
            )
        return is_done

    def _is_stream_done(self, stream_name: str) -> bool:
        """Return whether `stream_name` has already been marked as done."""
        return stream_name in self._streams_done

    def _on_stream_is_done(self, stream_name: str) -> Iterable[AirbyteMessage]:
        """
        Mark `stream_name` as done.
        Logs the record count, makes the cursor emit at least one state, flushes the message
        repository, then yields an INCOMPLETE status if the stream reported exceptions and a
        COMPLETE status otherwise.
        """
        self._logger.info(
            f"Read {self._record_counter[stream_name]} records from {stream_name} stream"
        )
        self._logger.info(f"Marking stream {stream_name} as STOPPED")
        stream = self._stream_name_to_instance[stream_name]
        stream.cursor.ensure_at_least_one_state_emitted()
        yield from self._message_repository.consume_queue()
        self._logger.info(f"Finished syncing {stream.name}")
        self._streams_done.add(stream_name)
        stream_status = (
            AirbyteStreamStatus.INCOMPLETE
            if self._exceptions_per_stream_name.get(stream_name, [])
            else AirbyteStreamStatus.COMPLETE
        )
        yield stream_status_as_airbyte_message(stream.as_airbyte_stream(), stream_status)
ConcurrentReadProcessor( stream_instances_to_read_from: List[airbyte_cdk.sources.streams.concurrent.abstract_stream.AbstractStream], partition_enqueuer: airbyte_cdk.sources.streams.concurrent.partition_enqueuer.PartitionEnqueuer, thread_pool_manager: airbyte_cdk.sources.concurrent_source.thread_pool_manager.ThreadPoolManager, logger: logging.Logger, slice_logger: airbyte_cdk.sources.utils.slice_logger.SliceLogger, message_repository: airbyte_cdk.MessageRepository, partition_reader: airbyte_cdk.sources.streams.concurrent.partition_reader.PartitionReader)
def __init__(
    self,
    stream_instances_to_read_from: List[AbstractStream],
    partition_enqueuer: PartitionEnqueuer,
    thread_pool_manager: ThreadPoolManager,
    logger: logging.Logger,
    slice_logger: SliceLogger,
    message_repository: MessageRepository,
    partition_reader: PartitionReader,
):
    """
    This class is responsible for handling items from a concurrent stream read process.
    :param stream_instances_to_read_from: List of streams to read from. The list is copied, so
        starting streams does not consume the caller's list.
    :param partition_enqueuer: PartitionEnqueuer instance
    :param thread_pool_manager: ThreadPoolManager instance
    :param logger: Logger instance
    :param slice_logger: SliceLogger instance
    :param message_repository: MessageRepository instance
    :param partition_reader: PartitionReader instance
    """
    self._stream_name_to_instance = {s.name: s for s in stream_instances_to_read_from}
    # Number of RECORD messages emitted per stream; 0 means RUNNING has not been emitted yet.
    self._record_counter: Dict[str, int] = {}
    self._streams_to_running_partitions: Dict[str, Set[Partition]] = {}
    for stream in stream_instances_to_read_from:
        self._streams_to_running_partitions[stream.name] = set()
        self._record_counter[stream.name] = 0
    self._thread_pool_manager = thread_pool_manager
    self._partition_enqueuer = partition_enqueuer
    # Copy the list: streams are later popped from it to be started, which would otherwise
    # silently empty the list owned by the caller.
    self._stream_instances_to_start_partition_generation = list(stream_instances_to_read_from)
    self._streams_currently_generating_partitions: List[str] = []
    self._logger = logger
    self._slice_logger = slice_logger
    self._message_repository = message_repository
    self._partition_reader = partition_reader
    self._streams_done: Set[str] = set()
    self._exceptions_per_stream_name: dict[str, List[Exception]] = {}
This class is responsible for handling items from a concurrent stream read process.
Parameters
- stream_instances_to_read_from: List of streams to read from
- partition_enqueuer: PartitionEnqueuer instance
- thread_pool_manager: ThreadPoolManager instance
- logger: Logger instance
- slice_logger: SliceLogger instance
- message_repository: MessageRepository instance
- partition_reader: PartitionReader instance
def
on_partition_generation_completed( self, sentinel: airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel.PartitionGenerationCompletedSentinel) -> Iterable[airbyte_cdk.AirbyteMessage]:
def on_partition_generation_completed(
    self, sentinel: PartitionGenerationCompletedSentinel
) -> Iterable[AirbyteMessage]:
    """
    This method is called when a partition generation is completed.
    1. Remove the stream from the list of streams currently generating partitions
    2. If the stream is done, mark it as such and yield its stream status messages
    3. If there are more streams to read from, start the next partition generator and yield
       its STARTED status message
    """
    stream_name = sentinel.stream.name
    self._streams_currently_generating_partitions.remove(stream_name)
    # It is possible for the stream to already be done if no partitions were generated
    # If the partition generation process was completed and there are no partitions left to process, the stream is done
    if (
        self._is_stream_done(stream_name)
        or len(self._streams_to_running_partitions[stream_name]) == 0
    ):
        yield from self._on_stream_is_done(stream_name)
    # start_next_partition_generator returns None when no stream is left to start;
    # only yield real messages.
    next_stream_status = self.start_next_partition_generator()
    if next_stream_status is not None:
        yield next_stream_status
This method is called when a partition generation is completed.
- Remove the stream from the list of streams currently generating partitions
- If the stream is done, mark it as such and return a stream status message
- If there are more streams to read from, start the next partition generator
def
on_partition( self, partition: airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition) -> None:
def on_partition(self, partition: Partition) -> None:
    """
    Register a freshly generated partition and schedule it for reading.

    The partition is tracked as running for its stream. A slice log message is emitted when the
    slice logger asks for one. Finally, the partition reader is submitted to the thread pool with
    the partition and the owning stream's cursor.
    """
    owning_stream = partition.stream_name()
    self._streams_to_running_partitions[owning_stream].add(partition)
    stream_cursor = self._stream_name_to_instance[owning_stream].cursor

    if self._slice_logger.should_log_slice_message(self._logger):
        slice_message = self._slice_logger.create_slice_log_message(partition.to_slice())
        self._message_repository.emit_message(slice_message)

    reader_task = self._partition_reader.process_partition
    self._thread_pool_manager.submit(reader_task, partition, stream_cursor)
This method is called when a partition is generated.
- Add the partition to the set of partitions for the stream
- Log the slice if necessary
- Submit the partition to the thread pool manager
def
on_partition_complete_sentinel( self, sentinel: airbyte_cdk.sources.streams.concurrent.partitions.types.PartitionCompleteSentinel) -> Iterable[airbyte_cdk.AirbyteMessage]:
def on_partition_complete_sentinel(
    self, sentinel: PartitionCompleteSentinel
) -> Iterable[AirbyteMessage]:
    """
    Handle the end of a partition read.

    If the partition was still tracked as running, it is removed. When it was the stream's last
    running partition and the stream is no longer generating partitions, the stream's done
    messages are yielded. Queued repository messages are always flushed afterwards.
    """
    completed_partition = sentinel.partition
    owning_stream = completed_partition.stream_name()
    still_running = self._streams_to_running_partitions[owning_stream]

    if completed_partition in still_running:
        still_running.remove(completed_partition)
        generation_finished = owning_stream not in self._streams_currently_generating_partitions
        # Last partition of a stream whose generation is over: the stream is done.
        if generation_finished and not still_running:
            yield from self._on_stream_is_done(owning_stream)

    yield from self._message_repository.consume_queue()
This method is called when a partition is completed.
- Close the partition
- If the stream is done, mark it as such and return a stream status message
- Emit messages that were added to the message repository
def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
    """
    This method is called when a record is read from a partition.
    1. Convert the record to an AirbyteMessage
    2. If the message is a RECORD and the first one for the stream, mark the stream as RUNNING
    3. If the message is a RECORD, increment the record counter for the stream
    4. Emit the message (non-RECORD messages are forwarded unchanged)
    5. Emit messages that were added to the message repository
    """
    # Do not pass a transformer or a schema
    # AbstractStreams are expected to return data as they are expected.
    # Any transformation on the data should be done before reaching this point
    message = stream_data_to_airbyte_message(
        stream_name=record.stream_name,
        data_or_message=record.data,
        file_reference=record.file_reference,
    )
    stream = self._stream_name_to_instance[record.stream_name]

    if message.type == MessageType.RECORD:
        if self._record_counter[stream.name] == 0:
            self._logger.info(f"Marking stream {stream.name} as RUNNING")
            yield stream_status_as_airbyte_message(
                stream.as_airbyte_stream(), AirbyteStreamStatus.RUNNING
            )
        self._record_counter[stream.name] += 1
    yield message
    yield from self._message_repository.consume_queue()
This method is called when a record is read from a partition.
- Convert the record to an AirbyteMessage
- If this is the first record for the stream, mark the stream as RUNNING
- Increment the record counter for the stream
- Does not update the cursor itself (the cursor is handed to the partition reader when the partition is submitted)
- Emit the message
- Emit messages that were added to the message repository
def
on_exception( self, exception: airbyte_cdk.sources.concurrent_source.stream_thread_exception.StreamThreadException) -> Iterable[airbyte_cdk.AirbyteMessage]:
def on_exception(self, exception: StreamThreadException) -> Iterable[AirbyteMessage]:
    """
    This method is called when a worker thread reports an exception for a stream.
    1. Record the exception against the stream (the stream will later be reported as
       INCOMPLETE and the final `is_done` check will raise)
    2. Log the exception
    3. Yield an error trace message for the stream
    This method neither stops other streams nor re-raises the exception.
    """
    self._flag_exception(exception.stream_name, exception.exception)
    self._logger.exception(
        f"Exception while syncing stream {exception.stream_name}", exc_info=exception.exception
    )

    stream_descriptor = StreamDescriptor(name=exception.stream_name)
    if isinstance(exception.exception, AirbyteTracedException):
        yield exception.exception.as_airbyte_message(stream_descriptor=stream_descriptor)
    else:
        # NOTE(review): the StreamThreadException wrapper (not exception.exception) is
        # converted here; confirm its message / stack trace is what should be reported.
        yield AirbyteTracedException.from_exception(
            exception, stream_descriptor=stream_descriptor
        ).as_airbyte_message()
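This method is called when a worker thread reports an exception for a stream.
- Record the exception against the stream and log it
- Yield an error trace message for the stream (the exception is not re-raised here and other streams are not stopped)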
def start_next_partition_generator(self) -> Optional[AirbyteMessage]:
    """
    Kick off partition generation for the next pending stream.

    Returns None when no stream is waiting to be started. Otherwise the first pending stream is
    removed from the pending list, its partition generation is submitted to the thread pool, the
    stream is recorded as currently generating partitions, and a STARTED status message is
    returned for it.
    """
    pending_streams = self._stream_instances_to_start_partition_generation
    if not pending_streams:
        return None

    stream = pending_streams.pop(0)
    self._thread_pool_manager.submit(self._partition_enqueuer.generate_partitions, stream)
    self._streams_currently_generating_partitions.append(stream.name)
    self._logger.info(f"Marking stream {stream.name} as STARTED")
    self._logger.info(f"Syncing stream: {stream.name} ")
    started_status = AirbyteStreamStatus.STARTED
    return stream_status_as_airbyte_message(stream.as_airbyte_stream(), started_status)
Start the next partition generator.
- Pop the next stream to read from
- Submit the partition generator to the thread pool manager
- Add the stream to the list of streams currently generating partitions
- Return a stream status message
def
is_done(self) -> bool:
def is_done(self) -> bool:
    """
    This method is called to check if the sync is done.
    The sync is done when every stream has been marked as done, i.e. for each stream:
    1. Its partition generation has completed
    2. All of its partitions have been closed
    :return: True when every stream is done
    :raises AirbyteTracedException: once every stream is done, if any stream reported an exception
    """
    # Generator instead of a list: all() can stop at the first stream that is not done.
    is_done = all(
        self._is_stream_done(stream_name) for stream_name in self._stream_name_to_instance
    )
    if is_done and self._exceptions_per_stream_name:
        error_message = generate_failed_streams_error_message(self._exceptions_per_stream_name)
        self._logger.info(error_message)
        # We still raise at least one exception when a stream raises an exception because the platform currently relies
        # on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error
        # type because this combined error isn't actionable, but rather the previously emitted individual errors.
        raise AirbyteTracedException(
            message=error_message,
            internal_message="Concurrent read failure",
            failure_type=FailureType.config_error,
        )
    return is_done
This method is called to check if the sync is done. The sync is done when:
- There are no more streams generating partitions
- There are no more streams to read from
- All partitions for all streams are closed (once this holds, an AirbyteTracedException is raised if any stream reported an exception)