airbyte_cdk.sources.concurrent_source.concurrent_source
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

# `import concurrent` alone does not load the `futures` submodule; importing it explicitly
# (which also binds the name `concurrent`) makes `concurrent.futures.ThreadPoolExecutor`
# available without relying on another module having imported it first.
import concurrent.futures
import logging
from queue import Queue
from typing import Iterable, Iterator, List, Optional

from airbyte_cdk.models import AirbyteMessage
from airbyte_cdk.sources.concurrent_source.concurrent_read_processor import ConcurrentReadProcessor
from airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel import (
    PartitionGenerationCompletedSentinel,
)
from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException
from airbyte_cdk.sources.concurrent_source.thread_pool_manager import ThreadPoolManager
from airbyte_cdk.sources.message import InMemoryMessageRepository, MessageRepository
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.concurrent.partition_enqueuer import PartitionEnqueuer
from airbyte_cdk.sources.streams.concurrent.partition_reader import PartitionLogger, PartitionReader
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.types import (
    PartitionCompleteSentinel,
    QueueItem,
)
from airbyte_cdk.sources.types import Record
from airbyte_cdk.sources.utils.slice_logger import DebugSliceLogger, SliceLogger


class ConcurrentSource:
    """
    A Source that reads data from multiple AbstractStreams concurrently.
    It does so by submitting partition generation, and partition read tasks to a thread pool.
    The tasks asynchronously add their output to a shared queue.
    The read is done when all partitions for all streams were generated and read.
    """

    DEFAULT_TIMEOUT_SECONDS = 900

    @staticmethod
    def create(
        num_workers: int,
        initial_number_of_partitions_to_generate: int,
        logger: logging.Logger,
        slice_logger: SliceLogger,
        message_repository: MessageRepository,
        queue: Optional[Queue[QueueItem]] = None,
        timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
    ) -> "ConcurrentSource":
        """
        Build a ConcurrentSource backed by a new thread pool of `num_workers` threads.

        :param num_workers: The `max_workers` of the ThreadPoolExecutor that is created
        :param initial_number_of_partitions_to_generate: The initial number of concurrent partition generation tasks. Must be >= 1 and
          strictly lower than `num_workers`, except for the single-threaded case where both are 1
        :param logger: The logger to log to
        :param slice_logger: The slice logger used to create messages on new slices
        :param message_repository: The repository to emit messages to
        :param queue: The queue shared by the worker threads and the main thread. A bounded queue is created when omitted
        :param timeout_seconds: Forwarded unchanged to the constructor
        :raises ValueError: If `initial_number_of_partitions_to_generate` is lower than 1
        :raises AssertionError: If there are not more workers than partition generators
        """
        if initial_number_of_partitions_to_generate < 1:
            raise ValueError(
                f"initial_number_of_partitions_to_generate must be >= 1, got {initial_number_of_partitions_to_generate}"
            )
        is_single_threaded = initial_number_of_partitions_to_generate == 1 and num_workers == 1
        too_many_generator = (
            not is_single_threaded and initial_number_of_partitions_to_generate >= num_workers
        )
        if too_many_generator:
            # Raised explicitly instead of through an `assert` statement so that the check is not
            # stripped under `python -O`. The exception type is kept for existing callers.
            raise AssertionError(
                "It is required to have more workers than threads generating partitions"
            )
        threadpool = ThreadPoolManager(
            concurrent.futures.ThreadPoolExecutor(
                max_workers=num_workers, thread_name_prefix="workerpool"
            ),
            logger,
        )
        return ConcurrentSource(
            threadpool=threadpool,
            logger=logger,
            slice_logger=slice_logger,
            queue=queue,
            message_repository=message_repository,
            initial_number_partitions_to_generate=initial_number_of_partitions_to_generate,
            timeout_seconds=timeout_seconds,
        )

    def __init__(
        self,
        threadpool: ThreadPoolManager,
        logger: logging.Logger,
        slice_logger: Optional[SliceLogger] = None,
        queue: Optional[Queue[QueueItem]] = None,
        message_repository: Optional[MessageRepository] = None,
        initial_number_partitions_to_generate: int = 1,
        timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
    ) -> None:
        """
        :param threadpool: The threadpool to submit tasks to
        :param logger: The logger to log to
        :param slice_logger: The slice logger used to create messages on new slices. Defaults to a new DebugSliceLogger
        :param queue: The queue the tasks write to and the main thread reads from. Defaults to a new queue bounded to 10_000 items
        :param message_repository: The repository to emit messages to. Defaults to a new InMemoryMessageRepository
        :param initial_number_partitions_to_generate: The initial number of concurrent partition generation tasks. Limiting this number limits the latency of the first records emitted. While the latency is not critical, emitting the records early allows the platform and the destination to process them as early as possible.
        :param timeout_seconds: Intended as the maximum number of seconds to wait for a record to be read from the queue. It is only stored on the instance: nothing in this class reads it, so reads from the queue block without a timeout.
        """
        self._threadpool = threadpool
        self._logger = logger
        # The defaults are created per instance. Instances used as default argument values are
        # built once, when the function is defined, and would be shared by every ConcurrentSource.
        self._slice_logger = slice_logger if slice_logger is not None else DebugSliceLogger()
        self._message_repository = (
            message_repository if message_repository is not None else InMemoryMessageRepository()
        )
        self._initial_number_partitions_to_generate = initial_number_partitions_to_generate
        self._timeout_seconds = timeout_seconds

        # We set a maxsize to force the main thread to process record items when the queue size grows. This assumes that there are fewer
        # threads generating partitions than the max number of workers. If it weren't the case, we could have threads only generating
        # partitions which would fill the queue. This number is arbitrarily set to 10_000 but will probably need to be changed given more
        # information and might even need to be configurable depending on the source
        self._queue = queue if queue is not None else Queue(maxsize=10_000)

    def read(
        self,
        streams: List[AbstractStream],
    ) -> Iterator[AirbyteMessage]:
        """
        Read the given streams concurrently and yield the resulting messages.

        :param streams: The streams to read
        :return: The messages produced while starting the partition generators and while draining the shared queue
        """
        self._logger.info("Starting syncing")
        concurrent_stream_processor = ConcurrentReadProcessor(
            streams,
            PartitionEnqueuer(self._queue, self._threadpool),
            self._threadpool,
            self._logger,
            self._slice_logger,
            self._message_repository,
            PartitionReader(
                self._queue,
                PartitionLogger(self._slice_logger, self._logger, self._message_repository),
            ),
            max_concurrent_partition_generators=self._initial_number_partitions_to_generate,
        )

        # Enqueue initial partition generation tasks
        yield from self._submit_initial_partition_generators(concurrent_stream_processor)

        # Read from the queue until all partitions were generated and read
        yield from self._consume_from_queue(
            self._queue,
            concurrent_stream_processor,
        )
        self._threadpool.check_for_errors_and_shutdown()
        self._logger.info("Finished syncing")

    def _submit_initial_partition_generators(
        self, concurrent_stream_processor: ConcurrentReadProcessor
    ) -> Iterable[AirbyteMessage]:
        """
        Ask the processor to start a partition generator once per initial generation slot.

        :param concurrent_stream_processor: The processor starting the partition generators
        :return: The truthy status messages returned by the processor
        """
        for _ in range(self._initial_number_partitions_to_generate):
            status_message = concurrent_stream_processor.start_next_partition_generator()
            if status_message:
                yield status_message

    def _consume_from_queue(
        self,
        queue: Queue[QueueItem],
        concurrent_stream_processor: ConcurrentReadProcessor,
    ) -> Iterable[AirbyteMessage]:
        """
        Block on the queue and handle its items until the queue is empty and the processor is done.

        :param queue: The queue to read from
        :param concurrent_stream_processor: The processor handling the items
        :return: The messages produced while handling the items
        """
        # Compare against None instead of relying on truthiness: a valid item that happens to be
        # falsy (e.g. an empty mapping-like item) must not end the loop. None still stops it.
        while (airbyte_message_or_record_or_exception := queue.get()) is not None:
            yield from self._handle_item(
                airbyte_message_or_record_or_exception,
                concurrent_stream_processor,
            )
            # In the event that a partition raises an exception, anything remaining in
            # the queue will be missed because is_done() can raise an exception and exit
            # out of this loop before remaining items are consumed
            if queue.empty() and concurrent_stream_processor.is_done():
                # all partitions were generated and processed. we're done here
                break

    def _handle_item(
        self,
        queue_item: QueueItem,
        concurrent_stream_processor: ConcurrentReadProcessor,
    ) -> Iterable[AirbyteMessage]:
        """
        Dispatch a queue item to the processor callback matching its type.

        :param queue_item: The item read from the queue
        :param concurrent_stream_processor: The processor exposing the callbacks
        :return: The messages produced by the callback. An AirbyteMessage is yielded as is
        :raises ValueError: If the type of the item is not handled
        """
        # handle queue item and call the appropriate handler depending on the type of the queue item
        if isinstance(queue_item, StreamThreadException):
            yield from concurrent_stream_processor.on_exception(queue_item)
        elif isinstance(queue_item, PartitionGenerationCompletedSentinel):
            yield from concurrent_stream_processor.on_partition_generation_completed(queue_item)
        elif isinstance(queue_item, Partition):
            # on_partition is called for its side effects only; nothing is yielded for this item
            concurrent_stream_processor.on_partition(queue_item)
        elif isinstance(queue_item, PartitionCompleteSentinel):
            yield from concurrent_stream_processor.on_partition_complete_sentinel(queue_item)
        elif isinstance(queue_item, Record):
            yield from concurrent_stream_processor.on_record(queue_item)
        elif isinstance(queue_item, AirbyteMessage):
            yield queue_item
        else:
            raise ValueError(f"Unknown queue item type: {type(queue_item)}")
