airbyte_cdk.sources.streams.concurrent.partition_reader
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

import logging
from queue import Queue
from typing import Optional

from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException
from airbyte_cdk.sources.message.repository import MessageRepository
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.types import (
    PartitionCompleteSentinel,
    QueueItem,
)
from airbyte_cdk.sources.utils.slice_logger import SliceLogger


# NOTE: Since the connector builder workflow moved to the concurrent CDK, which requires
# log messages to be grouped in the correct order on the main write thread using the
# ConcurrentMessageRepository, this separate flow and class that was used to log slices
# onto this partition's message_repository should just be replaced by emitting messages
# directly onto the repository instead of going through an intermediary.
class PartitionLogger:
    """
    Helper that forwards a partition's slice log message to a message repository.
    """

    def __init__(
        self,
        slice_logger: SliceLogger,
        logger: logging.Logger,
        message_repository: MessageRepository,
    ):
        """
        :param slice_logger: Decides whether a slice message is logged and builds that message.
        :param logger: The logger handed to ``slice_logger.should_log_slice_message``.
        :param message_repository: The repository the slice log message is emitted on.
        """
        self._message_repository = message_repository
        self._logger = logger
        self._slice_logger = slice_logger

    def log(self, partition: Partition) -> None:
        """
        Emit a slice log message for the partition if the slice logger asks for one.

        :param partition: The partition whose slice (``partition.to_slice()``) is logged
        :return: None
        """
        if not self._slice_logger.should_log_slice_message(self._logger):
            return

        slice_message = self._slice_logger.create_slice_log_message(partition.to_slice())
        self._message_repository.emit_message(slice_message)


class PartitionReader:
    """
    Reads the records of a partition and puts them in a queue.
    """

    # Success flag given to PartitionCompleteSentinel; its negation is used on failure.
    _IS_SUCCESSFUL = True

    def __init__(
        self,
        queue: Queue[QueueItem],
        partition_logger: Optional[PartitionLogger] = None,
    ) -> None:
        """
        :param queue: The queue to put the records in.
        :param partition_logger: Optional logger; when set, ``process_partition`` calls its
            ``log`` method with the partition before reading it.
        """
        self._partition_logger = partition_logger
        self._queue = queue

    def process_partition(self, partition: Partition, cursor: Cursor) -> None:
        """
        Read every record of a partition and put each one in the output queue.

        Each record is put in the queue and then passed to ``cursor.observe``. Once the
        partition is exhausted, ``cursor.close_partition`` is called and a
        ``PartitionCompleteSentinel`` flagged as successful is put in the queue.

        If an exception is encountered, it is caught and put in the queue wrapped in a
        ``StreamThreadException``, followed by a ``PartitionCompleteSentinel`` flagged as
        unsuccessful. This is very important because if we don't, the main thread will have
        no way to know that something went wrong and will wait until the timeout is reached.

        This method is meant to be called from a thread.
        :param partition: The partition to read data from
        :param cursor: The cursor that observes each record and closes the partition
        :return: None
        """
        output_queue = self._queue
        slice_logger = self._partition_logger
        try:
            if slice_logger:
                slice_logger.log(partition)

            for item in partition.read():
                output_queue.put(item)
                cursor.observe(item)
            cursor.close_partition(partition)
            output_queue.put(PartitionCompleteSentinel(partition, self._IS_SUCCESSFUL))
        except Exception as error:
            # Everything above stays inside the try so any failure reaches the main thread.
            stream_name = partition.stream_name()
            output_queue.put(StreamThreadException(error, stream_name))
            output_queue.put(PartitionCompleteSentinel(partition, not self._IS_SUCCESSFUL))
class
PartitionLogger:
class PartitionLogger:
    """
    Helper that forwards a partition's slice log message to a message repository.
    """

    def __init__(
        self,
        slice_logger: SliceLogger,
        logger: logging.Logger,
        message_repository: MessageRepository,
    ):
        """
        :param slice_logger: Decides whether a slice message is logged and builds that message.
        :param logger: The logger handed to ``slice_logger.should_log_slice_message``.
        :param message_repository: The repository the slice log message is emitted on.
        """
        self._message_repository = message_repository
        self._logger = logger
        self._slice_logger = slice_logger

    def log(self, partition: Partition) -> None:
        """
        Emit a slice log message for the partition if the slice logger asks for one.

        :param partition: The partition whose slice (``partition.to_slice()``) is logged
        :return: None
        """
        if not self._slice_logger.should_log_slice_message(self._logger):
            return

        slice_message = self._slice_logger.create_slice_log_message(partition.to_slice())
        self._message_repository.emit_message(slice_message)
