airbyte_cdk.sources.concurrent_source.concurrent_read_processor

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4import logging
  5import os
  6from typing import Dict, Iterable, List, Optional, Set
  7
  8from airbyte_cdk.exception_handler import generate_failed_streams_error_message
  9from airbyte_cdk.models import AirbyteMessage, AirbyteStreamStatus, FailureType, StreamDescriptor
 10from airbyte_cdk.models import Type as MessageType
 11from airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel import (
 12    PartitionGenerationCompletedSentinel,
 13)
 14from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException
 15from airbyte_cdk.sources.concurrent_source.thread_pool_manager import ThreadPoolManager
 16from airbyte_cdk.sources.message import MessageRepository
 17from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
 18from airbyte_cdk.sources.streams.concurrent.partition_enqueuer import PartitionEnqueuer
 19from airbyte_cdk.sources.streams.concurrent.partition_reader import PartitionReader
 20from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
 21from airbyte_cdk.sources.streams.concurrent.partitions.types import PartitionCompleteSentinel
 22from airbyte_cdk.sources.types import Record
 23from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message
 24from airbyte_cdk.sources.utils.slice_logger import SliceLogger
 25from airbyte_cdk.utils import AirbyteTracedException
 26from airbyte_cdk.utils.stream_status_utils import (
 27    as_airbyte_message as stream_status_as_airbyte_message,
 28)
 29
 30
 31class ConcurrentReadProcessor:
 32    def __init__(
 33        self,
 34        stream_instances_to_read_from: List[AbstractStream],
 35        partition_enqueuer: PartitionEnqueuer,
 36        thread_pool_manager: ThreadPoolManager,
 37        logger: logging.Logger,
 38        slice_logger: SliceLogger,
 39        message_repository: MessageRepository,
 40        partition_reader: PartitionReader,
 41    ):
 42        """
 43        This class is responsible for handling items from a concurrent stream read process.
 44        :param stream_instances_to_read_from: List of streams to read from
 45        :param partition_enqueuer: PartitionEnqueuer instance
 46        :param thread_pool_manager: ThreadPoolManager instance
 47        :param logger: Logger instance
 48        :param slice_logger: SliceLogger instance
 49        :param message_repository: MessageRepository instance
 50        :param partition_reader: PartitionReader instance
 51        """
 52        self._stream_name_to_instance = {s.name: s for s in stream_instances_to_read_from}
 53        self._record_counter = {}
 54        self._streams_to_running_partitions: Dict[str, Set[Partition]] = {}
 55        for stream in stream_instances_to_read_from:
 56            self._streams_to_running_partitions[stream.name] = set()
 57            self._record_counter[stream.name] = 0
 58        self._thread_pool_manager = thread_pool_manager
 59        self._partition_enqueuer = partition_enqueuer
 60        self._stream_instances_to_start_partition_generation = stream_instances_to_read_from
 61        self._streams_currently_generating_partitions: List[str] = []
 62        self._logger = logger
 63        self._slice_logger = slice_logger
 64        self._message_repository = message_repository
 65        self._partition_reader = partition_reader
 66        self._streams_done: Set[str] = set()
 67        self._exceptions_per_stream_name: dict[str, List[Exception]] = {}
 68
 69    def on_partition_generation_completed(
 70        self, sentinel: PartitionGenerationCompletedSentinel
 71    ) -> Iterable[AirbyteMessage]:
 72        """
 73        This method is called when a partition generation is completed.
 74        1. Remove the stream from the list of streams currently generating partitions
 75        2. If the stream is done, mark it as such and return a stream status message
 76        3. If there are more streams to read from, start the next partition generator
 77        """
 78        stream_name = sentinel.stream.name
 79        self._streams_currently_generating_partitions.remove(sentinel.stream.name)
 80        # It is possible for the stream to already be done if no partitions were generated
 81        # If the partition generation process was completed and there are no partitions left to process, the stream is done
 82        if (
 83            self._is_stream_done(stream_name)
 84            or len(self._streams_to_running_partitions[stream_name]) == 0
 85        ):
 86            yield from self._on_stream_is_done(stream_name)
 87        if self._stream_instances_to_start_partition_generation:
 88            yield self.start_next_partition_generator()  # type:ignore # None may be yielded
 89
 90    def on_partition(self, partition: Partition) -> None:
 91        """
 92        This method is called when a partition is generated.
 93        1. Add the partition to the set of partitions for the stream
 94        2. Log the slice if necessary
 95        3. Submit the partition to the thread pool manager
 96        """
 97        stream_name = partition.stream_name()
 98        self._streams_to_running_partitions[stream_name].add(partition)
 99        cursor = self._stream_name_to_instance[stream_name].cursor
