airbyte_cdk.sources.concurrent_source.concurrent_source

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4
  5import concurrent
  6import logging
  7from queue import Queue
  8from typing import Iterable, Iterator, List, Optional
  9
 10from airbyte_cdk.models import AirbyteMessage
 11from airbyte_cdk.sources.concurrent_source.concurrent_read_processor import ConcurrentReadProcessor
 12from airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel import (
 13    PartitionGenerationCompletedSentinel,
 14)
 15from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException
 16from airbyte_cdk.sources.concurrent_source.thread_pool_manager import ThreadPoolManager
 17from airbyte_cdk.sources.message import InMemoryMessageRepository, MessageRepository
 18from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
 19from airbyte_cdk.sources.streams.concurrent.partition_enqueuer import PartitionEnqueuer
 20from airbyte_cdk.sources.streams.concurrent.partition_reader import PartitionLogger, PartitionReader
 21from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
 22from airbyte_cdk.sources.streams.concurrent.partitions.types import (
 23    PartitionCompleteSentinel,
 24    QueueItem,
 25)
 26from airbyte_cdk.sources.types import Record
 27from airbyte_cdk.sources.utils.slice_logger import DebugSliceLogger, SliceLogger
 28
 29
 30class ConcurrentSource:
 31    """
 32    A Source that reads data from multiple AbstractStreams concurrently.
 33    It does so by submitting partition generation, and partition read tasks to a thread pool.
 34    The tasks asynchronously add their output to a shared queue.
 35    The read is done when all partitions for all streams w ere generated and read.
 36    """
 37
 38    DEFAULT_TIMEOUT_SECONDS = 900
 39
 40    @staticmethod
 41    def create(
 42        num_workers: int,
 43        initial_number_of_partitions_to_generate: int,
 44        logger: logging.Logger,
 45        slice_logger: SliceLogger,
 46        message_repository: MessageRepository,
 47        queue: Optional[Queue[QueueItem]] = None,
 48        timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
 49    ) -> "ConcurrentSource":
 50        if initial_number_of_partitions_to_generate < 1:
 51            raise ValueError(
 52                f"initial_number_of_partitions_to_generate must be >= 1, got {initial_number_of_partitions_to_generate}"
 53            )
 54        is_single_threaded = initial_number_of_partitions_to_generate == 1 and num_workers == 1
 55        too_many_generator = (
 56            not is_single_threaded and initial_number_of_partitions_to_generate >= num_workers
 57        )
 58        assert not too_many_generator, (
 59            "It is required to have more workers than threads generating partitions"
 60        )
 61        threadpool = ThreadPoolManager(
 62            concurrent.futures.ThreadPoolExecutor(
 63                max_workers=num_workers, thread_name_prefix="workerpool"
 64            ),
 65            logger,
 66        )
 67        return ConcurrentSource(
 68            threadpool=threadpool,
 69            logger=logger,
 70            slice_logger=slice_logger,
 71            queue=queue,
 72            message_repository=message_repository,
 73            initial_number_partitions_to_generate=initial_number_of_partitions_to_generate,
 74            timeout_seconds=timeout_seconds,
 75        )
 76
 77    def __init__(
 78        self,
 79        threadpool: ThreadPoolManager,
 80        logger: logging.Logger,
 81        slice_logger: SliceLogger = DebugSliceLogger(),
 82        queue: Optional[Queue[QueueItem]] = None,
 83        message_repository: MessageRepository = InMemoryMessageRepository(),
 84        initial_number_partitions_to_generate: int = 1,
 85        timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
 86    ) -> None:
 87        """
 88        :param threadpool: The threadpool to submit tasks to
 89        :param logger: The logger to log to
 90        :param slice_logger: The slice logger used to create messages on new slices
 91        :param message_repository: The repository to emit messages to
 92        :param initial_number_partitions_to_generate: The initial number of concurrent partition generation tasks. Limiting this number ensures will limit the latency of the first records emitted. While the latency is not critical, emitting the records early allows the platform and the destination to process them as early as possible.
 93        :param timeout_seconds: The maximum number of seconds to wait for a record to be read from the queue. If no record is read within this time, the source will stop reading and return.
 94        """
 95        self._threadpool = threadpool
 96        self._logger = logger
 97        self._slice_logger = slice_logger
 98        self._message_repository = message_repository
 99        self._initial_number_partitions_to_generate = initial_number_partitions_to_generate
100        self._timeout_seconds = timeout_seconds
101
102        # We set a maxsize to for the main thread to process record items when the queue size grows. This assumes that there are less
103        # threads generating partitions that than are max number of workers. If it weren't the case, we could have threads only generating
104        # partitions which would fill the queue. This number is arbitrarily set to 10_000 but will probably need to be changed given more
105        # information and might even need to be configurable depending on the source
106        self._queue = queue or Queue(maxsize=10_000)
107
108    def read(
109        self,
110        streams: List[AbstractStream],
111    ) -> Iterator[AirbyteMessage]:
112        self._logger.info("Starting syncing")
113        concurrent_stream_processor = ConcurrentReadProcessor(
114            streams,
115            PartitionEnqueuer(self._queue, self._threadpool),
116            self._threadpool,
117            self._logger,
118            self._slice_logger,
119            self._message_repository,
120            PartitionReader(
121                self._queue,
122                PartitionLogger(self._slice_logger, self._logger, self._message_repository),
123            ),
124            max_concurrent_partition_generators=self._initial_number_partitions_to_generate,
125        )
126
127        # Enqueue initial partition generation tasks
128        yield from self._submit_initial_partition_generators(concurrent_stream_processor)
129
130        # Read from the queue until all partitions were generated and read
131        yield from self._consume_from_queue(
132            self._queue,
133            concurrent_stream_processor,
134        )
135        self._threadpool.check_for_errors_and_shutdown()
136        self._logger.info("Finished syncing")
137
138    def _submit_initial_partition_generators(
139        self, concurrent_stream_processor: ConcurrentReadProcessor
140    ) -> Iterable[AirbyteMessage]:
141        for _ in range(self._initial_number_partitions_to_generate):
142            status_message = concurrent_stream_processor.start_next_partition_generator()
143            if status_message:
144                yield status_message
145
146    def _consume_from_queue(
147        self,
148        queue: Queue[QueueItem],
149        concurrent_stream_processor: ConcurrentReadProcessor,
150    ) -> Iterable[AirbyteMessage]:
151        while airbyte_message_or_record_or_exception := queue.get():
152            yield from self._handle_item(
153                airbyte_message_or_record_or_exception,
154                concurrent_stream_processor,
155            )
156            # In the event that a partition raises an exception, anything remaining in
157            # the queue will be missed because is_done() can raise an exception and exit
158            # out of this loop before remaining items are consumed
159            if queue.empty() and concurrent_stream_processor.is_done():
160                # all partitions were generated and processed. we're done here
161                break
162
163    def _handle_item(
164        self,
165        queue_item: QueueItem,
166        concurrent_stream_processor: ConcurrentReadProcessor,
167    ) -> Iterable[AirbyteMessage]:
168        # handle queue item and call the appropriate handler depending on the type of the queue item
169        if isinstance(queue_item, StreamThreadException):
170            yield from concurrent_stream_processor.on_exception(queue_item)
171        elif isinstance(queue_item, PartitionGenerationCompletedSentinel):
172            yield from concurrent_stream_processor.on_partition_generation_completed(queue_item)
173        elif isinstance(queue_item, Partition):
174            concurrent_stream_processor.on_partition(queue_item)
175        elif isinstance(queue_item, PartitionCompleteSentinel):
176            yield from concurrent_stream_processor.on_partition_complete_sentinel(queue_item)
177        elif isinstance(queue_item, Record):
178            yield from concurrent_stream_processor.on_record(queue_item)
179        elif isinstance(queue_item, AirbyteMessage):
180            yield queue_item
181        else:
182            raise ValueError(f"Unknown queue item type: {type(queue_item)}")
class ConcurrentSource:
 31class ConcurrentSource:
 32    """
 33    A Source that reads data from multiple AbstractStreams concurrently.
 34    It does so by submitting partition generation, and partition read tasks to a thread pool.
 35    The tasks asynchronously add their output to a shared queue.
 36    The read is done when all partitions for all streams w ere generated and read.
 37    """
 38
 39    DEFAULT_TIMEOUT_SECONDS = 900
 40
 41    @staticmethod
 42    def create(
 43        num_workers: int,
 44        initial_number_of_partitions_to_generate: int,
 45        logger: logging.Logger,
 46        slice_logger: SliceLogger,
 47        message_repository: MessageRepository,
 48        queue: Optional[Queue[QueueItem]] = None,
 49        timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
 50    ) -> "ConcurrentSource":
 51        if initial_number_of_partitions_to_generate < 1:
 52            raise ValueError(
 53                f"initial_number_of_partitions_to_generate must be >= 1, got {initial_number_of_partitions_to_generate}"
 54            )
 55        is_single_threaded = initial_number_of_partitions_to_generate == 1 and num_workers == 1
 56        too_many_generator = (
 57            not is_single_threaded and initial_number_of_partitions_to_generate >= num_workers
 58        )
 59        assert not too_many_generator, (
 60            "It is required to have more workers than threads generating partitions"
 61        )
 62        threadpool = ThreadPoolManager(
 63            concurrent.futures.ThreadPoolExecutor(
 64                max_workers=num_workers, thread_name_prefix="workerpool"
 65            ),
 66            logger,
 67        )
 68        return ConcurrentSource(
 69            threadpool=threadpool,
 70            logger=logger,
 71            slice_logger=slice_logger,
 72            queue=queue,
 73            message_repository=message_repository,
 74            initial_number_partitions_to_generate=initial_number_of_partitions_to_generate,
 75            timeout_seconds=timeout_seconds,
 76        )
 77
 78    def __init__(
 79        self,
 80        threadpool: ThreadPoolManager,
 81        logger: logging.Logger,
 82        slice_logger: SliceLogger = DebugSliceLogger(),
 83        queue: Optional[Queue[QueueItem]] = None,
 84        message_repository: MessageRepository = InMemoryMessageRepository(),
 85        initial_number_partitions_to_generate: int = 1,
 86        timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
 87    ) -> None:
 88        """
 89        :param threadpool: The threadpool to submit tasks to
 90        :param logger: The logger to log to
 91        :param slice_logger: The slice logger used to create messages on new slices
 92        :param message_repository: The repository to emit messages to
 93        :param initial_number_partitions_to_generate: The initial number of concurrent partition generation tasks. Limiting this number ensures will limit the latency of the first records emitted. While the latency is not critical, emitting the records early allows the platform and the destination to process them as early as possible.
 94        :param timeout_seconds: The maximum number of seconds to wait for a record to be read from the queue. If no record is read within this time, the source will stop reading and return.
 95        """
 96        self._threadpool = threadpool
 97        self._logger = logger
 98        self._slice_logger = slice_logger
 99        self._message_repository = message_repository
100        self._initial_number_partitions_to_generate = initial_number_partitions_to_generate
101        self._timeout_seconds = timeout_seconds
102
103        # We set a maxsize to for the main thread to process record items when the queue size grows. This assumes that there are less
104        # threads generating partitions that than are max number of workers. If it weren't the case, we could have threads only generating
105        # partitions which would fill the queue. This number is arbitrarily set to 10_000 but will probably need to be changed given more
106        # information and might even need to be configurable depending on the source
107        self._queue = queue or Queue(maxsize=10_000)
108
109    def read(
110        self,
111        streams: List[AbstractStream],
112    ) -> Iterator[AirbyteMessage]:
113        self._logger.info("Starting syncing")
114        concurrent_stream_processor = ConcurrentReadProcessor(
115            streams,
116            PartitionEnqueuer(self._queue, self._threadpool),
117            self._threadpool,
118            self._logger,
119            self._slice_logger,
120            self._message_repository,
121            PartitionReader(
122                self._queue,
123                PartitionLogger(self._slice_logger, self._logger, self._message_repository),
124            ),
125            max_concurrent_partition_generators=self._initial_number_partitions_to_generate,
126        )
127
128        # Enqueue initial partition generation tasks
129        yield from self._submit_initial_partition_generators(concurrent_stream_processor)
130
131        # Read from the queue until all partitions were generated and read
132        yield from self._consume_from_queue(
133            self._queue,
134            concurrent_stream_processor,
135        )
136        self._threadpool.check_for_errors_and_shutdown()
137        self._logger.info("Finished syncing")
138
139    def _submit_initial_partition_generators(
140        self, concurrent_stream_processor: ConcurrentReadProcessor
141    ) -> Iterable[AirbyteMessage]:
142        for _ in range(self._initial_number_partitions_to_generate):
143            status_message = concurrent_stream_processor.start_next_partition_generator()
144            if status_message:
145                yield status_message
146
147    def _consume_from_queue(
148        self,
149        queue: Queue[QueueItem],
150        concurrent_stream_processor: ConcurrentReadProcessor,
151    ) -> Iterable[AirbyteMessage]:
152        while airbyte_message_or_record_or_exception := queue.get():
153            yield from self._handle_item(
154                airbyte_message_or_record_or_exception,
155                concurrent_stream_processor,
156            )
157            # In the event that a partition raises an exception, anything remaining in
158            # the queue will be missed because is_done() can raise an exception and exit
159            # out of this loop before remaining items are consumed
160            if queue.empty() and concurrent_stream_processor.is_done():
161                # all partitions were generated and processed. we're done here
162                break
163
164    def _handle_item(
165        self,
166        queue_item: QueueItem,
167        concurrent_stream_processor: ConcurrentReadProcessor,
168    ) -> Iterable[AirbyteMessage]:
169        # handle queue item and call the appropriate handler depending on the type of the queue item
170        if isinstance(queue_item, StreamThreadException):
171            yield from concurrent_stream_processor.on_exception(queue_item)
172        elif isinstance(queue_item, PartitionGenerationCompletedSentinel):
173            yield from concurrent_stream_processor.on_partition_generation_completed(queue_item)
174        elif isinstance(queue_item, Partition):
175            concurrent_stream_processor.on_partition(queue_item)
176        elif isinstance(queue_item, PartitionCompleteSentinel):
177            yield from concurrent_stream_processor.on_partition_complete_sentinel(queue_item)
178        elif isinstance(queue_item, Record):
179            yield from concurrent_stream_processor.on_record(queue_item)
180        elif isinstance(queue_item, AirbyteMessage):
181            yield queue_item
182        else:
183            raise ValueError(f"Unknown queue item type: {type(queue_item)}")

A Source that reads data from multiple AbstractStreams concurrently. It does so by submitting partition generation and partition read tasks to a thread pool. The tasks asynchronously add their output to a shared queue. The read is done when all partitions for all streams were generated and read.

 78    def __init__(
 79        self,
 80        threadpool: ThreadPoolManager,
 81        logger: logging.Logger,
 82        slice_logger: SliceLogger = DebugSliceLogger(),
 83        queue: Optional[Queue[QueueItem]] = None,
 84        message_repository: MessageRepository = InMemoryMessageRepository(),
 85        initial_number_partitions_to_generate: int = 1,
 86        timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
 87    ) -> None:
 88        """
 89        :param threadpool: The threadpool to submit tasks to
 90        :param logger: The logger to log to
 91        :param slice_logger: The slice logger used to create messages on new slices
 92        :param message_repository: The repository to emit messages to
 93        :param initial_number_partitions_to_generate: The initial number of concurrent partition generation tasks. Limiting this number ensures will limit the latency of the first records emitted. While the latency is not critical, emitting the records early allows the platform and the destination to process them as early as possible.
 94        :param timeout_seconds: The maximum number of seconds to wait for a record to be read from the queue. If no record is read within this time, the source will stop reading and return.
 95        """
 96        self._threadpool = threadpool
 97        self._logger = logger
 98        self._slice_logger = slice_logger
 99        self._message_repository = message_repository
100        self._initial_number_partitions_to_generate = initial_number_partitions_to_generate
101        self._timeout_seconds = timeout_seconds
102
103        # We set a maxsize to for the main thread to process record items when the queue size grows. This assumes that there are less
104        # threads generating partitions that than are max number of workers. If it weren't the case, we could have threads only generating
105        # partitions which would fill the queue. This number is arbitrarily set to 10_000 but will probably need to be changed given more
106        # information and might even need to be configurable depending on the source
107        self._queue = queue or Queue(maxsize=10_000)
Parameters
  • threadpool: The threadpool to submit tasks to
  • logger: The logger to log to
  • slice_logger: The slice logger used to create messages on new slices
  • message_repository: The repository to emit messages to
  • initial_number_partitions_to_generate: The initial number of concurrent partition generation tasks. Limiting this number limits the latency of the first records emitted. While this latency is not critical, emitting records early allows the platform and the destination to start processing them as soon as possible.
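  • timeout_seconds: The maximum number of seconds to wait for a record to be read from the queue. If no record is read within this time, the source is meant to stop reading and return. (Note: the read loop of this class currently calls queue.get() without a timeout, so this value is not enforced there.)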
DEFAULT_TIMEOUT_SECONDS = 15 * 60  # 15 minutes, expressed in seconds
41    @staticmethod
42    def create(
43        num_workers: int,
44        initial_number_of_partitions_to_generate: int,
45        logger: logging.Logger,
46        slice_logger: SliceLogger,
47        message_repository: MessageRepository,
48        queue: Optional[Queue[QueueItem]] = None,
49        timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
50    ) -> "ConcurrentSource":
51        if initial_number_of_partitions_to_generate < 1:
52            raise ValueError(
53                f"initial_number_of_partitions_to_generate must be >= 1, got {initial_number_of_partitions_to_generate}"
54            )
55        is_single_threaded = initial_number_of_partitions_to_generate == 1 and num_workers == 1
56        too_many_generator = (
57            not is_single_threaded and initial_number_of_partitions_to_generate >= num_workers
58        )
59        assert not too_many_generator, (
60            "It is required to have more workers than threads generating partitions"
61        )
62        threadpool = ThreadPoolManager(
63            concurrent.futures.ThreadPoolExecutor(
64                max_workers=num_workers, thread_name_prefix="workerpool"
65            ),
66            logger,
67        )
68        return ConcurrentSource(
69            threadpool=threadpool,
70            logger=logger,
71            slice_logger=slice_logger,
72            queue=queue,
73            message_repository=message_repository,
74            initial_number_partitions_to_generate=initial_number_of_partitions_to_generate,
75            timeout_seconds=timeout_seconds,
76        )
109    def read(
110        self,
111        streams: List[AbstractStream],
112    ) -> Iterator[AirbyteMessage]:
113        self._logger.info("Starting syncing")
114        concurrent_stream_processor = ConcurrentReadProcessor(
115            streams,
116            PartitionEnqueuer(self._queue, self._threadpool),
117            self._threadpool,
118            self._logger,
119            self._slice_logger,
120            self._message_repository,
121            PartitionReader(
122                self._queue,
123                PartitionLogger(self._slice_logger, self._logger, self._message_repository),
124            ),
125            max_concurrent_partition_generators=self._initial_number_partitions_to_generate,
126        )
127
128        # Enqueue initial partition generation tasks
129        yield from self._submit_initial_partition_generators(concurrent_stream_processor)
130
131        # Read from the queue until all partitions were generated and read
132        yield from self._consume_from_queue(
133            self._queue,
134            concurrent_stream_processor,
135        )
136        self._threadpool.check_for_errors_and_shutdown()
137        self._logger.info("Finished syncing")