class
ConcurrentSource:
class ConcurrentSource:
    """
    A Source that reads data from multiple AbstractStreams concurrently.
    It does so by submitting partition generation, and partition read tasks to a thread pool.
    The tasks asynchronously add their output to a shared queue.
    The read is done when all partitions for all streams were generated and read.
    """

    DEFAULT_TIMEOUT_SECONDS = 900

    @staticmethod
    def create(
        num_workers: int,
        initial_number_of_partitions_to_generate: int,
        logger: logging.Logger,
        slice_logger: SliceLogger,
        message_repository: MessageRepository,
        queue: Optional[Queue[QueueItem]] = None,
        timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
    ) -> "ConcurrentSource":
        """
        Build a ConcurrentSource backed by a new thread pool of `num_workers` threads.

        :param num_workers: The `max_workers` of the ThreadPoolExecutor that is created
        :param initial_number_of_partitions_to_generate: The initial number of concurrent partition generation tasks. Must be >= 1 and
          strictly lower than `num_workers`, except for the single-threaded case where both are 1
        :param logger: The logger to log to
        :param slice_logger: The slice logger used to create messages on new slices
        :param message_repository: The repository to emit messages to
        :param queue: The queue shared by the worker threads and the main thread. A bounded queue is created when omitted
        :param timeout_seconds: Forwarded unchanged to the constructor
        :raises ValueError: If `initial_number_of_partitions_to_generate` is lower than 1
        :raises AssertionError: If there are not more workers than partition generators
        """
        if initial_number_of_partitions_to_generate < 1:
            raise ValueError(
                f"initial_number_of_partitions_to_generate must be >= 1, got {initial_number_of_partitions_to_generate}"
            )
        is_single_threaded = initial_number_of_partitions_to_generate == 1 and num_workers == 1
        too_many_generator = (
            not is_single_threaded and initial_number_of_partitions_to_generate >= num_workers
        )
        if too_many_generator:
            # Raised explicitly instead of through an `assert` statement so that the check is not
            # stripped under `python -O`. The exception type is kept for existing callers.
            raise AssertionError(
                "It is required to have more workers than threads generating partitions"
            )
        threadpool = ThreadPoolManager(
            concurrent.futures.ThreadPoolExecutor(
                max_workers=num_workers, thread_name_prefix="workerpool"
            ),
            logger,
        )
        return ConcurrentSource(
            threadpool=threadpool,
            logger=logger,
            slice_logger=slice_logger,
            queue=queue,
            message_repository=message_repository,
            initial_number_partitions_to_generate=initial_number_of_partitions_to_generate,
            timeout_seconds=timeout_seconds,
        )

    def __init__(
        self,
        threadpool: ThreadPoolManager,
        logger: logging.Logger,
        slice_logger: Optional[SliceLogger] = None,
        queue: Optional[Queue[QueueItem]] = None,
        message_repository: Optional[MessageRepository] = None,
        initial_number_partitions_to_generate: int = 1,
        timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
    ) -> None:
        """
        :param threadpool: The threadpool to submit tasks to
        :param logger: The logger to log to
        :param slice_logger: The slice logger used to create messages on new slices. Defaults to a new DebugSliceLogger
        :param queue: The queue the tasks write to and the main thread reads from. Defaults to a new queue bounded to 10_000 items
        :param message_repository: The repository to emit messages to. Defaults to a new InMemoryMessageRepository
        :param initial_number_partitions_to_generate: The initial number of concurrent partition generation tasks. Limiting this number limits the latency of the first records emitted. While the latency is not critical, emitting the records early allows the platform and the destination to process them as early as possible.
        :param timeout_seconds: Intended as the maximum number of seconds to wait for a record to be read from the queue. It is only stored on the instance: nothing in this class reads it, so reads from the queue block without a timeout.
        """
        self._threadpool = threadpool
        self._logger = logger
        # The defaults are created per instance. Instances used as default argument values are
        # built once, when the function is defined, and would be shared by every ConcurrentSource.
        self._slice_logger = slice_logger if slice_logger is not None else DebugSliceLogger()
        self._message_repository = (
            message_repository if message_repository is not None else InMemoryMessageRepository()
        )
        self._initial_number_partitions_to_generate = initial_number_partitions_to_generate
        self._timeout_seconds = timeout_seconds

        # We set a maxsize to force the main thread to process record items when the queue size grows. This assumes that there are fewer
        # threads generating partitions than the max number of workers. If it weren't the case, we could have threads only generating
        # partitions which would fill the queue. This number is arbitrarily set to 10_000 but will probably need to be changed given more
        # information and might even need to be configurable depending on the source
        self._queue = queue if queue is not None else Queue(maxsize=10_000)

    def read(
        self,
        streams: List[AbstractStream],
    ) -> Iterator[AirbyteMessage]:
        """
        Read the given streams concurrently and yield the resulting messages.

        :param streams: The streams to read
        :return: The messages produced while starting the partition generators and while draining the shared queue
        """
        self._logger.info("Starting syncing")
        concurrent_stream_processor = ConcurrentReadProcessor(
            streams,
            PartitionEnqueuer(self._queue, self._threadpool),
            self._threadpool,
            self._logger,
            self._slice_logger,
            self._message_repository,
            PartitionReader(
                self._queue,
                PartitionLogger(self._slice_logger, self._logger, self._message_repository),
            ),
            max_concurrent_partition_generators=self._initial_number_partitions_to_generate,
        )

        # Enqueue initial partition generation tasks
        yield from self._submit_initial_partition_generators(concurrent_stream_processor)

        # Read from the queue until all partitions were generated and read
        yield from self._consume_from_queue(
            self._queue,
            concurrent_stream_processor,
        )
        self._threadpool.check_for_errors_and_shutdown()
        self._logger.info("Finished syncing")

    def _submit_initial_partition_generators(
        self, concurrent_stream_processor: ConcurrentReadProcessor
    ) -> Iterable[AirbyteMessage]:
        """
        Ask the processor to start a partition generator once per initial generation slot.

        :param concurrent_stream_processor: The processor starting the partition generators
        :return: The truthy status messages returned by the processor
        """
        for _ in range(self._initial_number_partitions_to_generate):
            status_message = concurrent_stream_processor.start_next_partition_generator()
            if status_message:
                yield status_message

    def _consume_from_queue(
        self,
        queue: Queue[QueueItem],
        concurrent_stream_processor: ConcurrentReadProcessor,
    ) -> Iterable[AirbyteMessage]:
        """
        Block on the queue and handle its items until the queue is empty and the processor is done.

        :param queue: The queue to read from
        :param concurrent_stream_processor: The processor handling the items
        :return: The messages produced while handling the items
        """
        # Compare against None instead of relying on truthiness: a valid item that happens to be
        # falsy (e.g. an empty mapping-like item) must not end the loop. None still stops it.
        while (airbyte_message_or_record_or_exception := queue.get()) is not None:
            yield from self._handle_item(
                airbyte_message_or_record_or_exception,
                concurrent_stream_processor,
            )
            # In the event that a partition raises an exception, anything remaining in
            # the queue will be missed because is_done() can raise an exception and exit
            # out of this loop before remaining items are consumed
            if queue.empty() and concurrent_stream_processor.is_done():
                # all partitions were generated and processed. we're done here
                break

    def _handle_item(
        self,
        queue_item: QueueItem,
        concurrent_stream_processor: ConcurrentReadProcessor,
    ) -> Iterable[AirbyteMessage]:
        """
        Dispatch a queue item to the processor callback matching its type.

        :param queue_item: The item read from the queue
        :param concurrent_stream_processor: The processor exposing the callbacks
        :return: The messages produced by the callback. An AirbyteMessage is yielded as is
        :raises ValueError: If the type of the item is not handled
        """
        # handle queue item and call the appropriate handler depending on the type of the queue item
        if isinstance(queue_item, StreamThreadException):
            yield from concurrent_stream_processor.on_exception(queue_item)
        elif isinstance(queue_item, PartitionGenerationCompletedSentinel):
            yield from concurrent_stream_processor.on_partition_generation_completed(queue_item)
        elif isinstance(queue_item, Partition):
            # on_partition is called for its side effects only; nothing is yielded for this item
            concurrent_stream_processor.on_partition(queue_item)
        elif isinstance(queue_item, PartitionCompleteSentinel):
            yield from concurrent_stream_processor.on_partition_complete_sentinel(queue_item)
        elif isinstance(queue_item, Record):
            yield from concurrent_stream_processor.on_record(queue_item)
        elif isinstance(queue_item, AirbyteMessage):
            yield queue_item
        else:
            raise ValueError(f"Unknown queue item type: {type(queue_item)}")