100        if self._slice_logger.should_log_slice_message(self._logger):
101            self._message_repository.emit_message(
102                self._slice_logger.create_slice_log_message(partition.to_slice())
103            )
104        self._thread_pool_manager.submit(
105            self._partition_reader.process_partition, partition, cursor
106        )
107
108    def on_partition_complete_sentinel(
109        self, sentinel: PartitionCompleteSentinel
110    ) -> Iterable[AirbyteMessage]:
111        """
112        This method is called when a partition is completed.
113        1. Close the partition
114        2. If the stream is done, mark it as such and return a stream status message
115        3. Emit messages that were added to the message repository
116        """
117        partition = sentinel.partition
118
119        partitions_running = self._streams_to_running_partitions[partition.stream_name()]
120        if partition in partitions_running:
121            partitions_running.remove(partition)
122            # If all partitions were generated and this was the last one, the stream is done
123            if (
124                partition.stream_name() not in self._streams_currently_generating_partitions
125                and len(partitions_running) == 0
126            ):
127                yield from self._on_stream_is_done(partition.stream_name())
128        yield from self._message_repository.consume_queue()
129
130    def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
131        """
132        This method is called when a record is read from a partition.
133        1. Convert the record to an AirbyteMessage
134        2. If this is the first record for the stream, mark the stream as RUNNING
135        3. Increment the record counter for the stream
136        4. Ensures the cursor knows the record has been successfully emitted
137        5. Emit the message
138        6. Emit messages that were added to the message repository
139        """
140        # Do not pass a transformer or a schema
141        # AbstractStreams are expected to return data as they are expected.
142        # Any transformation on the data should be done before reaching this point
143        message = stream_data_to_airbyte_message(
144            stream_name=record.stream_name,
145            data_or_message=record.data,
146            file_reference=record.file_reference,
147        )
148        stream = self._stream_name_to_instance[record.stream_name]
149
150        if message.type == MessageType.RECORD:
151            if self._record_counter[stream.name] == 0:
152                self._logger.info(f"Marking stream {stream.name} as RUNNING")
153                yield stream_status_as_airbyte_message(
154                    stream.as_airbyte_stream(), AirbyteStreamStatus.RUNNING
155                )
156            self._record_counter[stream.name] += 1
157        yield message
158        yield from self._message_repository.consume_queue()
159
160    def on_exception(self, exception: StreamThreadException) -> Iterable[AirbyteMessage]:
161        """
162        This method is called when an exception is raised.
163        1. Stop all running streams
164        2. Raise the exception
165        """
166        self._flag_exception(exception.stream_name, exception.exception)
167        self._logger.exception(
168            f"Exception while syncing stream {exception.stream_name}", exc_info=exception.exception
169        )
170
171        stream_descriptor = StreamDescriptor(name=exception.stream_name)
172        if isinstance(exception.exception, AirbyteTracedException):
173            yield exception.exception.as_airbyte_message(stream_descriptor=stream_descriptor)
174        else:
175            yield AirbyteTracedException.from_exception(
176                exception.exception,
177                stream_descriptor=stream_descriptor,
178                message=f"An unexpected error occurred in stream {exception.stream_name}: {type(exception.exception).__name__}",
179            ).as_airbyte_message()
180
181    def _flag_exception(self, stream_name: str, exception: Exception) -> None:
182        self._exceptions_per_stream_name.setdefault(stream_name, []).append(exception)
183
184    def start_next_partition_generator(self) -> Optional[AirbyteMessage]:
185        """
186        Start the next partition generator.
187        1. Pop the next stream to read from
188        2. Submit the partition generator to the thread pool manager
189        3. Add the stream to the list of streams currently generating partitions
190        4. Return a stream status message
191        """
192        if self._stream_instances_to_start_partition_generation:
193            stream = self._stream_instances_to_start_partition_generation.pop(0)
194            self._thread_pool_manager.submit(self._partition_enqueuer.generate_partitions, stream)
195            self._streams_currently_generating_partitions.append(stream.name)
196            self._logger.info(f"Marking stream {stream.name} as STARTED")
197            self._logger.info(f"Syncing stream: {stream.name} ")
198            return stream_status_as_airbyte_message(
199                stream.as_airbyte_stream(),
200                AirbyteStreamStatus.STARTED,
201            )
202        else:
203            return None
204
205    def is_done(self) -> bool:
206        """
207        This method is called to check if the sync is done.
208        The sync is done when:
209        1. There are no more streams generating partitions
210        2. There are no more streams to read from
211        3. All partitions for all streams are closed
212        """
213        is_done = all(
214            [
215                self._is_stream_done(stream_name)
216                for stream_name in self._stream_name_to_instance.keys()
217            ]
218        )
219        if is_done and self._exceptions_per_stream_name:
220            error_message = generate_failed_streams_error_message(self._exceptions_per_stream_name)
221            self._logger.info(error_message)
222            # We still raise at least one exception when a stream raises an exception because the platform currently relies
223            # on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error
224            # type because this combined error isn't actionable, but rather the previously emitted individual errors.
225            raise AirbyteTracedException(
226                message=error_message,
227                internal_message="Concurrent read failure",
228                failure_type=FailureType.config_error,
229            )
230        return is_done
231
232    def _is_stream_done(self, stream_name: str) -> bool:
233        return stream_name in self._streams_done
234
235    def _on_stream_is_done(self, stream_name: str) -> Iterable[AirbyteMessage]:
236        self._logger.info(
237            f"Read {self._record_counter[stream_name]} records from {stream_name} stream"
238        )
239        self._logger.info(f"Marking stream {stream_name} as STOPPED")
240        stream = self._stream_name_to_instance[stream_name]
241        stream.cursor.ensure_at_least_one_state_emitted()
242        yield from self._message_repository.consume_queue()
243        self._logger.info(f"Finished syncing {stream.name}")
244        self._streams_done.add(stream_name)
245        stream_status = (
246            AirbyteStreamStatus.INCOMPLETE
247            if self._exceptions_per_stream_name.get(stream_name, [])
248            else AirbyteStreamStatus.COMPLETE
249        )
250        yield stream_status_as_airbyte_message(stream.as_airbyte_stream(), stream_status)
class ConcurrentReadProcessor:
 32class ConcurrentReadProcessor:
 33    def __init__(
 34        self,
 35        stream_instances_to_read_from: List[AbstractStream],
 36        partition_enqueuer: PartitionEnqueuer,
 37        thread_pool_manager: ThreadPoolManager,
 38        logger: logging.Logger,
 39        slice_logger: SliceLogger,
 40        message_repository: MessageRepository,
 41        partition_reader: PartitionReader,
 42    ):
 43        """
 44        This class is responsible for handling items from a concurrent stream read process.
 45        :param stream_instances_to_read_from: List of streams to read from
 46        :param partition_enqueuer: PartitionEnqueuer instance
 47        :param thread_pool_manager: ThreadPoolManager instance
 48        :param logger: Logger instance
 49        :param slice_logger: SliceLogger instance
 50        :param message_repository: MessageRepository instance
 51        :param partition_reader: PartitionReader instance
 52        """
 53        self._stream_name_to_instance = {s.name: s for s in stream_instances_to_read_from}
 54        self._record_counter = {}
 55        self._streams_to_running_partitions: Dict[str, Set[Partition]] = {}
 56        for stream in stream_instances_to_read_from:
 57            self._streams_to_running_partitions[stream.name] = set()
 58            self._record_counter[stream.name] = 0
 59        self._thread_pool_manager = thread_pool_manager
 60        self._partition_enqueuer = partition_enqueuer
 61        self._stream_instances_to_start_partition_generation = stream_instances_to_read_from
 62        self._streams_currently_generating_partitions: List[str] = []
 63        self._logger = logger
 64        self._slice_logger = slice_logger
 65        self._message_repository = message_repository
 66        self._partition_reader = partition_reader
 67        self._streams_done: Set[str] = set()
 68        self._exceptions_per_stream_name: dict[str, List[Exception]] = {}
 69
 70    def on_partition_generation_completed(
 71        self, sentinel: PartitionGenerationCompletedSentinel
 72    ) -> Iterable[AirbyteMessage]:
 73        """
 74        This method is called when a partition generation is completed.
 75        1. Remove the stream from the list of streams currently generating partitions
 76        2. If the stream is done, mark it as such and return a stream status message
 77        3. If there are more streams to read from, start the next partition generator
 78        """
 79        stream_name = sentinel.stream.name
 80        self._streams_currently_generating_partitions.remove(sentinel.stream.name)
 81        # It is possible for the stream to already be done if no partitions were generated
 82        # If the partition generation process was completed and there are no partitions left to process, the stream is done
 83        if (
 84            self._is_stream_done(stream_name)
 85            or len(self._streams_to_running_partitions[stream_name]) == 0
 86        ):
 87            yield from self._on_stream_is_done(stream_name)
 88        if self._stream_instances_to_start_partition_generation:
 89            yield self.start_next_partition_generator()  # type:ignore # None may be yielded
 90
 91    def on_partition(self, partition: Partition) -> None:
 92        """
 93        This method is called when a partition is generated.
 94        1. Add the partition to the set of partitions for the stream
 95        2. Log the slice if necessary
 96        3. Submit the partition to the thread pool manager
 97        """
 98        stream_name = partition.stream_name()
 99        self._streams_to_running_partitions[stream_name].add(partition)
