airbyte_cdk.sources.concurrent_source.concurrent_source

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4
  5import concurrent
  6import logging
  7from queue import Queue
  8from typing import Iterable, Iterator, List, Optional
  9
 10from airbyte_cdk.models import AirbyteMessage
 11from airbyte_cdk.sources.concurrent_source.concurrent_read_processor import ConcurrentReadProcessor
 12from airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel import (
 13    PartitionGenerationCompletedSentinel,
 14)
 15from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException
 16from airbyte_cdk.sources.concurrent_source.thread_pool_manager import ThreadPoolManager
 17from airbyte_cdk.sources.message import InMemoryMessageRepository, MessageRepository
 18from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
 19from airbyte_cdk.sources.streams.concurrent.partition_enqueuer import PartitionEnqueuer
 20from airbyte_cdk.sources.streams.concurrent.partition_reader import PartitionLogger, PartitionReader
 21from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
 22from airbyte_cdk.sources.streams.concurrent.partitions.types import (
 23    PartitionCompleteSentinel,
 24    QueueItem,
 25)
 26from airbyte_cdk.sources.types import Record
 27from airbyte_cdk.sources.utils.slice_logger import DebugSliceLogger, SliceLogger
 28
 29
 30class ConcurrentSource:
 31    """
 32    A Source that reads data from multiple AbstractStreams concurrently.
 33    It does so by submitting partition generation, and partition read tasks to a thread pool.
 34    The tasks asynchronously add their output to a shared queue.
 35    The read is done when all partitions for all streams w ere generated and read.
 36    """
 37
 38    DEFAULT_TIMEOUT_SECONDS = 900
 39
 40    @staticmethod
 41    def create(
 42        num_workers: int,
 43        initial_number_of_partitions_to_generate: int,
 44        logger: logging.Logger,
 45        slice_logger: SliceLogger,
 46        message_repository: MessageRepository,
 47        queue: Optional[Queue[QueueItem]] = None,
 48        timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
 49    ) -> "ConcurrentSource":
 50        is_single_threaded = initial_number_of_partitions_to_generate == 1 and num_workers == 1
 51        too_many_generator = (
 52            not is_single_threaded and initial_number_of_partitions_to_generate >= num_workers
 53        )
 54        assert not too_many_generator, (
 55            "It is required to have more workers than threads generating partitions"
 56        )
 57        threadpool = ThreadPoolManager(
 58            concurrent.futures.ThreadPoolExecutor(
 59                max_workers=num_workers, thread_name_prefix="workerpool"
 60            ),
 61            logger,
 62        )
 63        return ConcurrentSource(
 64            threadpool=threadpool,
 65            logger=logger,
 66            slice_logger=slice_logger,
 67            queue=queue,
 68            message_repository=message_repository,
 69            initial_number_partitions_to_generate=initial_number_of_partitions_to_generate,
 70            timeout_seconds=timeout_seconds,
 71        )
 72
 73    def __init__(
 74        self,
 75        threadpool: ThreadPoolManager,
 76        logger: logging.Logger,
 77        slice_logger: SliceLogger = DebugSliceLogger(),
 78        queue: Optional[Queue[QueueItem]] = None,
 79        message_repository: MessageRepository = InMemoryMessageRepository(),
 80        initial_number_partitions_to_generate: int = 1,
 81        timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
 82    ) -> None:
 83        """
 84        :param threadpool: The threadpool to submit tasks to
 85        :param logger: The logger to log to
 86        :param slice_logger: The slice logger used to create messages on new slices
 87        :param message_repository: The repository to emit messages to
 88        :param initial_number_partitions_to_generate: The initial number of concurrent partition generation tasks. Limiting this number ensures will limit the latency of the first records emitted. While the latency is not critical, emitting the records early allows the platform and the destination to process them as early as possible.
 89        :param timeout_seconds: The maximum number of seconds to wait for a record to be read from the queue. If no record is read within this time, the source will stop reading and return.
 90        """
 91        self._threadpool = threadpool
 92        self._logger = logger
 93        self._slice_logger = slice_logger
 94        self._message_repository = message_repository
 95        self._initial_number_partitions_to_generate = initial_number_partitions_to_generate
 96        self._timeout_seconds = timeout_seconds
 97
 98        # We set a maxsize to for the main thread to process record items when the queue size grows. This assumes that there are less
 99        # threads generating partitions that than are max number of workers. If it weren't the case, we could have threads only generating
100        # partitions which would fill the queue. This number is arbitrarily set to 10_000 but will probably need to be changed given more
101        # information and might even need to be configurable depending on the source
102        self._queue = queue or Queue(maxsize=10_000)
103
104    def read(
105        self,
106        streams: List[AbstractStream],
107    ) -> Iterator[AirbyteMessage]:
108        self._logger.info("Starting syncing")
109        concurrent_stream_processor = ConcurrentReadProcessor(
110            streams,
111            PartitionEnqueuer(self._queue, self._threadpool),
112            self._threadpool,
113            self._logger,
114            self._slice_logger,
115            self._message_repository,
116            PartitionReader(
117                self._queue,
118                PartitionLogger(self._slice_logger, self._logger, self._message_repository),
119            ),
120        )
121
122        # Enqueue initial partition generation tasks
123        yield from self._submit_initial_partition_generators(concurrent_stream_processor)
124
125        # Read from the queue until all partitions were generated and read
126        yield from self._consume_from_queue(
127            self._queue,
128            concurrent_stream_processor,
129        )
130        self._threadpool.check_for_errors_and_shutdown()
131        self._logger.info("Finished syncing")
132
133    def _submit_initial_partition_generators(
134        self, concurrent_stream_processor: ConcurrentReadProcessor
135    ) -> Iterable[AirbyteMessage]:
136        for _ in range(self._initial_number_partitions_to_generate):
137            status_message = concurrent_stream_processor.start_next_partition_generator()
138            if status_message:
139                yield status_message
140
141    def _consume_from_queue(
142        self,
143        queue: Queue[QueueItem],
144        concurrent_stream_processor: ConcurrentReadProcessor,
145    ) -> Iterable[AirbyteMessage]:
146        while airbyte_message_or_record_or_exception := queue.get():
147            yield from self._handle_item(
148                airbyte_message_or_record_or_exception,
149                concurrent_stream_processor,
150            )
151            # In the event that a partition raises an exception, anything remaining in
152            # the queue will be missed because is_done() can raise an exception and exit
153            # out of this loop before remaining items are consumed
154            if queue.empty() and concurrent_stream_processor.is_done():
155                # all partitions were generated and processed. we're done here
156                break
157
158    def _handle_item(
159        self,
160        queue_item: QueueItem,
161        concurrent_stream_processor: ConcurrentReadProcessor,
162    ) -> Iterable[AirbyteMessage]:
163        # handle queue item and call the appropriate handler depending on the type of the queue item
164        if isinstance(queue_item, StreamThreadException):
165            yield from concurrent_stream_processor.on_exception(queue_item)
166        elif isinstance(queue_item, PartitionGenerationCompletedSentinel):
167            yield from concurrent_stream_processor.on_partition_generation_completed(queue_item)
168        elif isinstance(queue_item, Partition):
169            concurrent_stream_processor.on_partition(queue_item)
170        elif isinstance(queue_item, PartitionCompleteSentinel):
171            yield from concurrent_stream_processor.on_partition_complete_sentinel(queue_item)
172        elif isinstance(queue_item, Record):
173            yield from concurrent_stream_processor.on_record(queue_item)
174        elif isinstance(queue_item, AirbyteMessage):
175            yield queue_item
176        else:
177            raise ValueError(f"Unknown queue item type: {type(queue_item)}")
class ConcurrentSource:
 31class ConcurrentSource:
 32    """
 33    A Source that reads data from multiple AbstractStreams concurrently.
 34    It does so by submitting partition generation, and partition read tasks to a thread pool.
 35    The tasks asynchronously add their output to a shared queue.
 36    The read is done when all partitions for all streams w ere generated and read.
 37    """
 38
 39    DEFAULT_TIMEOUT_SECONDS = 900
 40
 41    @staticmethod
 42    def create(
 43        num_workers: int,
 44        initial_number_of_partitions_to_generate: int,
 45        logger: logging.Logger,
 46        slice_logger: SliceLogger,
 47        message_repository: MessageRepository,
 48        queue: Optional[Queue[QueueItem]] = None,
 49        timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
 50    ) -> "ConcurrentSource":
 51        is_single_threaded = initial_number_of_partitions_to_generate == 1 and num_workers == 1
 52        too_many_generator = (
 53            not is_single_threaded and initial_number_of_partitions_to_generate >= num_workers
 54        )
 55        assert not too_many_generator, (
 56            "It is required to have more workers than threads generating partitions"
 57        )
 58        threadpool = ThreadPoolManager(
 59            concurrent.futures.ThreadPoolExecutor(
 60                max_workers=num_workers, thread_name_prefix="workerpool"
 61            ),
 62            logger,
 63        )
 64        return ConcurrentSource(
 65            threadpool=threadpool,
 66            logger=logger,
 67            slice_logger=slice_logger,
 68            queue=queue,
 69            message_repository=message_repository,
 70            initial_number_partitions_to_generate=initial_number_of_partitions_to_generate,
 71            timeout_seconds=timeout_seconds,
 72        )
 73
 74    def __init__(
 75        self,
 76        threadpool: ThreadPoolManager,
 77        logger: logging.Logger,
 78        slice_logger: SliceLogger = DebugSliceLogger(),
 79        queue: Optional[Queue[QueueItem]] = None,
 80        message_repository: MessageRepository = InMemoryMessageRepository(),
 81        initial_number_partitions_to_generate: int = 1,
 82        timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
 83    ) -> None:
 84        """
 85        :param threadpool: The threadpool to submit tasks to
 86        :param logger: The logger to log to
 87        :param slice_logger: The slice logger used to create messages on new slices
 88        :param message_repository: The repository to emit messages to
 89        :param initial_number_partitions_to_generate: The initial number of concurrent partition generation tasks. Limiting this number ensures will limit the latency of the first records emitted. While the latency is not critical, emitting the records early allows the platform and the destination to process them as early as possible.
 90        :param timeout_seconds: The maximum number of seconds to wait for a record to be read from the queue. If no record is read within this time, the source will stop reading and return.
 91        """
 92        self._threadpool = threadpool
 93        self._logger = logger
 94        self._slice_logger = slice_logger
 95        self._message_repository = message_repository
 96        self._initial_number_partitions_to_generate = initial_number_partitions_to_generate
 97        self._timeout_seconds = timeout_seconds
 98
 99        # We set a maxsize to for the main thread to process record items when the queue size grows. This assumes that there are less
100        # threads generating partitions that than are max number of workers. If it weren't the case, we could have threads only generating
101        # partitions which would fill the queue. This number is arbitrarily set to 10_000 but will probably need to be changed given more
102        # information and might even need to be configurable depending on the source
103        self._queue = queue or Queue(maxsize=10_000)
104
105    def read(
106        self,
107        streams: List[AbstractStream],
108    ) -> Iterator[AirbyteMessage]:
109        self._logger.info("Starting syncing")
110        concurrent_stream_processor = ConcurrentReadProcessor(
111            streams,
112            PartitionEnqueuer(self._queue, self._threadpool),
113            self._threadpool,
114            self._logger,
115            self._slice_logger,
116            self._message_repository,
117            PartitionReader(
118                self._queue,
119                PartitionLogger(self._slice_logger, self._logger, self._message_repository),
120            ),
121        )
122
123        # Enqueue initial partition generation tasks
124        yield from self._submit_initial_partition_generators(concurrent_stream_processor)
125
126        # Read from the queue until all partitions were generated and read
127        yield from self._consume_from_queue(
128            self._queue,
129            concurrent_stream_processor,
130        )
131        self._threadpool.check_for_errors_and_shutdown()
132        self._logger.info("Finished syncing")
133
134    def _submit_initial_partition_generators(
135        self, concurrent_stream_processor: ConcurrentReadProcessor
136    ) -> Iterable[AirbyteMessage]:
137        for _ in range(self._initial_number_partitions_to_generate):
138            status_message = concurrent_stream_processor.start_next_partition_generator()
139            if status_message:
140                yield status_message
141
142    def _consume_from_queue(
143        self,
144        queue: Queue[QueueItem],
145        concurrent_stream_processor: ConcurrentReadProcessor,
146    ) -> Iterable[AirbyteMessage]:
147        while airbyte_message_or_record_or_exception := queue.get():
148            yield from self._handle_item(
149                airbyte_message_or_record_or_exception,
150                concurrent_stream_processor,
151            )
152            # In the event that a partition raises an exception, anything remaining in
153            # the queue will be missed because is_done() can raise an exception and exit
154            # out of this loop before remaining items are consumed
155            if queue.empty() and concurrent_stream_processor.is_done():
156                # all partitions were generated and processed. we're done here
157                break
158
159    def _handle_item(
160        self,
161        queue_item: QueueItem,
162        concurrent_stream_processor: ConcurrentReadProcessor,
163    ) -> Iterable[AirbyteMessage]:
164        # handle queue item and call the appropriate handler depending on the type of the queue item
165        if isinstance(queue_item, StreamThreadException):
166            yield from concurrent_stream_processor.on_exception(queue_item)
167        elif isinstance(queue_item, PartitionGenerationCompletedSentinel):
168            yield from concurrent_stream_processor.on_partition_generation_completed(queue_item)
169        elif isinstance(queue_item, Partition):
170            concurrent_stream_processor.on_partition(queue_item)
171        elif isinstance(queue_item, PartitionCompleteSentinel):
172            yield from concurrent_stream_processor.on_partition_complete_sentinel(queue_item)
173        elif isinstance(queue_item, Record):
174            yield from concurrent_stream_processor.on_record(queue_item)
175        elif isinstance(queue_item, AirbyteMessage):
176            yield queue_item
177        else:
178            raise ValueError(f"Unknown queue item type: {type(queue_item)}")

A Source that reads data from multiple AbstractStreams concurrently. It does so by submitting partition generation and partition read tasks to a thread pool. The tasks asynchronously add their output to a shared queue. The read is done when all partitions for all streams were generated and read.

 74    def __init__(
 75        self,
 76        threadpool: ThreadPoolManager,
 77        logger: logging.Logger,
 78        slice_logger: SliceLogger = DebugSliceLogger(),
 79        queue: Optional[Queue[QueueItem]] = None,
 80        message_repository: MessageRepository = InMemoryMessageRepository(),
 81        initial_number_partitions_to_generate: int = 1,
 82        timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
 83    ) -> None:
 84        """
 85        :param threadpool: The threadpool to submit tasks to
 86        :param logger: The logger to log to
 87        :param slice_logger: The slice logger used to create messages on new slices
 88        :param message_repository: The repository to emit messages to
 89        :param initial_number_partitions_to_generate: The initial number of concurrent partition generation tasks. Limiting this number ensures will limit the latency of the first records emitted. While the latency is not critical, emitting the records early allows the platform and the destination to process them as early as possible.
 90        :param timeout_seconds: The maximum number of seconds to wait for a record to be read from the queue. If no record is read within this time, the source will stop reading and return.
 91        """
 92        self._threadpool = threadpool
 93        self._logger = logger
 94        self._slice_logger = slice_logger
 95        self._message_repository = message_repository
 96        self._initial_number_partitions_to_generate = initial_number_partitions_to_generate
 97        self._timeout_seconds = timeout_seconds
 98
 99        # We set a maxsize to for the main thread to process record items when the queue size grows. This assumes that there are less
100        # threads generating partitions that than are max number of workers. If it weren't the case, we could have threads only generating
101        # partitions which would fill the queue. This number is arbitrarily set to 10_000 but will probably need to be changed given more
102        # information and might even need to be configurable depending on the source
103        self._queue = queue or Queue(maxsize=10_000)
Parameters
  • threadpool: The threadpool to submit tasks to
  • logger: The logger to log to
  • slice_logger: The slice logger used to create messages on new slices
  • message_repository: The repository to emit messages to
  • initial_number_partitions_to_generate: The initial number of concurrent partition generation tasks. Limiting this number reduces the latency of the first records emitted. While the latency is not critical, emitting the records early allows the platform and the destination to process them as early as possible.
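  • timeout_seconds: The maximum number of seconds to wait for a record to be read from the queue. If no record is read within this time, the source will stop reading and return. Note: the implementation shown on this page stores this value but does not apply it; reading from the queue blocks without a timeout.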
# Default for the `timeout_seconds` parameter of `create` and `__init__`, in seconds.
DEFAULT_TIMEOUT_SECONDS = 900
41    @staticmethod
42    def create(
43        num_workers: int,
44        initial_number_of_partitions_to_generate: int,
45        logger: logging.Logger,
46        slice_logger: SliceLogger,
47        message_repository: MessageRepository,
48        queue: Optional[Queue[QueueItem]] = None,
49        timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
50    ) -> "ConcurrentSource":
51        is_single_threaded = initial_number_of_partitions_to_generate == 1 and num_workers == 1
52        too_many_generator = (
53            not is_single_threaded and initial_number_of_partitions_to_generate >= num_workers
54        )
55        assert not too_many_generator, (
56            "It is required to have more workers than threads generating partitions"
57        )
58        threadpool = ThreadPoolManager(
59            concurrent.futures.ThreadPoolExecutor(
60                max_workers=num_workers, thread_name_prefix="workerpool"
61            ),
62            logger,
63        )
64        return ConcurrentSource(
65            threadpool=threadpool,
66            logger=logger,
67            slice_logger=slice_logger,
68            queue=queue,
69            message_repository=message_repository,
70            initial_number_partitions_to_generate=initial_number_of_partitions_to_generate,
71            timeout_seconds=timeout_seconds,
72        )
105    def read(
106        self,
107        streams: List[AbstractStream],
108    ) -> Iterator[AirbyteMessage]:
109        self._logger.info("Starting syncing")
110        concurrent_stream_processor = ConcurrentReadProcessor(
111            streams,
112            PartitionEnqueuer(self._queue, self._threadpool),
113            self._threadpool,
114            self._logger,
115            self._slice_logger,
116            self._message_repository,
117            PartitionReader(
118                self._queue,
119                PartitionLogger(self._slice_logger, self._logger, self._message_repository),
120            ),
121        )
122
123        # Enqueue initial partition generation tasks
124        yield from self._submit_initial_partition_generators(concurrent_stream_processor)
125
126        # Read from the queue until all partitions were generated and read
127        yield from self._consume_from_queue(
128            self._queue,
129            concurrent_stream_processor,
130        )
131        self._threadpool.check_for_errors_and_shutdown()
132        self._logger.info("Finished syncing")