A Source that reads data from multiple AbstractStreams concurrently. It does so by submitting partition generation, and partition read tasks to a thread pool. The tasks asynchronously add their output to a shared queue. The read is done when all partitions for all streams were generated and read.
ConcurrentSource( threadpool: airbyte_cdk.sources.concurrent_source.thread_pool_manager.ThreadPoolManager, logger: logging.Logger, slice_logger: airbyte_cdk.sources.utils.slice_logger.SliceLogger = <airbyte_cdk.sources.utils.slice_logger.DebugSliceLogger object>, queue: Optional[queue.Queue[Union[airbyte_cdk.Record, airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition, airbyte_cdk.sources.streams.concurrent.partitions.types.PartitionCompleteSentinel, airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel.PartitionGenerationCompletedSentinel, Exception, airbyte_cdk.AirbyteMessage]]] = None, message_repository: airbyte_cdk.MessageRepository = <airbyte_cdk.InMemoryMessageRepository object>, initial_number_partitions_to_generate: int = 1, timeout_seconds: int = 900)
def __init__(
    self,
    threadpool: ThreadPoolManager,
    logger: logging.Logger,
    slice_logger: Optional[SliceLogger] = None,
    queue: Optional[Queue[QueueItem]] = None,
    message_repository: Optional[MessageRepository] = None,
    initial_number_partitions_to_generate: int = 1,
    timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
) -> None:
    """
    :param threadpool: The threadpool to submit tasks to
    :param logger: The logger to log to
    :param slice_logger: The slice logger used to create messages on new slices. Defaults to a new DebugSliceLogger
    :param queue: The queue the tasks write to and the main thread reads from. Defaults to a new queue bounded to 10_000 items
    :param message_repository: The repository to emit messages to. Defaults to a new InMemoryMessageRepository
    :param initial_number_partitions_to_generate: The initial number of concurrent partition generation tasks. Limiting this number limits the latency of the first records emitted. While the latency is not critical, emitting the records early allows the platform and the destination to process them as early as possible.
    :param timeout_seconds: Intended as the maximum number of seconds to wait for a record to be read from the queue. This constructor only stores it on the instance.
    """
    self._threadpool = threadpool
    self._logger = logger
    # The defaults are created per instance. Instances used as default argument values are
    # built once, when the function is defined, and would be shared by every ConcurrentSource.
    self._slice_logger = slice_logger if slice_logger is not None else DebugSliceLogger()
    self._message_repository = (
        message_repository if message_repository is not None else InMemoryMessageRepository()
    )
    self._initial_number_partitions_to_generate = initial_number_partitions_to_generate
    self._timeout_seconds = timeout_seconds

    # We set a maxsize to force the main thread to process record items when the queue size grows. This assumes that there are fewer
    # threads generating partitions than the max number of workers. If it weren't the case, we could have threads only generating
    # partitions which would fill the queue. This number is arbitrarily set to 10_000 but will probably need to be changed given more
    # information and might even need to be configurable depending on the source
    self._queue = queue if queue is not None else Queue(maxsize=10_000)