100        cursor = self._stream_name_to_instance[stream_name].cursor
101        if self._slice_logger.should_log_slice_message(self._logger):
102            self._message_repository.emit_message(
103                self._slice_logger.create_slice_log_message(partition.to_slice())
104            )
105        self._thread_pool_manager.submit(
106            self._partition_reader.process_partition, partition, cursor
107        )
108
109    def on_partition_complete_sentinel(
110        self, sentinel: PartitionCompleteSentinel
111    ) -> Iterable[AirbyteMessage]:
112        """
113        This method is called when a partition is completed.
114        1. Close the partition
115        2. If the stream is done, mark it as such and return a stream status message
116        3. Emit messages that were added to the message repository
117        """
118        partition = sentinel.partition
119
120        partitions_running = self._streams_to_running_partitions[partition.stream_name()]
121        if partition in partitions_running:
122            partitions_running.remove(partition)
123            # If all partitions were generated and this was the last one, the stream is done
124            if (
125                partition.stream_name() not in self._streams_currently_generating_partitions
126                and len(partitions_running) == 0
127            ):
128                yield from self._on_stream_is_done(partition.stream_name())
129        yield from self._message_repository.consume_queue()
130
131    def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
132        """
133        This method is called when a record is read from a partition.
134        1. Convert the record to an AirbyteMessage
135        2. If this is the first record for the stream, mark the stream as RUNNING
136        3. Increment the record counter for the stream
137        4. Ensures the cursor knows the record has been successfully emitted
138        5. Emit the message
139        6. Emit messages that were added to the message repository
140        """
141        # Do not pass a transformer or a schema
142        # AbstractStreams are expected to return data as they are expected.
143        # Any transformation on the data should be done before reaching this point
144        message = stream_data_to_airbyte_message(
145            stream_name=record.stream_name,
146            data_or_message=record.data,
147            file_reference=record.file_reference,
148        )
149        stream = self._stream_name_to_instance[record.stream_name]
150
151        if message.type == MessageType.RECORD:
152            if self._record_counter[stream.name] == 0:
153                self._logger.info(f"Marking stream {stream.name} as RUNNING")
154                yield stream_status_as_airbyte_message(
155                    stream.as_airbyte_stream(), AirbyteStreamStatus.RUNNING
156                )
157            self._record_counter[stream.name] += 1
158        yield message
159        yield from self._message_repository.consume_queue()
160
161    def on_exception(self, exception: StreamThreadException) -> Iterable[AirbyteMessage]:
162        """
163        This method is called when an exception is raised.
164        1. Stop all running streams
165        2. Raise the exception
166        """
167        self._flag_exception(exception.stream_name, exception.exception)
168        self._logger.exception(
169            f"Exception while syncing stream {exception.stream_name}", exc_info=exception.exception
170        )
171
172        stream_descriptor = StreamDescriptor(name=exception.stream_name)
173        if isinstance(exception.exception, AirbyteTracedException):
174            yield exception.exception.as_airbyte_message(stream_descriptor=stream_descriptor)
175        else:
176            yield AirbyteTracedException.from_exception(
177                exception.exception,
178                stream_descriptor=stream_descriptor,
179                message=f"An unexpected error occurred in stream {exception.stream_name}: {type(exception.exception).__name__}",
180            ).as_airbyte_message()
181
182    def _flag_exception(self, stream_name: str, exception: Exception) -> None:
183        self._exceptions_per_stream_name.setdefault(stream_name, []).append(exception)
184
185    def start_next_partition_generator(self) -> Optional[AirbyteMessage]:
186        """
187        Start the next partition generator.
188        1. Pop the next stream to read from
189        2. Submit the partition generator to the thread pool manager
190        3. Add the stream to the list of streams currently generating partitions
191        4. Return a stream status message
192        """
193        if self._stream_instances_to_start_partition_generation:
194            stream = self._stream_instances_to_start_partition_generation.pop(0)
195            self._thread_pool_manager.submit(self._partition_enqueuer.generate_partitions, stream)
196            self._streams_currently_generating_partitions.append(stream.name)
197            self._logger.info(f"Marking stream {stream.name} as STARTED")
198            self._logger.info(f"Syncing stream: {stream.name} ")
199            return stream_status_as_airbyte_message(
200                stream.as_airbyte_stream(),
201                AirbyteStreamStatus.STARTED,
202            )
203        else:
204            return None
205
206    def is_done(self) -> bool:
207        """
208        This method is called to check if the sync is done.
209        The sync is done when:
210        1. There are no more streams generating partitions
211        2. There are no more streams to read from
212        3. All partitions for all streams are closed
213        """
214        is_done = all(
215            [
216                self._is_stream_done(stream_name)
217                for stream_name in self._stream_name_to_instance.keys()
218            ]
219        )
220        if is_done and self._exceptions_per_stream_name:
221            error_message = generate_failed_streams_error_message(self._exceptions_per_stream_name)
222            self._logger.info(error_message)
223            # We still raise at least one exception when a stream raises an exception because the platform currently relies
224            # on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error
225            # type because this combined error isn't actionable, but rather the previously emitted individual errors.
226            raise AirbyteTracedException(
227                message=error_message,
228                internal_message="Concurrent read failure",
229                failure_type=FailureType.config_error,
230            )
231        return is_done
232
233    def _is_stream_done(self, stream_name: str) -> bool:
234        return stream_name in self._streams_done
235
236    def _on_stream_is_done(self, stream_name: str) -> Iterable[AirbyteMessage]:
237        self._logger.info(
238            f"Read {self._record_counter[stream_name]} records from {stream_name} stream"
239        )
240        self._logger.info(f"Marking stream {stream_name} as STOPPED")
241        stream = self._stream_name_to_instance[stream_name]
242        stream.cursor.ensure_at_least_one_state_emitted()
243        yield from self._message_repository.consume_queue()
244        self._logger.info(f"Finished syncing {stream.name}")
245        self._streams_done.add(stream_name)
246        stream_status = (
247            AirbyteStreamStatus.INCOMPLETE
248            if self._exceptions_per_stream_name.get(stream_name, [])
249            else AirbyteStreamStatus.COMPLETE
250        )
251        yield stream_status_as_airbyte_message(stream.as_airbyte_stream(), stream_status)
33    def __init__(
34        self,
35        stream_instances_to_read_from: List[AbstractStream],
36        partition_enqueuer: PartitionEnqueuer,
37        thread_pool_manager: ThreadPoolManager,
38        logger: logging.Logger,
39        slice_logger: SliceLogger,
40        message_repository: MessageRepository,
41        partition_reader: PartitionReader,
42    ):
43        """
44        This class is responsible for handling items from a concurrent stream read process.
45        :param stream_instances_to_read_from: List of streams to read from
46        :param partition_enqueuer: PartitionEnqueuer instance
47        :param thread_pool_manager: ThreadPoolManager instance
48        :param logger: Logger instance
49        :param slice_logger: SliceLogger instance
50        :param message_repository: MessageRepository instance
51        :param partition_reader: PartitionReader instance
52        """
53        self._stream_name_to_instance = {s.name: s for s in stream_instances_to_read_from}
54        self._record_counter = {}
55        self._streams_to_running_partitions: Dict[str, Set[Partition]] = {}
56        for stream in stream_instances_to_read_from:
57            self._streams_to_running_partitions[stream.name] = set()
58            self._record_counter[stream.name] = 0
59        self._thread_pool_manager = thread_pool_manager
60        self._partition_enqueuer = partition_enqueuer
61        self._stream_instances_to_start_partition_generation = stream_instances_to_read_from
62        self._streams_currently_generating_partitions: List[str] = []
63        self._logger = logger
64        self._slice_logger = slice_logger
65        self._message_repository = message_repository
66        self._partition_reader = partition_reader
67        self._streams_done: Set[str] = set()
68        self._exceptions_per_stream_name: dict[str, List[Exception]] = {}

