airbyte_cdk.sources.concurrent_source.concurrent_source
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import concurrent
import logging
from queue import Queue
from typing import Iterable, Iterator, List, Optional

from airbyte_cdk.models import AirbyteMessage
from airbyte_cdk.sources.concurrent_source.concurrent_read_processor import ConcurrentReadProcessor
from airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel import (
    PartitionGenerationCompletedSentinel,
)
from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException
from airbyte_cdk.sources.concurrent_source.thread_pool_manager import ThreadPoolManager
from airbyte_cdk.sources.message import InMemoryMessageRepository, MessageRepository
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.concurrent.partition_enqueuer import PartitionEnqueuer
from airbyte_cdk.sources.streams.concurrent.partition_reader import PartitionLogger, PartitionReader
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.types import (
    PartitionCompleteSentinel,
    QueueItem,
)
from airbyte_cdk.sources.types import Record
from airbyte_cdk.sources.utils.slice_logger import DebugSliceLogger, SliceLogger


class ConcurrentSource:
    """
    A Source that reads data from multiple AbstractStreams concurrently.
    It does so by submitting partition generation and partition read tasks to a thread pool.
    The tasks asynchronously add their output to a shared queue.
    The read is done when all partitions for all streams were generated and read.
    """

    DEFAULT_TIMEOUT_SECONDS = 900

    @staticmethod
    def create(
        num_workers: int,
        initial_number_of_partitions_to_generate: int,
        logger: logging.Logger,
        slice_logger: SliceLogger,
        message_repository: MessageRepository,
        queue: Optional[Queue[QueueItem]] = None,
        timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
    ) -> "ConcurrentSource":
        is_single_threaded = initial_number_of_partitions_to_generate == 1 and num_workers == 1
        too_many_generator = (
            not is_single_threaded and initial_number_of_partitions_to_generate >= num_workers
        )
        assert not too_many_generator, (
            "It is required to have more workers than threads generating partitions"
        )
        threadpool = ThreadPoolManager(
            concurrent.futures.ThreadPoolExecutor(
                max_workers=num_workers, thread_name_prefix="workerpool"
            ),
            logger,
        )
        return ConcurrentSource(
            threadpool=threadpool,
            logger=logger,
            slice_logger=slice_logger,
            queue=queue,
            message_repository=message_repository,
            initial_number_partitions_to_generate=initial_number_of_partitions_to_generate,
            timeout_seconds=timeout_seconds,
        )

    def __init__(
        self,
        threadpool: ThreadPoolManager,
        logger: logging.Logger,
        slice_logger: SliceLogger = DebugSliceLogger(),
        queue: Optional[Queue[QueueItem]] = None,
        message_repository: MessageRepository = InMemoryMessageRepository(),
        initial_number_partitions_to_generate: int = 1,
        timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
    ) -> None:
        """
        :param threadpool: The threadpool to submit tasks to
        :param logger: The logger to log to
        :param slice_logger: The slice logger used to create messages on new slices
        :param message_repository: The repository to emit messages to
        :param initial_number_partitions_to_generate: The initial number of concurrent partition generation tasks. Limiting this number limits the latency of the first records emitted. While latency is not critical, emitting records early allows the platform and the destination to process them as early as possible.
        :param timeout_seconds: The maximum number of seconds to wait for a record to be read from the queue. If no record is read within this time, the source will stop reading and return.
        """
        self._threadpool = threadpool
        self._logger = logger
        self._slice_logger = slice_logger
        self._message_repository = message_repository
        self._initial_number_partitions_to_generate = initial_number_partitions_to_generate
        self._timeout_seconds = timeout_seconds

        # We set a maxsize so that the main thread can process record items when the queue size grows. This assumes that there are fewer
        # threads generating partitions than the max number of workers. If that weren't the case, we could have threads only generating
        # partitions, which would fill the queue. This number is arbitrarily set to 10_000 but will probably need to be changed given more
        # information and might even need to be configurable depending on the source
        self._queue = queue or Queue(maxsize=10_000)

    def read(
        self,
        streams: List[AbstractStream],
    ) -> Iterator[AirbyteMessage]:
        self._logger.info("Starting syncing")
        concurrent_stream_processor = ConcurrentReadProcessor(
            streams,
            PartitionEnqueuer(self._queue, self._threadpool),
            self._threadpool,
            self._logger,
            self._slice_logger,
            self._message_repository,
            PartitionReader(
                self._queue,
                PartitionLogger(self._slice_logger, self._logger, self._message_repository),
            ),
        )

        # Enqueue initial partition generation tasks
        yield from self._submit_initial_partition_generators(concurrent_stream_processor)

        # Read from the queue until all partitions were generated and read
        yield from self._consume_from_queue(
            self._queue,
            concurrent_stream_processor,
        )
        self._threadpool.check_for_errors_and_shutdown()
        self._logger.info("Finished syncing")

    def _submit_initial_partition_generators(
        self, concurrent_stream_processor: ConcurrentReadProcessor
    ) -> Iterable[AirbyteMessage]:
        for _ in range(self._initial_number_partitions_to_generate):
            status_message = concurrent_stream_processor.start_next_partition_generator()
            if status_message:
                yield status_message

    def _consume_from_queue(
        self,
        queue: Queue[QueueItem],
        concurrent_stream_processor: ConcurrentReadProcessor,
    ) -> Iterable[AirbyteMessage]:
        while airbyte_message_or_record_or_exception := queue.get():
            yield from self._handle_item(
                airbyte_message_or_record_or_exception,
                concurrent_stream_processor,
            )
            # In the event that a partition raises an exception, anything remaining in
            # the queue will be missed because is_done() can raise an exception and exit
            # out of this loop before remaining items are consumed
            if queue.empty() and concurrent_stream_processor.is_done():
                # all partitions were generated and processed. we're done here
                break

    def _handle_item(
        self,
        queue_item: QueueItem,
        concurrent_stream_processor: ConcurrentReadProcessor,
    ) -> Iterable[AirbyteMessage]:
        # dispatch the queue item to the appropriate handler based on its type
        if isinstance(queue_item, StreamThreadException):
            yield from concurrent_stream_processor.on_exception(queue_item)
        elif isinstance(queue_item, PartitionGenerationCompletedSentinel):
            yield from concurrent_stream_processor.on_partition_generation_completed(queue_item)
        elif isinstance(queue_item, Partition):
            concurrent_stream_processor.on_partition(queue_item)
        elif isinstance(queue_item, PartitionCompleteSentinel):
            yield from concurrent_stream_processor.on_partition_complete_sentinel(queue_item)
        elif isinstance(queue_item, Record):
            yield from concurrent_stream_processor.on_record(queue_item)
        elif isinstance(queue_item, AirbyteMessage):
            yield queue_item
        else:
            raise ValueError(f"Unknown queue item type: {type(queue_item)}")
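The dispatch at the end of _handle_item is the consumer half of a sentinel-terminated producer/consumer queue: worker threads push records and completion sentinels onto a shared bounded queue, and the main thread drains it, routing each item by type and stopping once every producer has signaled completion. Below is a minimal, self-contained sketch of that pattern; DoneSentinel, producer, and consume are hypothetical stand-ins for the CDK's sentinels, partition readers, and _consume_from_queue, not CDK APIs.

import queue
import threading
from dataclasses import dataclass


@dataclass
class DoneSentinel:
    # Hypothetical stand-in for the CDK's completion sentinels.
    producer_id: int


def producer(q: queue.Queue, producer_id: int) -> None:
    # A worker pushes its output onto the shared queue, then a sentinel.
    for i in range(3):
        q.put(f"record-{producer_id}-{i}")
    q.put(DoneSentinel(producer_id))


def consume(q: queue.Queue, num_producers: int):
    # The main thread drains the queue, dispatching on item type, and
    # stops once every producer has reported completion.
    remaining = num_producers
    while remaining:
        item = q.get()
        if isinstance(item, DoneSentinel):
            remaining -= 1
        elif isinstance(item, str):
            yield item
        else:
            raise ValueError(f"Unknown queue item type: {type(item)}")


q = queue.Queue(maxsize=100)
workers = [threading.Thread(target=producer, args=(q, i)) for i in range(2)]
for w in workers:
    w.start()
for record in consume(q, num_producers=2):
    print(record)
for w in workers:
    w.join()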
class ConcurrentSource:
A Source that reads data from multiple AbstractStreams concurrently. It does so by submitting partition generation and partition read tasks to a thread pool. The tasks asynchronously add their output to a shared queue. The read is done when all partitions for all streams were generated and read.
ConcurrentSource(
    threadpool: airbyte_cdk.sources.concurrent_source.thread_pool_manager.ThreadPoolManager,
    logger: logging.Logger,
    slice_logger: airbyte_cdk.sources.utils.slice_logger.SliceLogger = <airbyte_cdk.sources.utils.slice_logger.DebugSliceLogger object>,
    queue: Optional[queue.Queue[Union[airbyte_cdk.Record, airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition, airbyte_cdk.sources.streams.concurrent.partitions.types.PartitionCompleteSentinel, airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel.PartitionGenerationCompletedSentinel, Exception, airbyte_cdk.AirbyteMessage]]] = None,
    message_repository: airbyte_cdk.MessageRepository = <airbyte_cdk.InMemoryMessageRepository object>,
    initial_number_partitions_to_generate: int = 1,
    timeout_seconds: int = 900,
)
def __init__(
    self,
    threadpool: ThreadPoolManager,
    logger: logging.Logger,
    slice_logger: SliceLogger = DebugSliceLogger(),
    queue: Optional[Queue[QueueItem]] = None,
    message_repository: MessageRepository = InMemoryMessageRepository(),
    initial_number_partitions_to_generate: int = 1,
    timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
) -> None:
    """
    :param threadpool: The threadpool to submit tasks to
    :param logger: The logger to log to
    :param slice_logger: The slice logger used to create messages on new slices
    :param message_repository: The repository to emit messages to
    :param initial_number_partitions_to_generate: The initial number of concurrent partition generation tasks. Limiting this number limits the latency of the first records emitted. While latency is not critical, emitting records early allows the platform and the destination to process them as early as possible.
    :param timeout_seconds: The maximum number of seconds to wait for a record to be read from the queue. If no record is read within this time, the source will stop reading and return.
    """
    self._threadpool = threadpool
    self._logger = logger
    self._slice_logger = slice_logger
    self._message_repository = message_repository
    self._initial_number_partitions_to_generate = initial_number_partitions_to_generate
    self._timeout_seconds = timeout_seconds

    # We set a maxsize so that the main thread can process record items when the queue size grows. This assumes that there are fewer
    # threads generating partitions than the max number of workers. If that weren't the case, we could have threads only generating
    # partitions, which would fill the queue. This number is arbitrarily set to 10_000 but will probably need to be changed given more
    # information and might even need to be configurable depending on the source
    self._queue = queue or Queue(maxsize=10_000)
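The bound on the queue is what provides backpressure: Queue.put() blocks once maxsize items are waiting, which pauses the worker threads until the main thread drains the queue. A tiny sketch of that blocking behavior, with a deliberately small bound (the names here are illustrative, not CDK APIs):

import queue
import threading
import time

q = queue.Queue(maxsize=2)  # tiny bound so the blocking is visible


def fast_producer() -> None:
    for i in range(5):
        q.put(i)  # blocks whenever 2 items are already waiting
        print(f"produced {i}")


t = threading.Thread(target=fast_producer)
t.start()
time.sleep(0.5)  # by now the producer has filled the queue and is blocked
for _ in range(5):
    print(f"consumed {q.get()}")  # each get() frees a slot, unblocking the producer
    time.sleep(0.1)
t.join()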
Parameters
- threadpool: The threadpool to submit tasks to
- logger: The logger to log to
- slice_logger: The slice logger used to create messages on new slices
- message_repository: The repository to emit messages to
- initial_number_partitions_to_generate: The initial number of concurrent partition generation tasks. Limiting this number limits the latency of the first records emitted. While latency is not critical, emitting records early allows the platform and the destination to process them as early as possible.
- timeout_seconds: The maximum number of seconds to wait for a record to be read from the queue. If no record is read within this time, the source will stop reading and return.
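For illustration, a direct construction mirroring these parameters might look like the sketch below. This assumes the caller wants to manage the thread pool explicitly; the create() factory documented next builds the ThreadPoolManager for you and also validates the worker/generator ratio.

import concurrent.futures
import logging

from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource
from airbyte_cdk.sources.concurrent_source.thread_pool_manager import ThreadPoolManager
from airbyte_cdk.sources.message import InMemoryMessageRepository
from airbyte_cdk.sources.utils.slice_logger import DebugSliceLogger

logger = logging.getLogger("airbyte")
source = ConcurrentSource(
    threadpool=ThreadPoolManager(
        concurrent.futures.ThreadPoolExecutor(max_workers=4, thread_name_prefix="workerpool"),
        logger,
    ),
    logger=logger,
    slice_logger=DebugSliceLogger(),
    message_repository=InMemoryMessageRepository(),
    initial_number_partitions_to_generate=2,  # keep below the number of workers
    timeout_seconds=900,
)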
@staticmethod
def create(
    num_workers: int,
    initial_number_of_partitions_to_generate: int,
    logger: logging.Logger,
    slice_logger: airbyte_cdk.sources.utils.slice_logger.SliceLogger,
    message_repository: airbyte_cdk.MessageRepository,
    queue: Optional[queue.Queue[Union[airbyte_cdk.Record, airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition, airbyte_cdk.sources.streams.concurrent.partitions.types.PartitionCompleteSentinel, airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel.PartitionGenerationCompletedSentinel, Exception, airbyte_cdk.AirbyteMessage]]] = None,
    timeout_seconds: int = 900,
) -> ConcurrentSource:
@staticmethod
def create(
    num_workers: int,
    initial_number_of_partitions_to_generate: int,
    logger: logging.Logger,
    slice_logger: SliceLogger,
    message_repository: MessageRepository,
    queue: Optional[Queue[QueueItem]] = None,
    timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
) -> "ConcurrentSource":
    is_single_threaded = initial_number_of_partitions_to_generate == 1 and num_workers == 1
    too_many_generator = (
        not is_single_threaded and initial_number_of_partitions_to_generate >= num_workers
    )
    assert not too_many_generator, (
        "It is required to have more workers than threads generating partitions"
    )
    threadpool = ThreadPoolManager(
        concurrent.futures.ThreadPoolExecutor(
            max_workers=num_workers, thread_name_prefix="workerpool"
        ),
        logger,
    )
    return ConcurrentSource(
        threadpool=threadpool,
        logger=logger,
        slice_logger=slice_logger,
        queue=queue,
        message_repository=message_repository,
        initial_number_partitions_to_generate=initial_number_of_partitions_to_generate,
        timeout_seconds=timeout_seconds,
    )
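The assertion above encodes a scheduling invariant: in any multi-threaded configuration, partition generators must be strictly outnumbered by workers, or every worker could be busy generating partitions while nothing drains the queue. A sketch of which configurations pass, assuming the imports shown in the module source:

import logging

from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource
from airbyte_cdk.sources.message import InMemoryMessageRepository
from airbyte_cdk.sources.utils.slice_logger import DebugSliceLogger

logger = logging.getLogger("airbyte")
repo = InMemoryMessageRepository()

# Valid: generators (2) stay strictly below workers (4), so some workers
# are always free to read the partitions that the generators enqueue.
ConcurrentSource.create(4, 2, logger, DebugSliceLogger(), repo)

# Valid: the fully single-threaded special case (1 worker, 1 generator) is allowed.
ConcurrentSource.create(1, 1, logger, DebugSliceLogger(), repo)

# Invalid: generators >= workers in a multi-threaded setup.
try:
    ConcurrentSource.create(2, 2, logger, DebugSliceLogger(), repo)
except AssertionError as error:
    print(error)  # "It is required to have more workers than threads generating partitions"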
def read(
    self,
    streams: List[airbyte_cdk.sources.streams.concurrent.abstract_stream.AbstractStream],
) -> Iterator[airbyte_cdk.AirbyteMessage]:
def read(
    self,
    streams: List[AbstractStream],
) -> Iterator[AirbyteMessage]:
    self._logger.info("Starting syncing")
    concurrent_stream_processor = ConcurrentReadProcessor(
        streams,
        PartitionEnqueuer(self._queue, self._threadpool),
        self._threadpool,
        self._logger,
        self._slice_logger,
        self._message_repository,
        PartitionReader(
            self._queue,
            PartitionLogger(self._slice_logger, self._logger, self._message_repository),
        ),
    )

    # Enqueue initial partition generation tasks
    yield from self._submit_initial_partition_generators(concurrent_stream_processor)

    # Read from the queue until all partitions were generated and read
    yield from self._consume_from_queue(
        self._queue,
        concurrent_stream_processor,
    )
    self._threadpool.check_for_errors_and_shutdown()
    self._logger.info("Finished syncing")
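A minimal end-to-end consumption loop might look like the sketch below, where streams is assumed to be a list of concrete AbstractStream implementations supplied by a connector (building those is outside this module):

import logging
from typing import List

from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource
from airbyte_cdk.sources.message import InMemoryMessageRepository
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.utils.slice_logger import DebugSliceLogger


def sync(streams: List[AbstractStream]) -> None:
    source = ConcurrentSource.create(
        num_workers=4,
        initial_number_of_partitions_to_generate=2,
        logger=logging.getLogger("airbyte"),
        slice_logger=DebugSliceLogger(),
        message_repository=InMemoryMessageRepository(),
    )
    # read() yields AirbyteMessages as worker threads drain partitions,
    # so records from different streams may arrive interleaved.
    for message in source.read(streams):
        print(message.type)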