airbyte_cdk.sources.concurrent_source.concurrent_read_processor

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4import logging
  5from typing import Dict, Iterable, List, Optional, Set
  6
  7from airbyte_cdk.exception_handler import generate_failed_streams_error_message
  8from airbyte_cdk.models import AirbyteMessage, AirbyteStreamStatus, FailureType, StreamDescriptor
  9from airbyte_cdk.models import Type as MessageType
 10from airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel import (
 11    PartitionGenerationCompletedSentinel,
 12)
 13from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException
 14from airbyte_cdk.sources.concurrent_source.thread_pool_manager import ThreadPoolManager
 15from airbyte_cdk.sources.message import MessageRepository
 16from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
 17from airbyte_cdk.sources.streams.concurrent.partition_enqueuer import PartitionEnqueuer
 18from airbyte_cdk.sources.streams.concurrent.partition_reader import PartitionReader
 19from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
 20from airbyte_cdk.sources.streams.concurrent.partitions.types import PartitionCompleteSentinel
 21from airbyte_cdk.sources.types import Record
 22from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message
 23from airbyte_cdk.sources.utils.slice_logger import SliceLogger
 24from airbyte_cdk.utils import AirbyteTracedException
 25from airbyte_cdk.utils.stream_status_utils import (
 26    as_airbyte_message as stream_status_as_airbyte_message,
 27)
 28
 29
 30class ConcurrentReadProcessor:
 31    def __init__(
 32        self,
 33        stream_instances_to_read_from: List[AbstractStream],
 34        partition_enqueuer: PartitionEnqueuer,
 35        thread_pool_manager: ThreadPoolManager,
 36        logger: logging.Logger,
 37        slice_logger: SliceLogger,
 38        message_repository: MessageRepository,
 39        partition_reader: PartitionReader,
 40    ):
 41        """
 42        This class is responsible for handling items from a concurrent stream read process.
 43        :param stream_instances_to_read_from: List of streams to read from
 44        :param partition_enqueuer: PartitionEnqueuer instance
 45        :param thread_pool_manager: ThreadPoolManager instance
 46        :param logger: Logger instance
 47        :param slice_logger: SliceLogger instance
 48        :param message_repository: MessageRepository instance
 49        :param partition_reader: PartitionReader instance
 50        """
 51        self._stream_name_to_instance = {s.name: s for s in stream_instances_to_read_from}
 52        self._record_counter = {}
 53        self._streams_to_running_partitions: Dict[str, Set[Partition]] = {}
 54        for stream in stream_instances_to_read_from:
 55            self._streams_to_running_partitions[stream.name] = set()
 56            self._record_counter[stream.name] = 0
 57        self._thread_pool_manager = thread_pool_manager
 58        self._partition_enqueuer = partition_enqueuer
 59        self._stream_instances_to_start_partition_generation = stream_instances_to_read_from
 60        self._streams_currently_generating_partitions: List[str] = []
 61        self._logger = logger
 62        self._slice_logger = slice_logger
 63        self._message_repository = message_repository
 64        self._partition_reader = partition_reader
 65        self._streams_done: Set[str] = set()
 66        self._exceptions_per_stream_name: dict[str, List[Exception]] = {}
 67
 68    def on_partition_generation_completed(
 69        self, sentinel: PartitionGenerationCompletedSentinel
 70    ) -> Iterable[AirbyteMessage]:
 71        """
 72        This method is called when a partition generation is completed.
 73        1. Remove the stream from the list of streams currently generating partitions
 74        2. If the stream is done, mark it as such and return a stream status message
 75        3. If there are more streams to read from, start the next partition generator
 76        """
 77        stream_name = sentinel.stream.name
 78        self._streams_currently_generating_partitions.remove(sentinel.stream.name)
 79        # It is possible for the stream to already be done if no partitions were generated
 80        # If the partition generation process was completed and there are no partitions left to process, the stream is done
 81        if (
 82            self._is_stream_done(stream_name)
 83            or len(self._streams_to_running_partitions[stream_name]) == 0
 84        ):
 85            yield from self._on_stream_is_done(stream_name)
 86        if self._stream_instances_to_start_partition_generation:
 87            yield self.start_next_partition_generator()  # type:ignore # None may be yielded
 88
 89    def on_partition(self, partition: Partition) -> None:
 90        """
 91        This method is called when a partition is generated.
 92        1. Add the partition to the set of partitions for the stream
 93        2. Log the slice if necessary
 94        3. Submit the partition to the thread pool manager
 95        """
 96        stream_name = partition.stream_name()
 97        self._streams_to_running_partitions[stream_name].add(partition)
 98        if self._slice_logger.should_log_slice_message(self._logger):
 99            self._message_repository.emit_message(
100                self._slice_logger.create_slice_log_message(partition.to_slice())
101            )
102        self._thread_pool_manager.submit(self._partition_reader.process_partition, partition)
103
104    def on_partition_complete_sentinel(
105        self, sentinel: PartitionCompleteSentinel
106    ) -> Iterable[AirbyteMessage]:
107        """
108        This method is called when a partition is completed.
109        1. Close the partition
110        2. If the stream is done, mark it as such and return a stream status message
111        3. Emit messages that were added to the message repository
112        """
113        partition = sentinel.partition
114
115        try:
116            if sentinel.is_successful:
117                stream = self._stream_name_to_instance[partition.stream_name()]
118                stream.cursor.close_partition(partition)
119        except Exception as exception:
120            self._flag_exception(partition.stream_name(), exception)
121            yield AirbyteTracedException.from_exception(
122                exception, stream_descriptor=StreamDescriptor(name=partition.stream_name())
123            ).as_sanitized_airbyte_message()
124        finally:
125            partitions_running = self._streams_to_running_partitions[partition.stream_name()]
126            if partition in partitions_running:
127                partitions_running.remove(partition)
128                # If all partitions were generated and this was the last one, the stream is done
129                if (
130                    partition.stream_name() not in self._streams_currently_generating_partitions
131                    and len(partitions_running) == 0
132                ):
133                    yield from self._on_stream_is_done(partition.stream_name())
134            yield from self._message_repository.consume_queue()
135
136    def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
137        """
138        This method is called when a record is read from a partition.
139        1. Convert the record to an AirbyteMessage
140        2. If this is the first record for the stream, mark the stream as RUNNING
141        3. Increment the record counter for the stream
142        4. Ensures the cursor knows the record has been successfully emitted
143        5. Emit the message
144        6. Emit messages that were added to the message repository
145        """
146        # Do not pass a transformer or a schema
147        # AbstractStreams are expected to return data as they are expected.
148        # Any transformation on the data should be done before reaching this point
149        message = stream_data_to_airbyte_message(
150            stream_name=record.stream_name,
151            data_or_message=record.data,
152            is_file_transfer_message=record.is_file_transfer_message,
153        )
154        stream = self._stream_name_to_instance[record.stream_name]
155
156        if message.type == MessageType.RECORD:
157            if self._record_counter[stream.name] == 0:
158                self._logger.info(f"Marking stream {stream.name} as RUNNING")
159                yield stream_status_as_airbyte_message(
160                    stream.as_airbyte_stream(), AirbyteStreamStatus.RUNNING
161                )
162            self._record_counter[stream.name] += 1
163            stream.cursor.observe(record)
164        yield message
165        yield from self._message_repository.consume_queue()
166
167    def on_exception(self, exception: StreamThreadException) -> Iterable[AirbyteMessage]:
168        """
169        This method is called when an exception is raised.
170        1. Stop all running streams
171        2. Raise the exception
172        """
173        self._flag_exception(exception.stream_name, exception.exception)
174        self._logger.exception(
175            f"Exception while syncing stream {exception.stream_name}", exc_info=exception.exception
176        )
177
178        stream_descriptor = StreamDescriptor(name=exception.stream_name)
179        if isinstance(exception.exception, AirbyteTracedException):
180            yield exception.exception.as_airbyte_message(stream_descriptor=stream_descriptor)
181        else:
182            yield AirbyteTracedException.from_exception(
183                exception, stream_descriptor=stream_descriptor
184            ).as_airbyte_message()
185
186    def _flag_exception(self, stream_name: str, exception: Exception) -> None:
187        self._exceptions_per_stream_name.setdefault(stream_name, []).append(exception)
188
189    def start_next_partition_generator(self) -> Optional[AirbyteMessage]:
190        """
191        Start the next partition generator.
192        1. Pop the next stream to read from
193        2. Submit the partition generator to the thread pool manager
194        3. Add the stream to the list of streams currently generating partitions
195        4. Return a stream status message
196        """
197        if self._stream_instances_to_start_partition_generation:
198            stream = self._stream_instances_to_start_partition_generation.pop(0)
199            self._thread_pool_manager.submit(self._partition_enqueuer.generate_partitions, stream)
200            self._streams_currently_generating_partitions.append(stream.name)
201            self._logger.info(f"Marking stream {stream.name} as STARTED")
202            self._logger.info(f"Syncing stream: {stream.name} ")
203            return stream_status_as_airbyte_message(
204                stream.as_airbyte_stream(),
205                AirbyteStreamStatus.STARTED,
206            )
207        else:
208            return None
209
210    def is_done(self) -> bool:
211        """
212        This method is called to check if the sync is done.
213        The sync is done when:
214        1. There are no more streams generating partitions
215        2. There are no more streams to read from
216        3. All partitions for all streams are closed
217        """
218        is_done = all(
219            [
220                self._is_stream_done(stream_name)
221                for stream_name in self._stream_name_to_instance.keys()
222            ]
223        )
224        if is_done and self._exceptions_per_stream_name:
225            error_message = generate_failed_streams_error_message(self._exceptions_per_stream_name)
226            self._logger.info(error_message)
227            # We still raise at least one exception when a stream raises an exception because the platform currently relies
228            # on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error
229            # type because this combined error isn't actionable, but rather the previously emitted individual errors.
230            raise AirbyteTracedException(
231                message=error_message,
232                internal_message="Concurrent read failure",
233                failure_type=FailureType.config_error,
234            )
235        return is_done
236
237    def _is_stream_done(self, stream_name: str) -> bool:
238        return stream_name in self._streams_done
239
240    def _on_stream_is_done(self, stream_name: str) -> Iterable[AirbyteMessage]:
241        self._logger.info(
242            f"Read {self._record_counter[stream_name]} records from {stream_name} stream"
243        )
244        self._logger.info(f"Marking stream {stream_name} as STOPPED")
245        stream = self._stream_name_to_instance[stream_name]
246        stream.cursor.ensure_at_least_one_state_emitted()
247        yield from self._message_repository.consume_queue()
248        self._logger.info(f"Finished syncing {stream.name}")
249        self._streams_done.add(stream_name)
250        stream_status = (
251            AirbyteStreamStatus.INCOMPLETE
252            if self._exceptions_per_stream_name.get(stream_name, [])
253            else AirbyteStreamStatus.COMPLETE
254        )
255        yield stream_status_as_airbyte_message(stream.as_airbyte_stream(), stream_status)
class ConcurrentReadProcessor:
 31class ConcurrentReadProcessor:
 32    def __init__(
 33        self,
 34        stream_instances_to_read_from: List[AbstractStream],
 35        partition_enqueuer: PartitionEnqueuer,
 36        thread_pool_manager: ThreadPoolManager,
 37        logger: logging.Logger,
 38        slice_logger: SliceLogger,
 39        message_repository: MessageRepository,
 40        partition_reader: PartitionReader,
 41    ):
 42        """
 43        This class is responsible for handling items from a concurrent stream read process.
 44        :param stream_instances_to_read_from: List of streams to read from
 45        :param partition_enqueuer: PartitionEnqueuer instance
 46        :param thread_pool_manager: ThreadPoolManager instance
 47        :param logger: Logger instance
 48        :param slice_logger: SliceLogger instance
 49        :param message_repository: MessageRepository instance
 50        :param partition_reader: PartitionReader instance
 51        """
 52        self._stream_name_to_instance = {s.name: s for s in stream_instances_to_read_from}
 53        self._record_counter = {}
 54        self._streams_to_running_partitions: Dict[str, Set[Partition]] = {}
 55        for stream in stream_instances_to_read_from:
 56            self._streams_to_running_partitions[stream.name] = set()
 57            self._record_counter[stream.name] = 0
 58        self._thread_pool_manager = thread_pool_manager
 59        self._partition_enqueuer = partition_enqueuer
 60        self._stream_instances_to_start_partition_generation = stream_instances_to_read_from
 61        self._streams_currently_generating_partitions: List[str] = []
 62        self._logger = logger
 63        self._slice_logger = slice_logger
 64        self._message_repository = message_repository
 65        self._partition_reader = partition_reader
 66        self._streams_done: Set[str] = set()
 67        self._exceptions_per_stream_name: dict[str, List[Exception]] = {}
 68
 69    def on_partition_generation_completed(
 70        self, sentinel: PartitionGenerationCompletedSentinel
 71    ) -> Iterable[AirbyteMessage]:
 72        """
 73        This method is called when a partition generation is completed.
 74        1. Remove the stream from the list of streams currently generating partitions
 75        2. If the stream is done, mark it as such and return a stream status message
 76        3. If there are more streams to read from, start the next partition generator
 77        """
 78        stream_name = sentinel.stream.name
 79        self._streams_currently_generating_partitions.remove(sentinel.stream.name)
 80        # It is possible for the stream to already be done if no partitions were generated
 81        # If the partition generation process was completed and there are no partitions left to process, the stream is done
 82        if (
 83            self._is_stream_done(stream_name)
 84            or len(self._streams_to_running_partitions[stream_name]) == 0
 85        ):
 86            yield from self._on_stream_is_done(stream_name)
 87        if self._stream_instances_to_start_partition_generation:
 88            yield self.start_next_partition_generator()  # type:ignore # None may be yielded
 89
 90    def on_partition(self, partition: Partition) -> None:
 91        """
 92        This method is called when a partition is generated.
 93        1. Add the partition to the set of partitions for the stream
 94        2. Log the slice if necessary
 95        3. Submit the partition to the thread pool manager
 96        """
 97        stream_name = partition.stream_name()
 98        self._streams_to_running_partitions[stream_name].add(partition)
 99        if self._slice_logger.should_log_slice_message(self._logger):