This class is responsible for handling items from a concurrent stream read process.

Parameters
  • stream_instances_to_read_from: List of streams to read from
  • partition_enqueuer: PartitionEnqueuer instance
  • thread_pool_manager: ThreadPoolManager instance
  • logger: Logger instance
  • slice_logger: SliceLogger instance
  • message_repository: MessageRepository instance
  • partition_reader: PartitionReader instance
70    def on_partition_generation_completed(
71        self, sentinel: PartitionGenerationCompletedSentinel
72    ) -> Iterable[AirbyteMessage]:
73        """
74        This method is called when a partition generation is completed.
75        1. Remove the stream from the list of streams currently generating partitions
76        2. If the stream is done, mark it as such and return a stream status message
77        3. If there are more streams to read from, start the next partition generator
78        """
79        stream_name = sentinel.stream.name
80        self._streams_currently_generating_partitions.remove(sentinel.stream.name)
81        # It is possible for the stream to already be done if no partitions were generated
82        # If the partition generation process was completed and there are no partitions left to process, the stream is done
83        if (
84            self._is_stream_done(stream_name)
85            or len(self._streams_to_running_partitions[stream_name]) == 0
86        ):
87            yield from self._on_stream_is_done(stream_name)
88        if self._stream_instances_to_start_partition_generation:
89            yield self.start_next_partition_generator()  # type:ignore # None may be yielded