Parameters
- threadpool: The threadpool to submit tasks to
- logger: The logger to log to
- slice_logger: The slice logger used to create messages on new slices
- message_repository: The repository to emit messages to
- initial_number_partitions_to_generate: The initial number of concurrent partition generation tasks. Limiting this number limits the latency of the first records emitted. While the latency is not critical, emitting the records early allows the platform and the destination to process them as early as possible.
- timeout_seconds: The maximum number of seconds to wait for a record to be read from the queue. If no record is read within this time, the source will stop reading and return.
@staticmethod
def
create( num_workers: int, initial_number_of_partitions_to_generate: int, logger: logging.Logger, slice_logger: airbyte_cdk.sources.utils.slice_logger.SliceLogger, message_repository: airbyte_cdk.MessageRepository, queue: Optional[queue.Queue[Union[airbyte_cdk.Record, airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition, airbyte_cdk.sources.streams.concurrent.partitions.types.PartitionCompleteSentinel, airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel.PartitionGenerationCompletedSentinel, Exception, airbyte_cdk.AirbyteMessage]]] = None, timeout_seconds: int = 900) -> ConcurrentSource:
@staticmethod
def create(
    num_workers: int,
    initial_number_of_partitions_to_generate: int,
    logger: logging.Logger,
    slice_logger: SliceLogger,
    message_repository: MessageRepository,
    queue: Optional[Queue[QueueItem]] = None,
    timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
) -> "ConcurrentSource":
    """
    Build a ConcurrentSource backed by a new thread pool of `num_workers` threads.

    :param num_workers: The `max_workers` of the ThreadPoolExecutor that is created
    :param initial_number_of_partitions_to_generate: The initial number of concurrent partition generation tasks. Must be >= 1 and
      strictly lower than `num_workers`, except for the single-threaded case where both are 1
    :param logger: The logger to log to
    :param slice_logger: The slice logger used to create messages on new slices
    :param message_repository: The repository to emit messages to
    :param queue: The queue handed to the ConcurrentSource constructor; may be None
    :param timeout_seconds: Forwarded unchanged to the constructor
    :raises ValueError: If `initial_number_of_partitions_to_generate` is lower than 1
    :raises AssertionError: If there are not more workers than partition generators
    """
    if initial_number_of_partitions_to_generate < 1:
        raise ValueError(
            f"initial_number_of_partitions_to_generate must be >= 1, got {initial_number_of_partitions_to_generate}"
        )
    is_single_threaded = initial_number_of_partitions_to_generate == 1 and num_workers == 1
    too_many_generator = (
        not is_single_threaded and initial_number_of_partitions_to_generate >= num_workers
    )
    if too_many_generator:
        # Raised explicitly instead of through an `assert` statement so that the check is not
        # stripped under `python -O`. The exception type is kept for existing callers.
        raise AssertionError(
            "It is required to have more workers than threads generating partitions"
        )
    threadpool = ThreadPoolManager(
        concurrent.futures.ThreadPoolExecutor(
            max_workers=num_workers, thread_name_prefix="workerpool"
        ),
        logger,
    )
    return ConcurrentSource(
        threadpool=threadpool,
        logger=logger,
        slice_logger=slice_logger,
        queue=queue,
        message_repository=message_repository,
        initial_number_partitions_to_generate=initial_number_of_partitions_to_generate,
        timeout_seconds=timeout_seconds,
    )