100            self._message_repository.emit_message(
101                self._slice_logger.create_slice_log_message(partition.to_slice())
102            )
103        self._thread_pool_manager.submit(self._partition_reader.process_partition, partition)
104
105    def on_partition_complete_sentinel(
106        self, sentinel: PartitionCompleteSentinel
107    ) -> Iterable[AirbyteMessage]:
108        """
109        This method is called when a partition is completed.
110        1. Close the partition
111        2. If the stream is done, mark it as such and return a stream status message
112        3. Emit messages that were added to the message repository
113        """
114        partition = sentinel.partition
115
116        try:
117            if sentinel.is_successful:
118                stream = self._stream_name_to_instance[partition.stream_name()]
119                stream.cursor.close_partition(partition)
120        except Exception as exception:
121            self._flag_exception(partition.stream_name(), exception)
122            yield AirbyteTracedException.from_exception(
123                exception, stream_descriptor=StreamDescriptor(name=partition.stream_name())
124            ).as_sanitized_airbyte_message()
125        finally:
126            partitions_running = self._streams_to_running_partitions[partition.stream_name()]
127            if partition in partitions_running:
128                partitions_running.remove(partition)
129                # If all partitions were generated and this was the last one, the stream is done
130                if (
131                    partition.stream_name() not in self._streams_currently_generating_partitions
132                    and len(partitions_running) == 0
133                ):
134                    yield from self._on_stream_is_done(partition.stream_name())
135            yield from self._message_repository.consume_queue()
136
137    def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
138        """
139        This method is called when a record is read from a partition.
140        1. Convert the record to an AirbyteMessage
141        2. If this is the first record for the stream, mark the stream as RUNNING
142        3. Increment the record counter for the stream
143        4. Ensures the cursor knows the record has been successfully emitted
144        5. Emit the message
145        6. Emit messages that were added to the message repository
146        """
147        # Do not pass a transformer or a schema
148        # AbstractStreams are expected to return data as they are expected.
149        # Any transformation on the data should be done before reaching this point
150        message = stream_data_to_airbyte_message(
151            stream_name=record.stream_name,
152            data_or_message=record.data,
153            is_file_transfer_message=record.is_file_transfer_message,
154        )
155        stream = self._stream_name_to_instance[record.stream_name]
156
157        if message.type == MessageType.RECORD:
158            if self._record_counter[stream.name] == 0:
159                self._logger.info(f"Marking stream {stream.name} as RUNNING")
160                yield stream_status_as_airbyte_message(
161                    stream.as_airbyte_stream(), AirbyteStreamStatus.RUNNING
162                )
163            self._record_counter[stream.name] += 1
164            stream.cursor.observe(record)
165        yield message
166        yield from self._message_repository.consume_queue()
167
168    def on_exception(self, exception: StreamThreadException) -> Iterable[AirbyteMessage]:
169        """
170        This method is called when an exception is raised.
171        1. Stop all running streams
172        2. Raise the exception
173        """
174        self._flag_exception(exception.stream_name, exception.exception)
175        self._logger.exception(
176            f"Exception while syncing stream {exception.stream_name}", exc_info=exception.exception
177        )
178
179        stream_descriptor = StreamDescriptor(name=exception.stream_name)
180        if isinstance(exception.exception, AirbyteTracedException):
181            yield exception.exception.as_airbyte_message(stream_descriptor=stream_descriptor)
182        else:
183            yield AirbyteTracedException.from_exception(
184                exception, stream_descriptor=stream_descriptor
185            ).as_airbyte_message()
186
187    def _flag_exception(self, stream_name: str, exception: Exception) -> None:
188        self._exceptions_per_stream_name.setdefault(stream_name, []).append(exception)
189
190    def start_next_partition_generator(self) -> Optional[AirbyteMessage]:
191        """
192        Start the next partition generator.
193        1. Pop the next stream to read from
194        2. Submit the partition generator to the thread pool manager
195        3. Add the stream to the list of streams currently generating partitions
196        4. Return a stream status message
197        """
198        if self._stream_instances_to_start_partition_generation:
199            stream = self._stream_instances_to_start_partition_generation.pop(0)
200            self._thread_pool_manager.submit(self._partition_enqueuer.generate_partitions, stream)
201            self._streams_currently_generating_partitions.append(stream.name)
202            self._logger.info(f"Marking stream {stream.name} as STARTED")
203            self._logger.info(f"Syncing stream: {stream.name} ")
204            return stream_status_as_airbyte_message(
205                stream.as_airbyte_stream(),
206                AirbyteStreamStatus.STARTED,
207            )
208        else:
209            return None
210
211    def is_done(self) -> bool:
212        """
213        This method is called to check if the sync is done.
214        The sync is done when:
215        1. There are no more streams generating partitions
216        2. There are no more streams to read from
217        3. All partitions for all streams are closed
218        """
219        is_done = all(
220            [
221                self._is_stream_done(stream_name)
222                for stream_name in self._stream_name_to_instance.keys()
223            ]
224        )
225        if is_done and self._exceptions_per_stream_name:
226            error_message = generate_failed_streams_error_message(self._exceptions_per_stream_name)
227            self._logger.info(error_message)
228            # We still raise at least one exception when a stream raises an exception because the platform currently relies
229            # on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error
230            # type because this combined error isn't actionable, but rather the previously emitted individual errors.
231            raise AirbyteTracedException(
232                message=error_message,
233                internal_message="Concurrent read failure",
234                failure_type=FailureType.config_error,
235            )
236        return is_done
237
238    def _is_stream_done(self, stream_name: str) -> bool:
239        return stream_name in self._streams_done
240
241    def _on_stream_is_done(self, stream_name: str) -> Iterable[AirbyteMessage]:
242        self._logger.info(
243            f"Read {self._record_counter[stream_name]} records from {stream_name} stream"
244        )
245        self._logger.info(f"Marking stream {stream_name} as STOPPED")
246        stream = self._stream_name_to_instance[stream_name]
247        stream.cursor.ensure_at_least_one_state_emitted()
248        yield from self._message_repository.consume_queue()
249        self._logger.info(f"Finished syncing {stream.name}")
250        self._streams_done.add(stream_name)
251        stream_status = (
252            AirbyteStreamStatus.INCOMPLETE
253            if self._exceptions_per_stream_name.get(stream_name, [])
254            else AirbyteStreamStatus.COMPLETE
255        )
256        yield stream_status_as_airbyte_message(stream.as_airbyte_stream(), stream_status)
32    def __init__(
33        self,
34        stream_instances_to_read_from: List[AbstractStream],
35        partition_enqueuer: PartitionEnqueuer,
36        thread_pool_manager: ThreadPoolManager,
37        logger: logging.Logger,
38        slice_logger: SliceLogger,
39        message_repository: MessageRepository,
40        partition_reader: PartitionReader,
41    ):
42        """
43        This class is responsible for handling items from a concurrent stream read process.
44        :param stream_instances_to_read_from: List of streams to read from
45        :param partition_enqueuer: PartitionEnqueuer instance
46        :param thread_pool_manager: ThreadPoolManager instance
47        :param logger: Logger instance
48        :param slice_logger: SliceLogger instance
49        :param message_repository: MessageRepository instance
50        :param partition_reader: PartitionReader instance
51        """
52        self._stream_name_to_instance = {s.name: s for s in stream_instances_to_read_from}
53        self._record_counter = {}
54        self._streams_to_running_partitions: Dict[str, Set[Partition]] = {}
55        for stream in stream_instances_to_read_from:
56            self._streams_to_running_partitions[stream.name] = set()
57            self._record_counter[stream.name] = 0
58        self._thread_pool_manager = thread_pool_manager
59        self._partition_enqueuer = partition_enqueuer
60        self._stream_instances_to_start_partition_generation = stream_instances_to_read_from
61        self._streams_currently_generating_partitions: List[str] = []
62        self._logger = logger
63        self._slice_logger = slice_logger
64        self._message_repository = message_repository
65        self._partition_reader = partition_reader
66        self._streams_done: Set[str] = set()
67        self._exceptions_per_stream_name: dict[str, List[Exception]] = {}