This method is called when a partition generation is completed.

  1. Remove the stream from the list of streams currently generating partitions
  2. If the stream is done, mark it as such and return a stream status message
  3. If there are more streams to read from, start the next partition generator
def on_partition( self, partition: airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition) -> None:
 91    def on_partition(self, partition: Partition) -> None:
 92        """
 93        This method is called when a partition is generated.
 94        1. Add the partition to the set of partitions for the stream
 95        2. Log the slice if necessary
 96        3. Submit the partition to the thread pool manager
 97        """
 98        stream_name = partition.stream_name()
 99        self._streams_to_running_partitions[stream_name].add(partition)
100        cursor = self._stream_name_to_instance[stream_name].cursor
101        if self._slice_logger.should_log_slice_message(self._logger):
102            self._message_repository.emit_message(
103                self._slice_logger.create_slice_log_message(partition.to_slice())
104            )
105        self._thread_pool_manager.submit(
106            self._partition_reader.process_partition, partition, cursor
107        )

This method is called when a partition is generated.

  1. Add the partition to the set of partitions for the stream
  2. Log the slice if necessary
  3. Submit the partition to the thread pool manager
def on_partition_complete_sentinel( self, sentinel: airbyte_cdk.sources.streams.concurrent.partitions.types.PartitionCompleteSentinel) -> Iterable[airbyte_cdk.AirbyteMessage]:
109    def on_partition_complete_sentinel(
110        self, sentinel: PartitionCompleteSentinel
111    ) -> Iterable[AirbyteMessage]:
112        """
113        This method is called when a partition is completed.
114        1. Close the partition
115        2. If the stream is done, mark it as such and return a stream status message
116        3. Emit messages that were added to the message repository
117        """
118        partition = sentinel.partition
119
120        partitions_running = self._streams_to_running_partitions[partition.stream_name()]
121        if partition in partitions_running:
122            partitions_running.remove(partition)
123            # If all partitions were generated and this was the last one, the stream is done
124            if (
125                partition.stream_name() not in self._streams_currently_generating_partitions
126                and len(partitions_running) == 0
127            ):
128                yield from self._on_stream_is_done(partition.stream_name())
129        yield from self._message_repository.consume_queue()