def
read( self, streams: List[airbyte_cdk.sources.streams.concurrent.abstract_stream.AbstractStream]) -> Iterator[airbyte_cdk.AirbyteMessage]:
def read(
    self,
    streams: List[AbstractStream],
) -> Iterator[AirbyteMessage]:
    """
    Read the given streams concurrently and yield the resulting messages.

    :param streams: The streams to read
    :return: The messages produced while starting the partition generators and while draining the shared queue
    """
    self._logger.info("Starting syncing")

    # Collaborators are built in the same order as they would be as inline call arguments.
    partition_enqueuer = PartitionEnqueuer(self._queue, self._threadpool)
    partition_logger = PartitionLogger(self._slice_logger, self._logger, self._message_repository)
    partition_reader = PartitionReader(self._queue, partition_logger)
    processor = ConcurrentReadProcessor(
        streams,
        partition_enqueuer,
        self._threadpool,
        self._logger,
        self._slice_logger,
        self._message_repository,
        partition_reader,
        max_concurrent_partition_generators=self._initial_number_partitions_to_generate,
    )

    # First, start the initial partition generation tasks and forward their status messages
    yield from self._submit_initial_partition_generators(processor)

    # Then drain the shared queue until every partition was generated and read
    yield from self._consume_from_queue(self._queue, processor)

    self._threadpool.check_for_errors_and_shutdown()
    self._logger.info("Finished syncing")