Helper class that provides a mechanism for passing a log message onto the current partition's message repository.
PartitionLogger( slice_logger: airbyte_cdk.sources.utils.slice_logger.SliceLogger, logger: logging.Logger, message_repository: airbyte_cdk.MessageRepository)
def
log( self, partition: airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition) -> None:
class
PartitionReader:
class PartitionReader:
    """
    Reads the records of a partition and puts them in a queue.
    """

    # Success flag given to PartitionCompleteSentinel; its negation is used on failure.
    _IS_SUCCESSFUL = True

    def __init__(
        self,
        queue: Queue[QueueItem],
        partition_logger: Optional[PartitionLogger] = None,
    ) -> None:
        """
        :param queue: The queue to put the records in.
        :param partition_logger: Optional logger; when set, ``process_partition`` calls its
            ``log`` method with the partition before reading it.
        """
        self._partition_logger = partition_logger
        self._queue = queue

    def process_partition(self, partition: Partition, cursor: Cursor) -> None:
        """
        Read every record of a partition and put each one in the output queue.

        Each record is put in the queue and then passed to ``cursor.observe``. Once the
        partition is exhausted, ``cursor.close_partition`` is called and a
        ``PartitionCompleteSentinel`` flagged as successful is put in the queue.

        If an exception is encountered, it is caught and put in the queue wrapped in a
        ``StreamThreadException``, followed by a ``PartitionCompleteSentinel`` flagged as
        unsuccessful. This is very important because if we don't, the main thread will have
        no way to know that something went wrong and will wait until the timeout is reached.

        This method is meant to be called from a thread.
        :param partition: The partition to read data from
        :param cursor: The cursor that observes each record and closes the partition
        :return: None
        """
        output_queue = self._queue
        slice_logger = self._partition_logger
        try:
            if slice_logger:
                slice_logger.log(partition)

            for item in partition.read():
                output_queue.put(item)
                cursor.observe(item)
            cursor.close_partition(partition)
            output_queue.put(PartitionCompleteSentinel(partition, self._IS_SUCCESSFUL))
        except Exception as error:
            # Everything above stays inside the try so any failure reaches the main thread.
            stream_name = partition.stream_name()
            output_queue.put(StreamThreadException(error, stream_name))
            output_queue.put(PartitionCompleteSentinel(partition, not self._IS_SUCCESSFUL))
Generates records from a partition and puts them in a queue.
PartitionReader( queue: queue.Queue[typing.Union[airbyte_cdk.Record, airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition, airbyte_cdk.sources.streams.concurrent.partitions.types.PartitionCompleteSentinel, airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel.PartitionGenerationCompletedSentinel, Exception, airbyte_cdk.AirbyteMessage]], partition_logger: Optional[PartitionLogger] = None)
def __init__(
    self,
    queue: Queue[QueueItem],
    partition_logger: Optional[PartitionLogger] = None,
) -> None:
    """
    Store the output queue and the optional partition logger.

    :param queue: The queue to put the records in.
    :param partition_logger: Optional logger; when set, ``process_partition`` calls its
        ``log`` method with the partition before reading it.
    """
    self._partition_logger = partition_logger
    self._queue = queue
Parameters
- queue: The queue to put the records in.
def
process_partition( self, partition: airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition, cursor: airbyte_cdk.Cursor) -> None:
def process_partition(self, partition: Partition, cursor: Cursor) -> None:
    """
    Read every record of a partition and put each one in the output queue.

    Each record is put in the queue and then passed to ``cursor.observe``. Once the
    partition is exhausted, ``cursor.close_partition`` is called and a
    ``PartitionCompleteSentinel`` flagged as successful is put in the queue.

    If an exception is encountered, it is caught and put in the queue wrapped in a
    ``StreamThreadException``, followed by a ``PartitionCompleteSentinel`` flagged as
    unsuccessful. This is very important because if we don't, the main thread will have
    no way to know that something went wrong and will wait until the timeout is reached.

    This method is meant to be called from a thread.
    :param partition: The partition to read data from
    :param cursor: The cursor that observes each record and closes the partition
    :return: None
    """
    output_queue = self._queue
    slice_logger = self._partition_logger
    try:
        if slice_logger:
            slice_logger.log(partition)

        for item in partition.read():
            output_queue.put(item)
            cursor.observe(item)
        cursor.close_partition(partition)
        output_queue.put(PartitionCompleteSentinel(partition, self._IS_SUCCESSFUL))
    except Exception as error:
        # Everything above stays inside the try so any failure reaches the main thread.
        stream_name = partition.stream_name()
        output_queue.put(StreamThreadException(error, stream_name))
        output_queue.put(PartitionCompleteSentinel(partition, not self._IS_SUCCESSFUL))
Process a partition and put the records in the output queue. Once all of the partition's records have been added to the queue, a sentinel is added to the queue to indicate that the partition has been fully read.
If an exception is encountered, the exception will be caught and put in the queue. This is very important because if we don't, the main thread will have no way to know that something went wrong and will wait until the timeout is reached.
This method is meant to be called from a thread.
Parameters
- partition: The partition to read data from
Returns
None