airbyte_cdk.sources.concurrent_source.concurrent_source

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
import concurrent
import concurrent.futures
import logging
from queue import Queue
from typing import Iterable, Iterator, List, Optional

from airbyte_cdk.models import AirbyteMessage
from airbyte_cdk.sources.concurrent_source.concurrent_read_processor import ConcurrentReadProcessor
from airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel import (
    PartitionGenerationCompletedSentinel,
)
from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException
from airbyte_cdk.sources.concurrent_source.thread_pool_manager import ThreadPoolManager
from airbyte_cdk.sources.message import InMemoryMessageRepository, MessageRepository
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.concurrent.partition_enqueuer import PartitionEnqueuer
from airbyte_cdk.sources.streams.concurrent.partition_reader import PartitionReader
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.types import (
    PartitionCompleteSentinel,
    QueueItem,
)
from airbyte_cdk.sources.types import Record
from airbyte_cdk.sources.utils.slice_logger import DebugSliceLogger, SliceLogger
 27
 28
 29class ConcurrentSource:
 30    """
 31    A Source that reads data from multiple AbstractStreams concurrently.
 32    It does so by submitting partition generation, and partition read tasks to a thread pool.
 33    The tasks asynchronously add their output to a shared queue.
 34    The read is done when all partitions for all streams w ere generated and read.
 35    """
 36
 37    DEFAULT_TIMEOUT_SECONDS = 900
 38
 39    @staticmethod
 40    def create(
 41        num_workers: int,
 42        initial_number_of_partitions_to_generate: int,
 43        logger: logging.Logger,
 44        slice_logger: SliceLogger,
 45        message_repository: MessageRepository,
 46        timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
 47    ) -> "ConcurrentSource":
 48        is_single_threaded = initial_number_of_partitions_to_generate == 1 and num_workers == 1
 49        too_many_generator = (
 50            not is_single_threaded and initial_number_of_partitions_to_generate >= num_workers
 51        )
 52        assert (
 53            not too_many_generator
 54        ), "It is required to have more workers than threads generating partitions"
 55        threadpool = ThreadPoolManager(
 56            concurrent.futures.ThreadPoolExecutor(
 57                max_workers=num_workers, thread_name_prefix="workerpool"
 58            ),
 59            logger,
 60        )
 61        return ConcurrentSource(
 62            threadpool,
 63            logger,
 64            slice_logger,
 65            message_repository,
 66            initial_number_of_partitions_to_generate,
 67            timeout_seconds,
 68        )
 69
 70    def __init__(
 71        self,
 72        threadpool: ThreadPoolManager,
 73        logger: logging.Logger,
 74        slice_logger: SliceLogger = DebugSliceLogger(),
 75        message_repository: MessageRepository = InMemoryMessageRepository(),
 76        initial_number_partitions_to_generate: int = 1,
 77        timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
 78    ) -> None:
 79        """
 80        :param threadpool: The threadpool to submit tasks to
 81        :param logger: The logger to log to
 82        :param slice_logger: The slice logger used to create messages on new slices
 83        :param message_repository: The repository to emit messages to
 84        :param initial_number_partitions_to_generate: The initial number of concurrent partition generation tasks. Limiting this number ensures will limit the latency of the first records emitted. While the latency is not critical, emitting the records early allows the platform and the destination to process them as early as possible.
 85        :param timeout_seconds: The maximum number of seconds to wait for a record to be read from the queue. If no record is read within this time, the source will stop reading and return.
 86        """
 87        self._threadpool = threadpool
 88        self._logger = logger
 89        self._slice_logger = slice_logger
 90        self._message_repository = message_repository
 91        self._initial_number_partitions_to_generate = initial_number_partitions_to_generate
 92        self._timeout_seconds = timeout_seconds
 93
 94    def read(
 95        self,
 96        streams: List[AbstractStream],
 97    ) -> Iterator[AirbyteMessage]:
 98        self._logger.info("Starting syncing")
 99