This class is responsible for handling items from a concurrent stream read process.

Parameters
  • stream_instances_to_read_from: List of streams to read from
  • partition_enqueuer: PartitionEnqueuer instance
  • thread_pool_manager: ThreadPoolManager instance
  • logger: Logger instance
  • slice_logger: SliceLogger instance
  • message_repository: MessageRepository instance
  • partition_reader: PartitionReader instance
69    def on_partition_generation_completed(
70        self, sentinel: PartitionGenerationCompletedSentinel
71    ) -> Iterable[AirbyteMessage]:
72        """
73        This method is called when a partition generation is completed.
74        1. Remove the stream from the list of streams currently generating partitions
75        2. If the stream is done, mark it as such and return a stream status message
76        3. If there are more streams to read from, start the next partition generator
77        """
78        stream_name = sentinel.stream.name
79        self._streams_currently_generating_partitions.remove(sentinel.stream.name)
80        # It is possible for the stream to already be done if no partitions were generated
81        # If the partition generation process was completed and there are no partitions left to process, the stream is done
82        if (
83            self._is_stream_done(stream_name)
84            or len(self._streams_to_running_partitions[stream_name]) == 0
85        ):
86            yield from self._on_stream_is_done(stream_name)
87        if self._stream_instances_to_start_partition_generation:
88            yield self.start_next_partition_generator()  # type:ignore # None may be yielded

