airbyte_cdk.sources.concurrent_source.concurrent_source
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import concurrent
import logging
from queue import Queue
from typing import Iterable, Iterator, List

from airbyte_cdk.models import AirbyteMessage
from airbyte_cdk.sources.concurrent_source.concurrent_read_processor import ConcurrentReadProcessor
from airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel import (
    PartitionGenerationCompletedSentinel,
)
from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException
from airbyte_cdk.sources.concurrent_source.thread_pool_manager import ThreadPoolManager
from airbyte_cdk.sources.message import InMemoryMessageRepository, MessageRepository
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.concurrent.partition_enqueuer import PartitionEnqueuer
from airbyte_cdk.sources.streams.concurrent.partition_reader import PartitionReader
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.types import (
    PartitionCompleteSentinel,
    QueueItem,
)
from airbyte_cdk.sources.types import Record
from airbyte_cdk.sources.utils.slice_logger import DebugSliceLogger, SliceLogger


class ConcurrentSource:
    """
    A Source that reads data from multiple AbstractStreams concurrently.
    It does so by submitting partition generation and partition read tasks to a thread pool.
    The tasks asynchronously add their output to a shared queue.
    The read is done when all partitions for all streams were generated and read.
    """

    DEFAULT_TIMEOUT_SECONDS = 900

    @staticmethod
    def create(
        num_workers: int,
        initial_number_of_partitions_to_generate: int,
        logger: logging.Logger,
        slice_logger: SliceLogger,
        message_repository: MessageRepository,
        timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
    ) -> "ConcurrentSource":
        is_single_threaded = initial_number_of_partitions_to_generate == 1 and num_workers == 1
        too_many_generator = (
            not is_single_threaded and initial_number_of_partitions_to_generate >= num_workers
        )
        assert (
            not too_many_generator
        ), "It is required to have more workers than threads generating partitions"
        threadpool = ThreadPoolManager(
            concurrent.futures.ThreadPoolExecutor(
                max_workers=num_workers, thread_name_prefix="workerpool"
            ),
            logger,
        )
        return ConcurrentSource(
            threadpool,
            logger,
            slice_logger,
            message_repository,
            initial_number_of_partitions_to_generate,
            timeout_seconds,
        )

    def __init__(
        self,
        threadpool: ThreadPoolManager,
        logger: logging.Logger,
        slice_logger: SliceLogger = DebugSliceLogger(),
        message_repository: MessageRepository = InMemoryMessageRepository(),
        initial_number_partitions_to_generate: int = 1,
        timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
    ) -> None:
        """
        :param threadpool: The threadpool to submit tasks to
        :param logger: The logger to log to
        :param slice_logger: The slice logger used to create messages on new slices
        :param message_repository: The repository to emit messages to
        :param initial_number_partitions_to_generate: The initial number of concurrent partition generation tasks. Limiting this number limits the latency of the first records emitted. While the latency is not critical, emitting the records early allows the platform and the destination to process them as early as possible.
        :param timeout_seconds: The maximum number of seconds to wait for a record to be read from the queue. If no record is read within this time, the source will stop reading and return.
        """
        self._threadpool = threadpool
        self._logger = logger
        self._slice_logger = slice_logger
        self._message_repository = message_repository
        self._initial_number_partitions_to_generate = initial_number_partitions_to_generate
        self._timeout_seconds = timeout_seconds

    def read(
        self,
        streams: List[AbstractStream],
    ) -> Iterator[AirbyteMessage]:
        self._logger.info("Starting syncing")

        # We set a maxsize so that the main thread can process record items when the queue size grows. This assumes that there are fewer
        # threads generating partitions than the max number of workers. If that weren't the case, we could have threads only generating
        # partitions, which would fill the queue. This number is arbitrarily set to 10_000 but will probably need to be changed given more
        # information and might even need to be configurable depending on the source
        queue: Queue[QueueItem] = Queue(maxsize=10_000)
        concurrent_stream_processor = ConcurrentReadProcessor(
            streams,
            PartitionEnqueuer(queue, self._threadpool),
            self._threadpool,
            self._logger,
            self._slice_logger,
            self._message_repository,
            PartitionReader(queue),
        )

        # Enqueue initial partition generation tasks
        yield from self._submit_initial_partition_generators(concurrent_stream_processor)

        # Read from the queue until all partitions were generated and read
        yield from self._consume_from_queue(
            queue,
            concurrent_stream_processor,
        )
        self._threadpool.check_for_errors_and_shutdown()
        self._logger.info("Finished syncing")

    def _submit_initial_partition_generators(
        self, concurrent_stream_processor: ConcurrentReadProcessor
    ) -> Iterable[AirbyteMessage]:
        for _ in range(self._initial_number_partitions_to_generate):
            status_message = concurrent_stream_processor.start_next_partition_generator()
            if status_message:
                yield status_message

    def _consume_from_queue(
        self,
        queue: Queue[QueueItem],
        concurrent_stream_processor: ConcurrentReadProcessor,
    ) -> Iterable[AirbyteMessage]:
        while airbyte_message_or_record_or_exception := queue.get():
            yield from self._handle_item(
                airbyte_message_or_record_or_exception,
                concurrent_stream_processor,
            )
            if concurrent_stream_processor.is_done() and queue.empty():
                # all partitions were generated and processed. we're done here
                break

    def _handle_item(
        self,
        queue_item: QueueItem,
        concurrent_stream_processor: ConcurrentReadProcessor,
    ) -> Iterable[AirbyteMessage]:
        # handle the queue item by calling the appropriate handler depending on its type
        if isinstance(queue_item, StreamThreadException):
            yield from concurrent_stream_processor.on_exception(queue_item)
        elif isinstance(queue_item, PartitionGenerationCompletedSentinel):
            yield from concurrent_stream_processor.on_partition_generation_completed(queue_item)
        elif isinstance(queue_item, Partition):
            concurrent_stream_processor.on_partition(queue_item)
        elif isinstance(queue_item, PartitionCompleteSentinel):
            yield from concurrent_stream_processor.on_partition_complete_sentinel(queue_item)
        elif isinstance(queue_item, Record):
            yield from concurrent_stream_processor.on_record(queue_item)
        else:
            raise ValueError(f"Unknown queue item type: {type(queue_item)}")
class ConcurrentSource:
class ConcurrentSource:
    """
    A Source that reads data from multiple AbstractStreams concurrently.
    It does so by submitting partition generation and partition read tasks to a thread pool.
    The tasks asynchronously add their output to a shared queue.
    The read is done when all partitions for all streams were generated and read.
    """

    DEFAULT_TIMEOUT_SECONDS = 900

    @staticmethod
    def create(
        num_workers: int,
        initial_number_of_partitions_to_generate: int,
        logger: logging.Logger,
        slice_logger: SliceLogger,
        message_repository: MessageRepository,
        timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
    ) -> "ConcurrentSource":
        is_single_threaded = initial_number_of_partitions_to_generate == 1 and num_workers == 1
        too_many_generator = (
            not is_single_threaded and initial_number_of_partitions_to_generate >= num_workers
        )
        assert (
            not too_many_generator
        ), "It is required to have more workers than threads generating partitions"
        threadpool = ThreadPoolManager(
            concurrent.futures.ThreadPoolExecutor(
                max_workers=num_workers, thread_name_prefix="workerpool"
            ),
            logger,
        )
        return ConcurrentSource(
            threadpool,
            logger,
            slice_logger,
            message_repository,
            initial_number_of_partitions_to_generate,
            timeout_seconds,
        )

    def __init__(
        self,
        threadpool: ThreadPoolManager,
        logger: logging.Logger,
        slice_logger: SliceLogger = DebugSliceLogger(),
        message_repository: MessageRepository = InMemoryMessageRepository(),
        initial_number_partitions_to_generate: int = 1,
        timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
    ) -> None:
        """
        :param threadpool: The threadpool to submit tasks to
        :param logger: The logger to log to
        :param slice_logger: The slice logger used to create messages on new slices
        :param message_repository: The repository to emit messages to
        :param initial_number_partitions_to_generate: The initial number of concurrent partition generation tasks. Limiting this number limits the latency of the first records emitted. While the latency is not critical, emitting the records early allows the platform and the destination to process them as early as possible.
        :param timeout_seconds: The maximum number of seconds to wait for a record to be read from the queue. If no record is read within this time, the source will stop reading and return.
        """
        self._threadpool = threadpool
        self._logger = logger
        self._slice_logger = slice_logger
        self._message_repository = message_repository
        self._initial_number_partitions_to_generate = initial_number_partitions_to_generate
        self._timeout_seconds = timeout_seconds

    def read(
        self,
        streams: List[AbstractStream],
    ) -> Iterator[AirbyteMessage]:
        self._logger.info("Starting syncing")

        # We set a maxsize so that the main thread can process record items when the queue size grows. This assumes that there are fewer
        # threads generating partitions than the max number of workers. If that weren't the case, we could have threads only generating
        # partitions, which would fill the queue. This number is arbitrarily set to 10_000 but will probably need to be changed given more
        # information and might even need to be configurable depending on the source
        queue: Queue[QueueItem] = Queue(maxsize=10_000)
        concurrent_stream_processor = ConcurrentReadProcessor(
            streams,
            PartitionEnqueuer(queue, self._threadpool),
            self._threadpool,
            self._logger,
            self._slice_logger,
            self._message_repository,
            PartitionReader(queue),
        )

        # Enqueue initial partition generation tasks
        yield from self._submit_initial_partition_generators(concurrent_stream_processor)

        # Read from the queue until all partitions were generated and read
        yield from self._consume_from_queue(
            queue,
            concurrent_stream_processor,
        )
        self._threadpool.check_for_errors_and_shutdown()
        self._logger.info("Finished syncing")

    def _submit_initial_partition_generators(
        self, concurrent_stream_processor: ConcurrentReadProcessor
    ) -> Iterable[AirbyteMessage]:
        for _ in range(self._initial_number_partitions_to_generate):
            status_message = concurrent_stream_processor.start_next_partition_generator()
            if status_message:
                yield status_message

    def _consume_from_queue(
        self,
        queue: Queue[QueueItem],
        concurrent_stream_processor: ConcurrentReadProcessor,
    ) -> Iterable[AirbyteMessage]:
        while airbyte_message_or_record_or_exception := queue.get():
            yield from self._handle_item(
                airbyte_message_or_record_or_exception,
                concurrent_stream_processor,
            )
            if concurrent_stream_processor.is_done() and queue.empty():
                # all partitions were generated and processed. we're done here
                break

    def _handle_item(
        self,
        queue_item: QueueItem,
        concurrent_stream_processor: ConcurrentReadProcessor,
    ) -> Iterable[AirbyteMessage]:
        # handle the queue item by calling the appropriate handler depending on its type
        if isinstance(queue_item, StreamThreadException):
            yield from concurrent_stream_processor.on_exception(queue_item)
        elif isinstance(queue_item, PartitionGenerationCompletedSentinel):
            yield from concurrent_stream_processor.on_partition_generation_completed(queue_item)
        elif isinstance(queue_item, Partition):
            concurrent_stream_processor.on_partition(queue_item)
        elif isinstance(queue_item, PartitionCompleteSentinel):
            yield from concurrent_stream_processor.on_partition_complete_sentinel(queue_item)
        elif isinstance(queue_item, Record):
            yield from concurrent_stream_processor.on_record(queue_item)
        else:
            raise ValueError(f"Unknown queue item type: {type(queue_item)}")
A Source that reads data from multiple AbstractStreams concurrently. It does so by submitting partition generation and partition read tasks to a thread pool. The tasks asynchronously add their output to a shared queue. The read is done when all partitions for all streams were generated and read.
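For orientation, here is a minimal usage sketch. It assumes my_streams is a list of concrete AbstractStream implementations built elsewhere (for example through the CDK's stream facade); building that list is outside the scope of this module.

import logging

from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource
from airbyte_cdk.sources.message import InMemoryMessageRepository
from airbyte_cdk.sources.utils.slice_logger import DebugSliceLogger

logger = logging.getLogger("airbyte")

# `my_streams` is assumed to be a list of AbstractStream implementations built elsewhere.
source = ConcurrentSource.create(
    num_workers=4,
    initial_number_of_partitions_to_generate=2,  # must stay below num_workers
    logger=logger,
    slice_logger=DebugSliceLogger(),
    message_repository=InMemoryMessageRepository(),
)

for message in source.read(my_streams):
    print(message)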
ConcurrentSource( threadpool: airbyte_cdk.sources.concurrent_source.thread_pool_manager.ThreadPoolManager, logger: logging.Logger, slice_logger: airbyte_cdk.sources.utils.slice_logger.SliceLogger = <airbyte_cdk.sources.utils.slice_logger.DebugSliceLogger object>, message_repository: airbyte_cdk.MessageRepository = <airbyte_cdk.InMemoryMessageRepository object>, initial_number_partitions_to_generate: int = 1, timeout_seconds: int = 900)
    def __init__(
        self,
        threadpool: ThreadPoolManager,
        logger: logging.Logger,
        slice_logger: SliceLogger = DebugSliceLogger(),
        message_repository: MessageRepository = InMemoryMessageRepository(),
        initial_number_partitions_to_generate: int = 1,
        timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
    ) -> None:
        """
        :param threadpool: The threadpool to submit tasks to
        :param logger: The logger to log to
        :param slice_logger: The slice logger used to create messages on new slices
        :param message_repository: The repository to emit messages to
        :param initial_number_partitions_to_generate: The initial number of concurrent partition generation tasks. Limiting this number limits the latency of the first records emitted. While the latency is not critical, emitting the records early allows the platform and the destination to process them as early as possible.
        :param timeout_seconds: The maximum number of seconds to wait for a record to be read from the queue. If no record is read within this time, the source will stop reading and return.
        """
        self._threadpool = threadpool
        self._logger = logger
        self._slice_logger = slice_logger
        self._message_repository = message_repository
        self._initial_number_partitions_to_generate = initial_number_partitions_to_generate
        self._timeout_seconds = timeout_seconds
Parameters
- threadpool: The threadpool to submit tasks to
- logger: The logger to log to
- slice_logger: The slice logger used to create messages on new slices
- message_repository: The repository to emit messages to
- initial_number_partitions_to_generate: The initial number of concurrent partition generation tasks. Limiting this number limits the latency of the first records emitted. While the latency is not critical, emitting the records early allows the platform and the destination to process them as early as possible.
- timeout_seconds: The maximum number of seconds to wait for a record to be read from the queue. If no record is read within this time, the source will stop reading and return.
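The constructor can also be called directly when the caller manages the thread pool itself. The sketch below mirrors what create() does internally and relies only on the defaults documented above; it is illustrative wiring, not the only supported setup.

import concurrent.futures
import logging

from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource
from airbyte_cdk.sources.concurrent_source.thread_pool_manager import ThreadPoolManager

logger = logging.getLogger("airbyte")

# Wrap a standard ThreadPoolExecutor in the CDK's ThreadPoolManager, as create() does.
threadpool = ThreadPoolManager(
    concurrent.futures.ThreadPoolExecutor(max_workers=4, thread_name_prefix="workerpool"),
    logger,
)

# slice_logger, message_repository, initial_number_partitions_to_generate and
# timeout_seconds all fall back to the defaults listed above.
source = ConcurrentSource(threadpool, logger)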
@staticmethod
def create( num_workers: int, initial_number_of_partitions_to_generate: int, logger: logging.Logger, slice_logger: airbyte_cdk.sources.utils.slice_logger.SliceLogger, message_repository: airbyte_cdk.MessageRepository, timeout_seconds: int = 900) -> ConcurrentSource:
    @staticmethod
    def create(
        num_workers: int,
        initial_number_of_partitions_to_generate: int,
        logger: logging.Logger,
        slice_logger: SliceLogger,
        message_repository: MessageRepository,
        timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
    ) -> "ConcurrentSource":
        is_single_threaded = initial_number_of_partitions_to_generate == 1 and num_workers == 1
        too_many_generator = (
            not is_single_threaded and initial_number_of_partitions_to_generate >= num_workers
        )
        assert (
            not too_many_generator
        ), "It is required to have more workers than threads generating partitions"
        threadpool = ThreadPoolManager(
            concurrent.futures.ThreadPoolExecutor(
                max_workers=num_workers, thread_name_prefix="workerpool"
            ),
            logger,
        )
        return ConcurrentSource(
            threadpool,
            logger,
            slice_logger,
            message_repository,
            initial_number_of_partitions_to_generate,
            timeout_seconds,
        )
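Note the guard at the top of create(): unless the source runs fully single-threaded (one worker and one partition generator), initial_number_of_partitions_to_generate must be strictly smaller than num_workers, otherwise the assertion fires. A short sketch of both the allowed single-threaded case and a typical concurrent configuration:

import logging

from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource
from airbyte_cdk.sources.message import InMemoryMessageRepository
from airbyte_cdk.sources.utils.slice_logger import DebugSliceLogger

logger = logging.getLogger("airbyte")

# Fully single-threaded mode is explicitly allowed: one worker, one partition generator.
single_threaded_source = ConcurrentSource.create(
    num_workers=1,
    initial_number_of_partitions_to_generate=1,
    logger=logger,
    slice_logger=DebugSliceLogger(),
    message_repository=InMemoryMessageRepository(),
)

# In any other configuration the number of initial partition generators must stay
# strictly below num_workers, or the assertion in create() fails.
multi_worker_source = ConcurrentSource.create(
    num_workers=10,
    initial_number_of_partitions_to_generate=2,
    logger=logger,
    slice_logger=DebugSliceLogger(),
    message_repository=InMemoryMessageRepository(),
    timeout_seconds=600,
)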
def read( self, streams: List[airbyte_cdk.sources.streams.concurrent.abstract_stream.AbstractStream]) -> Iterator[airbyte_cdk.AirbyteMessage]:
    def read(
        self,
        streams: List[AbstractStream],
    ) -> Iterator[AirbyteMessage]:
        self._logger.info("Starting syncing")

        # We set a maxsize so that the main thread can process record items when the queue size grows. This assumes that there are fewer
        # threads generating partitions than the max number of workers. If that weren't the case, we could have threads only generating
        # partitions, which would fill the queue. This number is arbitrarily set to 10_000 but will probably need to be changed given more
        # information and might even need to be configurable depending on the source
        queue: Queue[QueueItem] = Queue(maxsize=10_000)
        concurrent_stream_processor = ConcurrentReadProcessor(
            streams,
            PartitionEnqueuer(queue, self._threadpool),
            self._threadpool,
            self._logger,
            self._slice_logger,
            self._message_repository,
            PartitionReader(queue),
        )

        # Enqueue initial partition generation tasks
        yield from self._submit_initial_partition_generators(concurrent_stream_processor)

        # Read from the queue until all partitions were generated and read
        yield from self._consume_from_queue(
            queue,
            concurrent_stream_processor,
        )
        self._threadpool.check_for_errors_and_shutdown()
        self._logger.info("Finished syncing")
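read() yields a stream of AirbyteMessage objects of mixed types (records, state, logs, traces). A small consumption sketch, assuming source and my_streams were built as in the earlier examples and that the standard Type enum from airbyte_cdk.models is available:

from airbyte_cdk.models import Type

record_count = 0
for message in source.read(my_streams):
    # read() interleaves RECORD messages with STATE/LOG/TRACE messages; count only the records here.
    if message.type == Type.RECORD:
        record_count += 1

print(f"Read {record_count} records")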