100        # We set a maxsize to for the main thread to process record items when the queue size grows. This assumes that there are less
101        # threads generating partitions that than are max number of workers. If it weren't the case, we could have threads only generating
102        # partitions which would fill the queue. This number is arbitrarily set to 10_000 but will probably need to be changed given more
103        # information and might even need to be configurable depending on the source
104        queue: Queue[QueueItem] = Queue(maxsize=10_000)
105        concurrent_stream_processor = ConcurrentReadProcessor(
106            streams,
107            PartitionEnqueuer(queue, self._threadpool),
108            self._threadpool,
109            self._logger,
110            self._slice_logger,
111            self._message_repository,
112            PartitionReader(queue),
113        )
114
115        # Enqueue initial partition generation tasks
116        yield from self._submit_initial_partition_generators(concurrent_stream_processor)
117
118        # Read from the queue until all partitions were generated and read
119        yield from self._consume_from_queue(
120            queue,
121            concurrent_stream_processor,
122        )
123        self._threadpool.check_for_errors_and_shutdown()
124        self._logger.info("Finished syncing")
125
126    def _submit_initial_partition_generators(
127        self, concurrent_stream_processor: ConcurrentReadProcessor
128    ) -> Iterable[AirbyteMessage]:
129        for _ in range(self._initial_number_partitions_to_generate):
130            status_message = concurrent_stream_processor.start_next_partition_generator()
131            if status_message:
132                yield status_message
133
134    def _consume_from_queue(
135        self,
136        queue: Queue[QueueItem],
137        concurrent_stream_processor: ConcurrentReadProcessor,
138    ) -> Iterable[AirbyteMessage]:
139        while airbyte_message_or_record_or_exception := queue.get():
140            yield from self._handle_item(
141                airbyte_message_or_record_or_exception,
142                concurrent_stream_processor,
143            )
144            if concurrent_stream_processor.is_done() and queue.empty():
145                # all partitions were generated and processed. we're done here
146                break
147
148    def _handle_item(
149        self,
150        queue_item: QueueItem,
151        concurrent_stream_processor: ConcurrentReadProcessor,
152    ) -> Iterable[AirbyteMessage]:
153        # handle queue item and call the appropriate handler depending on the type of the queue item
154        if isinstance(queue_item, StreamThreadException):
155            yield from concurrent_stream_processor.on_exception(queue_item)
156        elif isinstance(queue_item, PartitionGenerationCompletedSentinel):
157            yield from concurrent_stream_processor.on_partition_generation_completed(queue_item)
158        elif isinstance(queue_item, Partition):
159            concurrent_stream_processor.on_partition(queue_item)
160        elif isinstance(queue_item, PartitionCompleteSentinel):
161            yield from concurrent_stream_processor.on_partition_complete_sentinel(queue_item)
162        elif isinstance(queue_item, Record):
163            yield from concurrent_stream_processor.on_record(queue_item)
164        else:
165            raise ValueError(f"Unknown queue item type: {type(queue_item)}")
class ConcurrentSource:
 30class ConcurrentSource:
 31    """
 32    A Source that reads data from multiple AbstractStreams concurrently.
 33    It does so by submitting partition generation, and partition read tasks to a thread pool.
 34    The tasks asynchronously add their output to a shared queue.
 35    The read is done when all partitions for all streams w ere generated and read.
 36    """
 37
 38    DEFAULT_TIMEOUT_SECONDS = 900
 39
 40    @staticmethod
 41    def create(
 42        num_workers: int,
 43        initial_number_of_partitions_to_generate: int,
 44        logger: logging.Logger,
 45        slice_logger: SliceLogger,
 46        message_repository: MessageRepository,
 47        timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
 48    ) -> "ConcurrentSource":
 49        is_single_threaded = initial_number_of_partitions_to_generate == 1 and num_workers == 1
 50        too_many_generator = (
 51            not is_single_threaded and initial_number_of_partitions_to_generate >= num_workers
 52        )
 53        assert (
 54            not too_many_generator
 55        ), "It is required to have more workers than threads generating partitions"
 56        threadpool = ThreadPoolManager(
 57            concurrent.futures.ThreadPoolExecutor(
 58                max_workers=num_workers, thread_name_prefix="workerpool"
 59            ),
 60            logger,
 61        )
 62        return ConcurrentSource(
 63            threadpool,
 64            logger,
 65            slice_logger,
 66            message_repository,
 67            initial_number_of_partitions_to_generate,
 68            timeout_seconds,
 69        )
 70
 71    def __init__(
 72        self,
 73        threadpool: ThreadPoolManager,
 74        logger: logging.Logger,
 75        slice_logger: SliceLogger = DebugSliceLogger(),
 76        message_repository: MessageRepository = InMemoryMessageRepository(),
 77        initial_number_partitions_to_generate: int = 1,
 78        timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
 79    ) -> None:
 80        """
 81        :param threadpool: The threadpool to submit tasks to
 82        :param logger: The logger to log to
 83        :param slice_logger: The slice logger used to create messages on new slices
 84        :param message_repository: The repository to emit messages to
 85        :param initial_number_partitions_to_generate: The initial number of concurrent partition generation tasks. Limiting this number ensures will limit the latency of the first records emitted. While the latency is not critical, emitting the records early allows the platform and the destination to process them as early as possible.
 86        :param timeout_seconds: The maximum number of seconds to wait for a record to be read from the queue. If no record is read within this time, the source will stop reading and return.
 87        """
 88        self._threadpool = threadpool
 89        self._logger = logger
 90        self._slice_logger = slice_logger
 91        self._message_repository = message_repository
 92        self._initial_number_partitions_to_generate = initial_number_partitions_to_generate
 93        self._timeout_seconds = timeout_seconds
 94
 95    def read(
 96        self,
 97        streams: List[AbstractStream],
 98    ) -> Iterator[AirbyteMessage]:
 99        self._logger.info("Starting syncing")
