airbyte_cdk.sources.streams.concurrent.partition_enqueuer
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import time
from queue import Queue

from airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel import (
    PartitionGenerationCompletedSentinel,
)
from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException
from airbyte_cdk.sources.concurrent_source.thread_pool_manager import ThreadPoolManager
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.concurrent.partitions.types import QueueItem


class PartitionEnqueuer:
    """
    Generates partitions from a partition generator and puts them in a queue.
    """

    def __init__(
        self,
        queue: Queue[QueueItem],
        thread_pool_manager: ThreadPoolManager,
        sleep_time_in_seconds: float = 0.1,
    ) -> None:
        """
        :param queue: The queue to put the partitions in.
        :param thread_pool_manager: The thread pool manager polled (through
            prune_to_validate_has_reached_futures_limit) to throttle the partition generation.
        :param sleep_time_in_seconds: How long to sleep between two checks of the futures limit while throttling.
        """
        self._queue = queue
        self._thread_pool_manager = thread_pool_manager
        self._sleep_time_in_seconds = sleep_time_in_seconds

    def generate_partitions(self, stream: AbstractStream) -> None:
        """
        Generate partitions from a partition generator and put them in a queue.
        When all the partitions are added to the queue, a sentinel is added to the queue to indicate that all the partitions have been
        generated.

        If an exception is encountered, the exception will be caught and put in the queue, followed by the sentinel. This is very
        important because if we don't, the main thread will have no way to know that something went wrong and will wait until the
        timeout is reached.

        This method is meant to be called in a separate thread.

        :param stream: The stream whose partitions are generated and enqueued.
        """
        try:
            for partition in stream.generate_partitions():
                # Adding partitions to the queue generates futures. To avoid having too many futures, we throttle here. We understand
                # that we might add more futures than the limit by throttling in the threads while it is the main thread that actually
                # adds the future, but we expect the delta between the max futures length and the actual length to be small enough
                # that it would not be an issue. We do this in the threads because we want the main thread to always be processing
                # QueueItems; if it does not, the queue size could grow and generate OOM issues.
                #
                # Also note that we do not expect this to create deadlocks where all worker threads wait because we have fewer
                # PartitionEnqueuer threads than worker threads.
                #
                # Also note that prune_to_validate_has_reached_futures_limit has a lock while pruning which might create a bottleneck
                # in terms of performance.
                while self._thread_pool_manager.prune_to_validate_has_reached_futures_limit():
                    time.sleep(self._sleep_time_in_seconds)
                self._queue.put(partition)
            self._queue.put(PartitionGenerationCompletedSentinel(stream))
        except Exception as e:
            # Report the failure first, then still emit the sentinel so the consumer sees generation as finished for this stream.
            self._queue.put(StreamThreadException(e, stream.name))
            self._queue.put(PartitionGenerationCompletedSentinel(stream))
class
PartitionEnqueuer:
class PartitionEnqueuer:
    """
    Generates partitions from a partition generator and puts them in a queue.
    """

    def __init__(
        self,
        queue: Queue[QueueItem],
        thread_pool_manager: ThreadPoolManager,
        sleep_time_in_seconds: float = 0.1,
    ) -> None:
        """
        :param queue: The queue to put the partitions in.
        :param thread_pool_manager: The thread pool manager polled (through
            prune_to_validate_has_reached_futures_limit) to throttle the partition generation.
        :param sleep_time_in_seconds: How long to sleep between two checks of the futures limit while throttling.
        """
        self._queue = queue
        self._thread_pool_manager = thread_pool_manager
        self._sleep_time_in_seconds = sleep_time_in_seconds

    def generate_partitions(self, stream: AbstractStream) -> None:
        """
        Generate partitions from a partition generator and put them in a queue.
        When all the partitions are added to the queue, a sentinel is added to the queue to indicate that all the partitions have been
        generated.

        If an exception is encountered, the exception will be caught and put in the queue, followed by the sentinel. This is very
        important because if we don't, the main thread will have no way to know that something went wrong and will wait until the
        timeout is reached.

        This method is meant to be called in a separate thread.

        :param stream: The stream whose partitions are generated and enqueued.
        """
        try:
            for partition in stream.generate_partitions():
                # Adding partitions to the queue generates futures. To avoid having too many futures, we throttle here. We understand
                # that we might add more futures than the limit by throttling in the threads while it is the main thread that actually
                # adds the future, but we expect the delta between the max futures length and the actual length to be small enough
                # that it would not be an issue. We do this in the threads because we want the main thread to always be processing
                # QueueItems; if it does not, the queue size could grow and generate OOM issues.
                #
                # Also note that we do not expect this to create deadlocks where all worker threads wait because we have fewer
                # PartitionEnqueuer threads than worker threads.
                #
                # Also note that prune_to_validate_has_reached_futures_limit has a lock while pruning which might create a bottleneck
                # in terms of performance.
                while self._thread_pool_manager.prune_to_validate_has_reached_futures_limit():
                    time.sleep(self._sleep_time_in_seconds)
                self._queue.put(partition)
            self._queue.put(PartitionGenerationCompletedSentinel(stream))
        except Exception as e:
            # Report the failure first, then still emit the sentinel so the consumer sees generation as finished for this stream.
            self._queue.put(StreamThreadException(e, stream.name))
            self._queue.put(PartitionGenerationCompletedSentinel(stream))
Generates partitions from a partition generator and puts them in a queue.
PartitionEnqueuer( queue: queue.Queue[typing.Union[airbyte_cdk.Record, airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition, airbyte_cdk.sources.streams.concurrent.partitions.types.PartitionCompleteSentinel, airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel.PartitionGenerationCompletedSentinel, Exception]], thread_pool_manager: airbyte_cdk.sources.concurrent_source.thread_pool_manager.ThreadPoolManager, sleep_time_in_seconds: float = 0.1)
def __init__(
    self,
    queue: Queue[QueueItem],
    thread_pool_manager: ThreadPoolManager,
    sleep_time_in_seconds: float = 0.1,
) -> None:
    """
    :param queue: The queue to put the partitions in.
    :param thread_pool_manager: The thread pool manager polled (through
        prune_to_validate_has_reached_futures_limit) by generate_partitions to throttle the partition generation.
    :param sleep_time_in_seconds: How long generate_partitions sleeps between two checks of the futures limit while throttling.
    """
    self._queue = queue
    self._thread_pool_manager = thread_pool_manager
    self._sleep_time_in_seconds = sleep_time_in_seconds
Parameters
- queue: The queue to put the partitions in.
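- thread_pool_manager: The thread pool manager used to throttle the partition generation.
- sleep_time_in_seconds: The time to sleep between two checks of the futures limit while throttling (defaults to 0.1).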
def
generate_partitions( self, stream: airbyte_cdk.sources.streams.concurrent.abstract_stream.AbstractStream) -> None:
def generate_partitions(self, stream: AbstractStream) -> None:
    """
    Push every partition produced by the stream onto the queue, then push a completion sentinel.

    Once all the partitions have been enqueued, a PartitionGenerationCompletedSentinel is enqueued so the consumer knows that
    partition generation is over for this stream.

    Any exception raised along the way is caught, wrapped in a StreamThreadException and enqueued, followed by the sentinel.
    This is very important: otherwise the main thread would have no way to know that something went wrong and would wait
    until the timeout is reached.

    This method is meant to be called in a separate thread.

    :param stream: The stream whose partitions are generated and enqueued.
    """
    try:
        for next_partition in stream.generate_partitions():
            # Adding partitions to the queue generates futures. To avoid having too many futures, we throttle here. We understand
            # that we might add more futures than the limit by throttling in the threads while it is the main thread that actually
            # adds the future, but we expect the delta between the max futures length and the actual length to be small enough
            # that it would not be an issue. We do this in the threads because we want the main thread to always be processing
            # QueueItems; if it does not, the queue size could grow and generate OOM issues.
            #
            # Also note that we do not expect this to create deadlocks where all worker threads wait because we have fewer
            # PartitionEnqueuer threads than worker threads.
            #
            # Also note that prune_to_validate_has_reached_futures_limit has a lock while pruning which might create a bottleneck
            # in terms of performance.
            while True:
                if not self._thread_pool_manager.prune_to_validate_has_reached_futures_limit():
                    break
                time.sleep(self._sleep_time_in_seconds)
            self._queue.put(next_partition)
        completion_marker = PartitionGenerationCompletedSentinel(stream)
        self._queue.put(completion_marker)
    except Exception as error:
        # Surface the failure to the consumer, then still signal that generation is over for this stream.
        failure = StreamThreadException(error, stream.name)
        self._queue.put(failure)
        self._queue.put(PartitionGenerationCompletedSentinel(stream))
Generate partitions from a partition generator and put them in a queue. When all the partitions are added to the queue, a sentinel is added to the queue to indicate that all the partitions have been generated.
If an exception is encountered, the exception will be caught and put in the queue. This is very important because if we don't, the main thread will have no way to know that something went wrong and will wait until the timeout is reached.
This method is meant to be called in a separate thread.