This method is called when a partition is completed.

  1. Close the partition
  2. If the stream is done, mark it as such and return a stream status message
  3. Emit messages that were added to the message repository
def on_record( self, record: airbyte_cdk.Record) -> Iterable[airbyte_cdk.AirbyteMessage]:
131    def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
132        """
133        This method is called when a record is read from a partition.
134        1. Convert the record to an AirbyteMessage
135        2. If this is the first record for the stream, mark the stream as RUNNING
136        3. Increment the record counter for the stream
137        4. Ensures the cursor knows the record has been successfully emitted
138        5. Emit the message
139        6. Emit messages that were added to the message repository
140        """
141        # Do not pass a transformer or a schema
142        # AbstractStreams are expected to return data as they are expected.
143        # Any transformation on the data should be done before reaching this point
144        message = stream_data_to_airbyte_message(
145            stream_name=record.stream_name,
146            data_or_message=record.data,
147            file_reference=record.file_reference,
148        )
149        stream = self._stream_name_to_instance[record.stream_name]
150
151        if message.type == MessageType.RECORD:
152            if self._record_counter[stream.name] == 0:
153                self._logger.info(f"Marking stream {stream.name} as RUNNING")
154                yield stream_status_as_airbyte_message(
155                    stream.as_airbyte_stream(), AirbyteStreamStatus.RUNNING
156                )
157            self._record_counter[stream.name] += 1
158        yield message
159        yield from self._message_repository.consume_queue()