100
101        # We set a maxsize to for the main thread to process record items when the queue size grows. This assumes that there are less
102        # threads generating partitions that than are max number of workers. If it weren't the case, we could have threads only generating
103        # partitions which would fill the queue. This number is arbitrarily set to 10_000 but will probably need to be changed given more
104        # information and might even need to be configurable depending on the source
105        queue: Queue[QueueItem] = Queue(maxsize=10_000)
106        concurrent_stream_processor = ConcurrentReadProcessor(
107            streams,
108            PartitionEnqueuer(queue, self._threadpool),
109            self._threadpool,
110            self._logger,
111            self._slice_logger,
112            self._message_repository,
113            PartitionReader(queue),
114        )
115
116        # Enqueue initial partition generation tasks
117        yield from self._submit_initial_partition_generators(concurrent_stream_processor)
118
119        # Read from the queue until all partitions were generated and read
120        yield from self._consume_from_queue(
121            queue,
122            concurrent_stream_processor,
123        )
124        self._threadpool.check_for_errors_and_shutdown()
125        self._logger.info("Finished syncing")
126
127    def _submit_initial_partition_generators(
128        self, concurrent_stream_processor: ConcurrentReadProcessor
129    ) -> Iterable[AirbyteMessage]:
130        for _ in range(self._initial_number_partitions_to_generate):
131            status_message = concurrent_stream_processor.start_next_partition_generator()
132            if status_message:
133                yield status_message
134
135    def _consume_from_queue(
136        self,
137        queue: Queue[QueueItem],
138        concurrent_stream_processor: ConcurrentReadProcessor,
139    ) -> Iterable[AirbyteMessage]:
140        while airbyte_message_or_record_or_exception := queue.get():
141            yield from self._handle_item(
142                airbyte_message_or_record_or_exception,
143                concurrent_stream_processor,
144            )
145            if concurrent_stream_processor.is_done() and queue.empty():
146                # all partitions were generated and processed. we're done here
147                break
148
149    def _handle_item(
150        self,
151        queue_item: QueueItem,
152        concurrent_stream_processor: ConcurrentReadProcessor,
153    ) -> Iterable[AirbyteMessage]:
154        # handle queue item and call the appropriate handler depending on the type of the queue item
155        if isinstance(queue_item, StreamThreadException):
156            yield from concurrent_stream_processor.on_exception(queue_item)
157        elif isinstance(queue_item, PartitionGenerationCompletedSentinel):
158            yield from concurrent_stream_processor.on_partition_generation_completed(queue_item)
159        elif isinstance(queue_item, Partition):
160            concurrent_stream_processor.on_partition(queue_item)
161        elif isinstance(queue_item, PartitionCompleteSentinel):
162            yield from concurrent_stream_processor.on_partition_complete_sentinel(queue_item)
163        elif isinstance(queue_item, Record):
164            yield from concurrent_stream_processor.on_record(queue_item)
165        else:
166            raise ValueError(f"Unknown queue item type: {type(queue_item)}")

