airbyte_cdk.sources.streams.concurrent.partition_enqueuer

 1#
 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 3#
 4import time
 5from queue import Queue
 6
 7from airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel import (
 8    PartitionGenerationCompletedSentinel,
 9)
10from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException
11from airbyte_cdk.sources.concurrent_source.thread_pool_manager import ThreadPoolManager
12from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
13from airbyte_cdk.sources.streams.concurrent.partitions.types import QueueItem
14
15
class PartitionEnqueuer:
    """
    Pulls partitions out of a stream's partition generator and pushes them onto a shared queue.
    """

    def __init__(
        self,
        queue: Queue[QueueItem],
        thread_pool_manager: ThreadPoolManager,
        sleep_time_in_seconds: float = 0.1,
    ) -> None:
        """
        :param queue: The queue the partitions, the completion sentinel and any wrapped exception are put in.
        :param thread_pool_manager: Polled before each partition is enqueued; enqueueing is paused while it reports that the futures
          limit has been reached.
        :param sleep_time_in_seconds: How long to sleep between two consecutive polls of the thread pool manager.
        """
        self._sleep_time_in_seconds = sleep_time_in_seconds
        self._thread_pool_manager = thread_pool_manager
        self._queue = queue

    def _wait_for_futures_capacity(self) -> None:
        """
        Block the calling thread, sleeping between polls, for as long as the thread pool manager reports that the futures limit has
        been reached.
        """
        while self._thread_pool_manager.prune_to_validate_has_reached_futures_limit():
            time.sleep(self._sleep_time_in_seconds)

    def generate_partitions(self, stream: AbstractStream) -> None:
        """
        Enqueue every partition produced by `stream.generate_partitions()`, then enqueue a PartitionGenerationCompletedSentinel for
        the stream to signal that partition generation is over.

        Any Exception raised along the way is wrapped in a StreamThreadException (together with the stream name) and enqueued, followed
        by the sentinel. Without this, the main thread would have no way to know that something went wrong and would wait until the
        timeout is reached.

        This method is meant to be called in a separate thread.
        """
        try:
            for partition in stream.generate_partitions():
                # Every enqueued partition ends up creating a future, so we throttle here to keep the number of futures bounded. The
                # throttling happens in this thread while the futures are actually added by the main thread, hence the limit may be
                # slightly exceeded; the overshoot is expected to be small enough not to matter. Throttling is done here rather than
                # in the main thread so that the main thread keeps consuming QueueItems; otherwise the queue could grow until memory
                # runs out.
                #
                # This is not expected to deadlock with every worker thread waiting, because there are fewer PartitionEnqueuer
                # threads than worker threads.
                #
                # Note that prune_to_validate_has_reached_futures_limit holds a lock while pruning, which might become a performance
                # bottleneck.
                self._wait_for_futures_capacity()
                self._queue.put(partition)
            self._queue.put(PartitionGenerationCompletedSentinel(stream))
        except Exception as error:
            self._queue.put(StreamThreadException(error, stream.name))
            self._queue.put(PartitionGenerationCompletedSentinel(stream))
class PartitionEnqueuer:
class PartitionEnqueuer:
    """
    Pulls partitions out of a stream's partition generator and pushes them onto a shared queue.
    """

    def __init__(
        self,
        queue: Queue[QueueItem],
        thread_pool_manager: ThreadPoolManager,
        sleep_time_in_seconds: float = 0.1,
    ) -> None:
        """
        :param queue: The queue the partitions, the completion sentinel and any wrapped exception are put in.
        :param thread_pool_manager: Polled before each partition is enqueued; enqueueing is paused while it reports that the futures
          limit has been reached.
        :param sleep_time_in_seconds: How long to sleep between two consecutive polls of the thread pool manager.
        """
        self._sleep_time_in_seconds = sleep_time_in_seconds
        self._thread_pool_manager = thread_pool_manager
        self._queue = queue

    def _wait_for_futures_capacity(self) -> None:
        """
        Block the calling thread, sleeping between polls, for as long as the thread pool manager reports that the futures limit has
        been reached.
        """
        while self._thread_pool_manager.prune_to_validate_has_reached_futures_limit():
            time.sleep(self._sleep_time_in_seconds)

    def generate_partitions(self, stream: AbstractStream) -> None:
        """
        Enqueue every partition produced by `stream.generate_partitions()`, then enqueue a PartitionGenerationCompletedSentinel for
        the stream to signal that partition generation is over.

        Any Exception raised along the way is wrapped in a StreamThreadException (together with the stream name) and enqueued, followed
        by the sentinel. Without this, the main thread would have no way to know that something went wrong and would wait until the
        timeout is reached.

        This method is meant to be called in a separate thread.
        """
        try:
            for partition in stream.generate_partitions():
                # Every enqueued partition ends up creating a future, so we throttle here to keep the number of futures bounded. The
                # throttling happens in this thread while the futures are actually added by the main thread, hence the limit may be
                # slightly exceeded; the overshoot is expected to be small enough not to matter. Throttling is done here rather than
                # in the main thread so that the main thread keeps consuming QueueItems; otherwise the queue could grow until memory
                # runs out.
                #
                # This is not expected to deadlock with every worker thread waiting, because there are fewer PartitionEnqueuer
                # threads than worker threads.
                #
                # Note that prune_to_validate_has_reached_futures_limit holds a lock while pruning, which might become a performance
                # bottleneck.
                self._wait_for_futures_capacity()
                self._queue.put(partition)
            self._queue.put(PartitionGenerationCompletedSentinel(stream))
        except Exception as error:
            self._queue.put(StreamThreadException(error, stream.name))
            self._queue.put(PartitionGenerationCompletedSentinel(stream))

Generates partitions from a partition generator and puts them in a queue.

22    def __init__(
23        self,
24        queue: Queue[QueueItem],
25        thread_pool_manager: ThreadPoolManager,
26        sleep_time_in_seconds: float = 0.1,
27    ) -> None:
28        """
29        :param queue:  The queue to put the partitions in.
30        :param throttler: The throttler to use to throttle the partition generation.
31        """
32        self._queue = queue
33        self._thread_pool_manager = thread_pool_manager
34        self._sleep_time_in_seconds = sleep_time_in_seconds
Parameters
  • queue: The queue to put the partitions in.
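  • thread_pool_manager: The thread pool manager used to throttle the partition generation.
  • sleep_time_in_seconds: The time to sleep between two checks of the futures limit while throttling.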
def generate_partitions( self, stream: airbyte_cdk.sources.streams.concurrent.abstract_stream.AbstractStream) -> None:
36    def generate_partitions(self, stream: AbstractStream) -> None:
37        """
38        Generate partitions from a partition generator and put them in a queue.
39        When all the partitions are added to the queue, a sentinel is added to the queue to indicate that all the partitions have been generated.
40
41        If an exception is encountered, the exception will be caught and put in the queue. This is very important because if we don't, the
42        main thread will have no way to know that something when wrong and will wait until the timeout is reached
43
44        This method is meant to be called in a separate thread.
45        """
46        try:
47            for partition in stream.generate_partitions():
48                # Adding partitions to the queue generates futures. To avoid having too many futures, we throttle here. We understand that
49                # we might add more futures than the limit by throttling in the threads while it is the main thread that actual adds the
50                # future but we expect the delta between the max futures length and the actual to be small enough that it would not be an
51                # issue. We do this in the threads because we want the main thread to always be processing QueueItems as if it does not, the
52                # queue size could grow and generating OOM issues.
53                #
54                # Also note that we do not expect this to create deadlocks where all worker threads wait because we have less
55                # PartitionEnqueuer threads than worker threads.
56                #
57                # Also note that prune_to_validate_has_reached_futures_limit has a lock while pruning which might create a bottleneck in
58                # terms of performance.
59                while self._thread_pool_manager.prune_to_validate_has_reached_futures_limit():
60                    time.sleep(self._sleep_time_in_seconds)
61                self._queue.put(partition)
62            self._queue.put(PartitionGenerationCompletedSentinel(stream))
63        except Exception as e:
64            self._queue.put(StreamThreadException(e, stream.name))
65            self._queue.put(PartitionGenerationCompletedSentinel(stream))

Generate partitions from a partition generator and put them in a queue. When all the partitions are added to the queue, a sentinel is added to the queue to indicate that all the partitions have been generated.

If an exception is encountered, the exception will be caught and put in the queue. This is very important because if we don't, the main thread will have no way to know that something went wrong and will wait until the timeout is reached.

This method is meant to be called in a separate thread.