This method is called when a partition generation is completed.

  1. Remove the stream from the list of streams currently generating partitions
  2. If the stream is done, mark it as such and return a stream status message
  3. If there are more streams to read from, start the next partition generator
def on_partition( self, partition: airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition) -> None:
 90    def on_partition(self, partition: Partition) -> None:
 91        """
 92        This method is called when a partition is generated.
 93        1. Add the partition to the set of partitions for the stream
 94        2. Log the slice if necessary
 95        3. Submit the partition to the thread pool manager
 96        """
 97        stream_name = partition.stream_name()
 98        self._streams_to_running_partitions[stream_name].add(partition)
 99        if self._slice_logger.should_log_slice_message(self._logger):
100            self._message_repository.emit_message(
101                self._slice_logger.create_slice_log_message(partition.to_slice())
102            )
103        self._thread_pool_manager.submit(self._partition_reader.process_partition, partition)

This method is called when a partition is generated.

  1. Add the partition to the set of partitions for the stream
  2. Log the slice if necessary
  3. Submit the partition to the thread pool manager
def on_partition_complete_sentinel( self, sentinel: airbyte_cdk.sources.streams.concurrent.partitions.types.PartitionCompleteSentinel) -> Iterable[airbyte_cdk.AirbyteMessage]:
105    def on_partition_complete_sentinel(
106        self, sentinel: PartitionCompleteSentinel
107    ) -> Iterable[AirbyteMessage]:
108        """
109        This method is called when a partition is completed.
110        1. Close the partition
111        2. If the stream is done, mark it as such and return a stream status message
112        3. Emit messages that were added to the message repository
113        """
114        partition = sentinel.partition
115
116        try:
117            if sentinel.is_successful:
118                stream = self._stream_name_to_instance[partition.stream_name()]
119                stream.cursor.close_partition(partition)
120        except Exception as exception:
121            self._flag_exception(partition.stream_name(), exception)
122            yield AirbyteTracedException.from_exception(
123                exception, stream_descriptor=StreamDescriptor(name=partition.stream_name())
124            ).as_sanitized_airbyte_message()
125        finally:
126            partitions_running = self._streams_to_running_partitions[partition.stream_name()]
127            if partition in partitions_running:
128                partitions_running.remove(partition)
129                # If all partitions were generated and this was the last one, the stream is done
130                if (
131                    partition.stream_name() not in self._streams_currently_generating_partitions
132                    and len(partitions_running) == 0
133                ):
134                    yield from self._on_stream_is_done(partition.stream_name())
135            yield from self._message_repository.consume_queue()