A Source that reads data from multiple AbstractStreams concurrently. It does so by submitting partition generation and partition read tasks to a thread pool. The tasks asynchronously add their output to a shared queue. The read is done when all partitions for all streams were generated and read.

ConcurrentSource( threadpool: airbyte_cdk.sources.concurrent_source.thread_pool_manager.ThreadPoolManager, logger: logging.Logger, slice_logger: airbyte_cdk.sources.utils.slice_logger.SliceLogger = <airbyte_cdk.sources.utils.slice_logger.DebugSliceLogger object>, message_repository: airbyte_cdk.MessageRepository = <airbyte_cdk.InMemoryMessageRepository object>, initial_number_partitions_to_generate: int = 1, timeout_seconds: int = 900)
def __init__(
    self,
    threadpool: ThreadPoolManager,
    logger: logging.Logger,
    slice_logger: Optional[SliceLogger] = None,
    message_repository: Optional[MessageRepository] = None,
    initial_number_partitions_to_generate: int = 1,
    timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
) -> None:
    """
    :param threadpool: The threadpool to submit tasks to
    :param logger: The logger to log to
    :param slice_logger: The slice logger used to create messages on new slices. When omitted,
        a new DebugSliceLogger is created for this instance.
    :param message_repository: The repository to emit messages to. When omitted, a new
        InMemoryMessageRepository is created for this instance.
    :param initial_number_partitions_to_generate: The initial number of concurrent partition
        generation tasks. Limiting this number limits the latency of the first records emitted.
        While the latency is not critical, emitting records early allows the platform and the
        destination to process them as early as possible.
    :param timeout_seconds: The maximum number of seconds intended to wait for an item to be
        read from the queue. NOTE: the class's queue reads block without a timeout, so this
        value is stored but not currently applied there.
    """
    self._threadpool = threadpool
    self._logger = logger
    # Defaults are built per instance. An object used directly as a default in the signature
    # is created once, when the function is defined, and would be shared by every instance
    # (including any state an InMemoryMessageRepository accumulates).
    self._slice_logger = slice_logger if slice_logger is not None else DebugSliceLogger()
    self._message_repository = (
        message_repository
        if message_repository is not None
        else InMemoryMessageRepository()
    )
    self._initial_number_partitions_to_generate = initial_number_partitions_to_generate
    self._timeout_seconds = timeout_seconds