This method is called when a record is read from a partition.

  1. Convert the record to an AirbyteMessage
  2. If this is the first record for the stream, mark the stream as RUNNING
  3. Increment the record counter for the stream
  4. Ensures the cursor knows the record has been successfully emitted
  5. Emit the message
  6. Emit messages that were added to the message repository
161    def on_exception(self, exception: StreamThreadException) -> Iterable[AirbyteMessage]:
162        """
163        This method is called when an exception is raised.
164        1. Stop all running streams
165        2. Raise the exception
166        """
167        self._flag_exception(exception.stream_name, exception.exception)
168        self._logger.exception(
169            f"Exception while syncing stream {exception.stream_name}", exc_info=exception.exception
170        )
171
172        stream_descriptor = StreamDescriptor(name=exception.stream_name)
173        if isinstance(exception.exception, AirbyteTracedException):
174            yield exception.exception.as_airbyte_message(stream_descriptor=stream_descriptor)
175        else:
176            yield AirbyteTracedException.from_exception(
177                exception.exception,
178                stream_descriptor=stream_descriptor,
179                message=f"An unexpected error occurred in stream {exception.stream_name}: {type(exception.exception).__name__}",
180            ).as_airbyte_message()

This method is called when an exception is raised.

  1. Record the exception for its stream and log it
  2. Yield the exception as an Airbyte message for the stream
def start_next_partition_generator(self) -> Optional[airbyte_cdk.AirbyteMessage]:
185    def start_next_partition_generator(self) -> Optional[AirbyteMessage]:
186        """
187        Start the next partition generator.
188        1. Pop the next stream to read from
189        2. Submit the partition generator to the thread pool manager
190        3. Add the stream to the list of streams currently generating partitions
191        4. Return a stream status message
192        """
193        if self._stream_instances_to_start_partition_generation:
194            stream = self._stream_instances_to_start_partition_generation.pop(0)
195            self._thread_pool_manager.submit(self._partition_enqueuer.generate_partitions, stream)
196            self._streams_currently_generating_partitions.append(stream.name)
197            self._logger.info(f"Marking stream {stream.name} as STARTED")
198            self._logger.info(f"Syncing stream: {stream.name} ")
199            return stream_status_as_airbyte_message(
200                stream.as_airbyte_stream(),
201                AirbyteStreamStatus.STARTED,
202            )
203        else:
204            return None

Start the next partition generator.

  1. Pop the next stream to read from
  2. Submit the partition generator to the thread pool manager
  3. Add the stream to the list of streams currently generating partitions
  4. Return a stream status message
def is_done(self) -> bool:
206    def is_done(self) -> bool:
207        """
208        This method is called to check if the sync is done.
209        The sync is done when:
210        1. There are no more streams generating partitions
211        2. There are no more streams to read from
212        3. All partitions for all streams are closed
213        """
214        is_done = all(
215            [
216                self._is_stream_done(stream_name)
217                for stream_name in self._stream_name_to_instance.keys()
218            ]
219        )
220        if is_done and self._exceptions_per_stream_name:
221            error_message = generate_failed_streams_error_message(self._exceptions_per_stream_name)
222            self._logger.info(error_message)
223            # We still raise at least one exception when a stream raises an exception because the platform currently relies
224            # on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error
225            # type because this combined error isn't actionable, but rather the previously emitted individual errors.
226            raise AirbyteTracedException(
227                message=error_message,
228                internal_message="Concurrent read failure",
229                failure_type=FailureType.config_error,
230            )
231        return is_done

This method is called to check if the sync is done. The sync is done when:

  1. There are no more streams generating partitions
  2. There are no more streams to read from
  3. All partitions for all streams are closed