This method is called when a partition is completed.

  1. Close the partition
  2. If the stream is done, mark it as such and return a stream status message
  3. Emit messages that were added to the message repository
def on_record( self, record: airbyte_cdk.Record) -> Iterable[airbyte_cdk.AirbyteMessage]:
137    def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
138        """
139        This method is called when a record is read from a partition.
140        1. Convert the record to an AirbyteMessage
141        2. If this is the first record for the stream, mark the stream as RUNNING
142        3. Increment the record counter for the stream
143        4. Ensures the cursor knows the record has been successfully emitted
144        5. Emit the message
145        6. Emit messages that were added to the message repository
146        """
147        # Do not pass a transformer or a schema
148        # AbstractStreams are expected to return data as they are expected.
149        # Any transformation on the data should be done before reaching this point
150        message = stream_data_to_airbyte_message(
151            stream_name=record.stream_name,
152            data_or_message=record.data,
153            is_file_transfer_message=record.is_file_transfer_message,
154        )
155        stream = self._stream_name_to_instance[record.stream_name]
156
157        if message.type == MessageType.RECORD:
158            if self._record_counter[stream.name] == 0:
159                self._logger.info(f"Marking stream {stream.name} as RUNNING")
160                yield stream_status_as_airbyte_message(
161                    stream.as_airbyte_stream(), AirbyteStreamStatus.RUNNING
162                )
163            self._record_counter[stream.name] += 1
164            stream.cursor.observe(record)
165        yield message
166        yield from self._message_repository.consume_queue()