Parameters
  • threadpool: The threadpool to submit tasks to
  • logger: The logger to log to
  • slice_logger: The slice logger used to create messages on new slices
  • message_repository: The repository to emit messages to
  • initial_number_partitions_to_generate: The initial number of concurrent partition generation tasks. Limiting this number limits the latency of the first records emitted. While the latency is not critical, emitting records early allows the platform and the destination to process them as early as possible.
  • timeout_seconds: The maximum number of seconds intended to wait for an item to be read from the queue. Note: the queue reads in this module call queue.get() without a timeout, so this value is stored but not currently applied.
DEFAULT_TIMEOUT_SECONDS = 900
@staticmethod
def create( num_workers: int, initial_number_of_partitions_to_generate: int, logger: logging.Logger, slice_logger: airbyte_cdk.sources.utils.slice_logger.SliceLogger, message_repository: airbyte_cdk.MessageRepository, timeout_seconds: int = 900) -> ConcurrentSource:
@staticmethod
def create(
    num_workers: int,
    initial_number_of_partitions_to_generate: int,
    logger: logging.Logger,
    slice_logger: SliceLogger,
    message_repository: MessageRepository,
    timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
) -> "ConcurrentSource":
    """
    Build a ConcurrentSource backed by a new thread pool.

    :param num_workers: Maximum number of worker threads in the pool.
    :param initial_number_of_partitions_to_generate: Number of partition generation tasks
        started when a read begins.
    :param logger: The logger to log to.
    :param slice_logger: The slice logger used to create messages on new slices.
    :param message_repository: The repository to emit messages to.
    :param timeout_seconds: Forwarded unchanged to the ConcurrentSource constructor.
    :return: A ConcurrentSource whose ThreadPoolManager wraps a ThreadPoolExecutor with
        ``num_workers`` workers named with the "workerpool" prefix.
    :raises AssertionError: If there are not strictly more workers than initial partition
        generators, except in the single-threaded case (one worker, one generator).
    """
    is_single_threaded = initial_number_of_partitions_to_generate == 1 and num_workers == 1
    too_many_generators = (
        not is_single_threaded and initial_number_of_partitions_to_generate >= num_workers
    )
    # Generators must leave workers free to read partitions; otherwise the workers could all be
    # busy generating partitions that fill the bounded read queue.
    # Raised explicitly rather than with `assert` so the check also runs under `python -O`;
    # the exception type stays AssertionError for callers that already expect it.
    if too_many_generators:
        raise AssertionError(
            "It is required to have more workers than threads generating partitions"
        )
    threadpool = ThreadPoolManager(
        concurrent.futures.ThreadPoolExecutor(
            max_workers=num_workers, thread_name_prefix="workerpool"
        ),
        logger,
    )
    return ConcurrentSource(
        threadpool,
        logger,
        slice_logger,
        message_repository,
        initial_number_of_partitions_to_generate,
        timeout_seconds,
    )
 95    def read(
 96        self,
 97        streams: List[AbstractStream],
 98    ) -> Iterator[AirbyteMessage]:
 99        self._logger.info("Starting syncing")
100
101        # We set a maxsize to for the main thread to process record items when the queue size grows. This assumes that there are less
102        # threads generating partitions that than are max number of workers. If it weren't the case, we could have threads only generating
103        # partitions which would fill the queue. This number is arbitrarily set to 10_000 but will probably need to be changed given more
104        # information and might even need to be configurable depending on the source
105        queue: Queue[QueueItem] = Queue(maxsize=10_000)
106        concurrent_stream_processor = ConcurrentReadProcessor(
107            streams,
108            PartitionEnqueuer(queue, self._threadpool),
109            self._threadpool,
110            self._logger,
111            self._slice_logger,
112            self._message_repository,
113            PartitionReader(queue),
114        )
115
116        # Enqueue initial partition generation tasks
117        yield from self._submit_initial_partition_generators(concurrent_stream_processor)
118
119        # Read from the queue until all partitions were generated and read
120        yield from self._consume_from_queue(
121            queue,
122            concurrent_stream_processor,
123        )
124        self._threadpool.check_for_errors_and_shutdown()
125        self._logger.info("Finished syncing")