This method is called when a record is read from a partition.

  1. Convert the record to an AirbyteMessage
  2. If this is the first record for the stream, mark the stream as RUNNING
  3. Increment the record counter for the stream
  4. Ensures the cursor knows the record has been successfully emitted
  5. Emit the message
  6. Emit messages that were added to the message repository
168    def on_exception(self, exception: StreamThreadException) -> Iterable[AirbyteMessage]:
169        """
170        This method is called when an exception is raised.
171        1. Stop all running streams
172        2. Raise the exception
173        """
174        self._flag_exception(exception.stream_name, exception.exception)
175        self._logger.exception(
176            f"Exception while syncing stream {exception.stream_name}", exc_info=exception.exception
177        )
178
179        stream_descriptor = StreamDescriptor(name=exception.stream_name)
180        if isinstance(exception.exception, AirbyteTracedException):
181            yield exception.exception.as_airbyte_message(stream_descriptor=stream_descriptor)
182        else:
183            yield AirbyteTracedException.from_exception(
184                exception, stream_descriptor=stream_descriptor
185            ).as_airbyte_message()

This method is called when an exception is raised.

  1. Stop all running streams
  2. Emit the exception as an AirbyteMessage (it is yielded to the caller, not raised)
def start_next_partition_generator(self) -> Optional[airbyte_cdk.AirbyteMessage]:
190    def start_next_partition_generator(self) -> Optional[AirbyteMessage]:
191        """
192        Start the next partition generator.
193        1. Pop the next stream to read from
194        2. Submit the partition generator to the thread pool manager
195        3. Add the stream to the list of streams currently generating partitions
196        4. Return a stream status message
197        """
198        if self._stream_instances_to_start_partition_generation:
199            stream = self._stream_instances_to_start_partition_generation.pop(0)
200            self._thread_pool_manager.submit(self._partition_enqueuer.generate_partitions, stream)
201            self._streams_currently_generating_partitions.append(stream.name)
202            self._logger.info(f"Marking stream {stream.name} as STARTED")
203            self._logger.info(f"Syncing stream: {stream.name} ")
204            return stream_status_as_airbyte_message(
205                stream.as_airbyte_stream(),
206                AirbyteStreamStatus.STARTED,
207            )
208        else:
209            return None

Start the next partition generator.

  1. Pop the next stream to read from
  2. Submit the partition generator to the thread pool manager
  3. Add the stream to the list of streams currently generating partitions
  4. Return a stream status message
def is_done(self) -> bool:
211    def is_done(self) -> bool:
212        """
213        This method is called to check if the sync is done.
214        The sync is done when:
215        1. There are no more streams generating partitions
216        2. There are no more streams to read from
217        3. All partitions for all streams are closed
218        """
219        is_done = all(
220            [
221                self._is_stream_done(stream_name)
222                for stream_name in self._stream_name_to_instance.keys()
223            ]
224        )
225        if is_done and self._exceptions_per_stream_name:
226            error_message = generate_failed_streams_error_message(self._exceptions_per_stream_name)
227            self._logger.info(error_message)
228            # We still raise at least one exception when a stream raises an exception because the platform currently relies
229            # on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error
230            # type because this combined error isn't actionable, but rather the previously emitted individual errors.
231            raise AirbyteTracedException(
232                message=error_message,
233                internal_message="Concurrent read failure",
234                failure_type=FailureType.config_error,
235            )
236        return is_done

This method is called to check if the sync is done. The sync is done when:

  1. There are no more streams generating partitions
  2. There are no more streams to read from
  3. All partitions for all streams are closed