airbyte_cdk.sources.concurrent_source.concurrent_read_processor

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4import logging
  5import os
  6from typing import Dict, Iterable, List, Optional, Set
  7
  8from airbyte_cdk.exception_handler import generate_failed_streams_error_message
  9from airbyte_cdk.models import AirbyteMessage, AirbyteStreamStatus, FailureType, StreamDescriptor
 10from airbyte_cdk.models import Type as MessageType
 11from airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel import (
 12    PartitionGenerationCompletedSentinel,
 13)
 14from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException
 15from airbyte_cdk.sources.concurrent_source.thread_pool_manager import ThreadPoolManager
 16from airbyte_cdk.sources.message import MessageRepository
 17from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
 18from airbyte_cdk.sources.streams.concurrent.partition_enqueuer import PartitionEnqueuer
 19from airbyte_cdk.sources.streams.concurrent.partition_reader import PartitionReader
 20from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
 21from airbyte_cdk.sources.streams.concurrent.partitions.types import PartitionCompleteSentinel
 22from airbyte_cdk.sources.types import Record
 23from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message
 24from airbyte_cdk.sources.utils.slice_logger import SliceLogger
 25from airbyte_cdk.utils import AirbyteTracedException
 26from airbyte_cdk.utils.stream_status_utils import (
 27    as_airbyte_message as stream_status_as_airbyte_message,
 28)
 29
 30
 31class ConcurrentReadProcessor:
 32    def __init__(
 33        self,
 34        stream_instances_to_read_from: List[AbstractStream],
 35        partition_enqueuer: PartitionEnqueuer,
 36        thread_pool_manager: ThreadPoolManager,
 37        logger: logging.Logger,
 38        slice_logger: SliceLogger,
 39        message_repository: MessageRepository,
 40        partition_reader: PartitionReader,
 41    ):
 42        """
 43        This class is responsible for handling items from a concurrent stream read process.
 44        :param stream_instances_to_read_from: List of streams to read from
 45        :param partition_enqueuer: PartitionEnqueuer instance
 46        :param thread_pool_manager: ThreadPoolManager instance
 47        :param logger: Logger instance
 48        :param slice_logger: SliceLogger instance
 49        :param message_repository: MessageRepository instance
 50        :param partition_reader: PartitionReader instance
 51        """
 52        self._stream_name_to_instance = {s.name: s for s in stream_instances_to_read_from}
 53        self._record_counter = {}
 54        self._streams_to_running_partitions: Dict[str, Set[Partition]] = {}
 55        for stream in stream_instances_to_read_from:
 56            self._streams_to_running_partitions[stream.name] = set()
 57            self._record_counter[stream.name] = 0
 58        self._thread_pool_manager = thread_pool_manager
 59        self._partition_enqueuer = partition_enqueuer
 60        self._stream_instances_to_start_partition_generation = stream_instances_to_read_from
 61        self._streams_currently_generating_partitions: List[str] = []
 62        self._logger = logger
 63        self._slice_logger = slice_logger
 64        self._message_repository = message_repository
 65        self._partition_reader = partition_reader
 66        self._streams_done: Set[str] = set()
 67        self._exceptions_per_stream_name: dict[str, List[Exception]] = {}
 68
 69    def on_partition_generation_completed(
 70        self, sentinel: PartitionGenerationCompletedSentinel
 71    ) -> Iterable[AirbyteMessage]:
 72        """
 73        This method is called when a partition generation is completed.
 74        1. Remove the stream from the list of streams currently generating partitions
 75        2. If the stream is done, mark it as such and return a stream status message
 76        3. If there are more streams to read from, start the next partition generator
 77        """
 78        stream_name = sentinel.stream.name
 79        self._streams_currently_generating_partitions.remove(sentinel.stream.name)
 80        # It is possible for the stream to already be done if no partitions were generated
 81        # If the partition generation process was completed and there are no partitions left to process, the stream is done
 82        if (
 83            self._is_stream_done(stream_name)
 84            or len(self._streams_to_running_partitions[stream_name]) == 0
 85        ):
 86            yield from self._on_stream_is_done(stream_name)
 87        if self._stream_instances_to_start_partition_generation:
 88            yield self.start_next_partition_generator()  # type:ignore # None may be yielded
 89
 90    def on_partition(self, partition: Partition) -> None:
 91        """
 92        This method is called when a partition is generated.
 93        1. Add the partition to the set of partitions for the stream
 94        2. Log the slice if necessary
 95        3. Submit the partition to the thread pool manager
 96        """
 97        stream_name = partition.stream_name()
 98        self._streams_to_running_partitions[stream_name].add(partition)
 99        cursor = self._stream_name_to_instance[stream_name].cursor
100        if self._slice_logger.should_log_slice_message(self._logger):
101            self._message_repository.emit_message(
102                self._slice_logger.create_slice_log_message(partition.to_slice())
103            )
104        self._thread_pool_manager.submit(
105            self._partition_reader.process_partition, partition, cursor
106        )
107
108    def on_partition_complete_sentinel(
109        self, sentinel: PartitionCompleteSentinel
110    ) -> Iterable[AirbyteMessage]:
111        """
112        This method is called when a partition is completed.
113        1. Close the partition
114        2. If the stream is done, mark it as such and return a stream status message
115        3. Emit messages that were added to the message repository
116        """
117        partition = sentinel.partition
118
119        partitions_running = self._streams_to_running_partitions[partition.stream_name()]
120        if partition in partitions_running:
121            partitions_running.remove(partition)
122            # If all partitions were generated and this was the last one, the stream is done
123            if (
124                partition.stream_name() not in self._streams_currently_generating_partitions
125                and len(partitions_running) == 0
126            ):
127                yield from self._on_stream_is_done(partition.stream_name())
128        yield from self._message_repository.consume_queue()
129
130    def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
131        """
132        This method is called when a record is read from a partition.
133        1. Convert the record to an AirbyteMessage
134        2. If this is the first record for the stream, mark the stream as RUNNING
135        3. Increment the record counter for the stream
136        4. Ensures the cursor knows the record has been successfully emitted
137        5. Emit the message
138        6. Emit messages that were added to the message repository
139        """
140        # Do not pass a transformer or a schema
141        # AbstractStreams are expected to return data as they are expected.
142        # Any transformation on the data should be done before reaching this point
143        message = stream_data_to_airbyte_message(
144            stream_name=record.stream_name,
145            data_or_message=record.data,
146            file_reference=record.file_reference,
147        )
148        stream = self._stream_name_to_instance[record.stream_name]
149
150        if message.type == MessageType.RECORD:
151            if self._record_counter[stream.name] == 0:
152                self._logger.info(f"Marking stream {stream.name} as RUNNING")
153                yield stream_status_as_airbyte_message(
154                    stream.as_airbyte_stream(), AirbyteStreamStatus.RUNNING
155                )
156            self._record_counter[stream.name] += 1
157        yield message
158        yield from self._message_repository.consume_queue()
159
160    def on_exception(self, exception: StreamThreadException) -> Iterable[AirbyteMessage]:
161        """
162        This method is called when an exception is raised.
163        1. Stop all running streams
164        2. Raise the exception
165        """
166        self._flag_exception(exception.stream_name, exception.exception)
167        self._logger.exception(
168            f"Exception while syncing stream {exception.stream_name}", exc_info=exception.exception
169        )
170
171        stream_descriptor = StreamDescriptor(name=exception.stream_name)
172        if isinstance(exception.exception, AirbyteTracedException):
173            yield exception.exception.as_airbyte_message(stream_descriptor=stream_descriptor)
174        else:
175            yield AirbyteTracedException.from_exception(
176                exception, stream_descriptor=stream_descriptor
177            ).as_airbyte_message()
178
179    def _flag_exception(self, stream_name: str, exception: Exception) -> None:
180        self._exceptions_per_stream_name.setdefault(stream_name, []).append(exception)
181
182    def start_next_partition_generator(self) -> Optional[AirbyteMessage]:
183        """
184        Start the next partition generator.
185        1. Pop the next stream to read from
186        2. Submit the partition generator to the thread pool manager
187        3. Add the stream to the list of streams currently generating partitions
188        4. Return a stream status message
189        """
190        if self._stream_instances_to_start_partition_generation:
191            stream = self._stream_instances_to_start_partition_generation.pop(0)
192            self._thread_pool_manager.submit(self._partition_enqueuer.generate_partitions, stream)
193            self._streams_currently_generating_partitions.append(stream.name)
194            self._logger.info(f"Marking stream {stream.name} as STARTED")
195            self._logger.info(f"Syncing stream: {stream.name} ")
196            return stream_status_as_airbyte_message(
197                stream.as_airbyte_stream(),
198                AirbyteStreamStatus.STARTED,
199            )
200        else:
201            return None
202
203    def is_done(self) -> bool:
204        """
205        This method is called to check if the sync is done.
206        The sync is done when:
207        1. There are no more streams generating partitions
208        2. There are no more streams to read from
209        3. All partitions for all streams are closed
210        """
211        is_done = all(
212            [
213                self._is_stream_done(stream_name)
214                for stream_name in self._stream_name_to_instance.keys()
215            ]
216        )
217        if is_done and self._exceptions_per_stream_name:
218            error_message = generate_failed_streams_error_message(self._exceptions_per_stream_name)
219            self._logger.info(error_message)
220            # We still raise at least one exception when a stream raises an exception because the platform currently relies
221            # on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error
222            # type because this combined error isn't actionable, but rather the previously emitted individual errors.
223            raise AirbyteTracedException(
224                message=error_message,
225                internal_message="Concurrent read failure",
226                failure_type=FailureType.config_error,
227            )
228        return is_done
229
230    def _is_stream_done(self, stream_name: str) -> bool:
231        return stream_name in self._streams_done
232
233    def _on_stream_is_done(self, stream_name: str) -> Iterable[AirbyteMessage]:
234        self._logger.info(
235            f"Read {self._record_counter[stream_name]} records from {stream_name} stream"
236        )
237        self._logger.info(f"Marking stream {stream_name} as STOPPED")
238        stream = self._stream_name_to_instance[stream_name]
239        stream.cursor.ensure_at_least_one_state_emitted()
240        yield from self._message_repository.consume_queue()
241        self._logger.info(f"Finished syncing {stream.name}")
242        self._streams_done.add(stream_name)
243        stream_status = (
244            AirbyteStreamStatus.INCOMPLETE
245            if self._exceptions_per_stream_name.get(stream_name, [])
246            else AirbyteStreamStatus.COMPLETE
247        )
248        yield stream_status_as_airbyte_message(stream.as_airbyte_stream(), stream_status)
class ConcurrentReadProcessor:
 32class ConcurrentReadProcessor:
 33    def __init__(
 34        self,
 35        stream_instances_to_read_from: List[AbstractStream],
 36        partition_enqueuer: PartitionEnqueuer,
 37        thread_pool_manager: ThreadPoolManager,
 38        logger: logging.Logger,
 39        slice_logger: SliceLogger,
 40        message_repository: MessageRepository,
 41        partition_reader: PartitionReader,
 42    ):
 43        """
 44        This class is responsible for handling items from a concurrent stream read process.
 45        :param stream_instances_to_read_from: List of streams to read from
 46        :param partition_enqueuer: PartitionEnqueuer instance
 47        :param thread_pool_manager: ThreadPoolManager instance
 48        :param logger: Logger instance
 49        :param slice_logger: SliceLogger instance
 50        :param message_repository: MessageRepository instance
 51        :param partition_reader: PartitionReader instance
 52        """
 53        self._stream_name_to_instance = {s.name: s for s in stream_instances_to_read_from}
 54        self._record_counter = {}
 55        self._streams_to_running_partitions: Dict[str, Set[Partition]] = {}
 56        for stream in stream_instances_to_read_from:
 57            self._streams_to_running_partitions[stream.name] = set()
 58            self._record_counter[stream.name] = 0
 59        self._thread_pool_manager = thread_pool_manager
 60        self._partition_enqueuer = partition_enqueuer
 61        self._stream_instances_to_start_partition_generation = stream_instances_to_read_from
 62        self._streams_currently_generating_partitions: List[str] = []
 63        self._logger = logger
 64        self._slice_logger = slice_logger
 65        self._message_repository = message_repository
 66        self._partition_reader = partition_reader
 67        self._streams_done: Set[str] = set()
 68        self._exceptions_per_stream_name: dict[str, List[Exception]] = {}
 69
 70    def on_partition_generation_completed(
 71        self, sentinel: PartitionGenerationCompletedSentinel
 72    ) -> Iterable[AirbyteMessage]:
 73        """
 74        This method is called when a partition generation is completed.
 75        1. Remove the stream from the list of streams currently generating partitions
 76        2. If the stream is done, mark it as such and return a stream status message
 77        3. If there are more streams to read from, start the next partition generator
 78        """
 79        stream_name = sentinel.stream.name
 80        self._streams_currently_generating_partitions.remove(sentinel.stream.name)
 81        # It is possible for the stream to already be done if no partitions were generated
 82        # If the partition generation process was completed and there are no partitions left to process, the stream is done
 83        if (
 84            self._is_stream_done(stream_name)
 85            or len(self._streams_to_running_partitions[stream_name]) == 0
 86        ):
 87            yield from self._on_stream_is_done(stream_name)
 88        if self._stream_instances_to_start_partition_generation:
 89            yield self.start_next_partition_generator()  # type:ignore # None may be yielded
 90
 91    def on_partition(self, partition: Partition) -> None:
 92        """
 93        This method is called when a partition is generated.
 94        1. Add the partition to the set of partitions for the stream
 95        2. Log the slice if necessary
 96        3. Submit the partition to the thread pool manager
 97        """
 98        stream_name = partition.stream_name()
 99        self._streams_to_running_partitions[stream_name].add(partition)
100        cursor = self._stream_name_to_instance[stream_name].cursor
101        if self._slice_logger.should_log_slice_message(self._logger):
102            self._message_repository.emit_message(
103                self._slice_logger.create_slice_log_message(partition.to_slice())
104            )
105        self._thread_pool_manager.submit(
106            self._partition_reader.process_partition, partition, cursor
107        )
108
109    def on_partition_complete_sentinel(
110        self, sentinel: PartitionCompleteSentinel
111    ) -> Iterable[AirbyteMessage]:
112        """
113        This method is called when a partition is completed.
114        1. Close the partition
115        2. If the stream is done, mark it as such and return a stream status message
116        3. Emit messages that were added to the message repository
117        """
118        partition = sentinel.partition
119
120        partitions_running = self._streams_to_running_partitions[partition.stream_name()]
121        if partition in partitions_running:
122            partitions_running.remove(partition)
123            # If all partitions were generated and this was the last one, the stream is done
124            if (
125                partition.stream_name() not in self._streams_currently_generating_partitions
126                and len(partitions_running) == 0
127            ):
128                yield from self._on_stream_is_done(partition.stream_name())
129        yield from self._message_repository.consume_queue()
130
131    def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
132        """
133        This method is called when a record is read from a partition.
134        1. Convert the record to an AirbyteMessage
135        2. If this is the first record for the stream, mark the stream as RUNNING
136        3. Increment the record counter for the stream
137        4. Ensures the cursor knows the record has been successfully emitted
138        5. Emit the message
139        6. Emit messages that were added to the message repository
140        """
141        # Do not pass a transformer or a schema
142        # AbstractStreams are expected to return data as they are expected.
143        # Any transformation on the data should be done before reaching this point
144        message = stream_data_to_airbyte_message(
145            stream_name=record.stream_name,
146            data_or_message=record.data,
147            file_reference=record.file_reference,
148        )
149        stream = self._stream_name_to_instance[record.stream_name]
150
151        if message.type == MessageType.RECORD:
152            if self._record_counter[stream.name] == 0:
153                self._logger.info(f"Marking stream {stream.name} as RUNNING")
154                yield stream_status_as_airbyte_message(
155                    stream.as_airbyte_stream(), AirbyteStreamStatus.RUNNING
156                )
157            self._record_counter[stream.name] += 1
158        yield message
159        yield from self._message_repository.consume_queue()
160
161    def on_exception(self, exception: StreamThreadException) -> Iterable[AirbyteMessage]:
162        """
163        This method is called when an exception is raised.
164        1. Stop all running streams
165        2. Raise the exception
166        """
167        self._flag_exception(exception.stream_name, exception.exception)
168        self._logger.exception(
169            f"Exception while syncing stream {exception.stream_name}", exc_info=exception.exception
170        )
171
172        stream_descriptor = StreamDescriptor(name=exception.stream_name)
173        if isinstance(exception.exception, AirbyteTracedException):
174            yield exception.exception.as_airbyte_message(stream_descriptor=stream_descriptor)
175        else:
176            yield AirbyteTracedException.from_exception(
177                exception, stream_descriptor=stream_descriptor
178            ).as_airbyte_message()
179
180    def _flag_exception(self, stream_name: str, exception: Exception) -> None:
181        self._exceptions_per_stream_name.setdefault(stream_name, []).append(exception)
182
183    def start_next_partition_generator(self) -> Optional[AirbyteMessage]:
184        """
185        Start the next partition generator.
186        1. Pop the next stream to read from
187        2. Submit the partition generator to the thread pool manager
188        3. Add the stream to the list of streams currently generating partitions
189        4. Return a stream status message
190        """
191        if self._stream_instances_to_start_partition_generation:
192            stream = self._stream_instances_to_start_partition_generation.pop(0)
193            self._thread_pool_manager.submit(self._partition_enqueuer.generate_partitions, stream)
194            self._streams_currently_generating_partitions.append(stream.name)
195            self._logger.info(f"Marking stream {stream.name} as STARTED")
196            self._logger.info(f"Syncing stream: {stream.name} ")
197            return stream_status_as_airbyte_message(
198                stream.as_airbyte_stream(),
199                AirbyteStreamStatus.STARTED,
200            )
201        else:
202            return None
203
204    def is_done(self) -> bool:
205        """
206        This method is called to check if the sync is done.
207        The sync is done when:
208        1. There are no more streams generating partitions
209        2. There are no more streams to read from
210        3. All partitions for all streams are closed
211        """
212        is_done = all(
213            [
214                self._is_stream_done(stream_name)
215                for stream_name in self._stream_name_to_instance.keys()
216            ]
217        )
218        if is_done and self._exceptions_per_stream_name:
219            error_message = generate_failed_streams_error_message(self._exceptions_per_stream_name)
220            self._logger.info(error_message)
221            # We still raise at least one exception when a stream raises an exception because the platform currently relies
222            # on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error
223            # type because this combined error isn't actionable, but rather the previously emitted individual errors.
224            raise AirbyteTracedException(
225                message=error_message,
226                internal_message="Concurrent read failure",
227                failure_type=FailureType.config_error,
228            )
229        return is_done
230
231    def _is_stream_done(self, stream_name: str) -> bool:
232        return stream_name in self._streams_done
233
234    def _on_stream_is_done(self, stream_name: str) -> Iterable[AirbyteMessage]:
235        self._logger.info(
236            f"Read {self._record_counter[stream_name]} records from {stream_name} stream"
237        )
238        self._logger.info(f"Marking stream {stream_name} as STOPPED")
239        stream = self._stream_name_to_instance[stream_name]
240        stream.cursor.ensure_at_least_one_state_emitted()
241        yield from self._message_repository.consume_queue()
242        self._logger.info(f"Finished syncing {stream.name}")
243        self._streams_done.add(stream_name)
244        stream_status = (
245            AirbyteStreamStatus.INCOMPLETE
246            if self._exceptions_per_stream_name.get(stream_name, [])
247            else AirbyteStreamStatus.COMPLETE
248        )
249        yield stream_status_as_airbyte_message(stream.as_airbyte_stream(), stream_status)
33    def __init__(
34        self,
35        stream_instances_to_read_from: List[AbstractStream],
36        partition_enqueuer: PartitionEnqueuer,
37        thread_pool_manager: ThreadPoolManager,
38        logger: logging.Logger,
39        slice_logger: SliceLogger,
40        message_repository: MessageRepository,
41        partition_reader: PartitionReader,
42    ):
43        """
44        This class is responsible for handling items from a concurrent stream read process.
45        :param stream_instances_to_read_from: List of streams to read from
46        :param partition_enqueuer: PartitionEnqueuer instance
47        :param thread_pool_manager: ThreadPoolManager instance
48        :param logger: Logger instance
49        :param slice_logger: SliceLogger instance
50        :param message_repository: MessageRepository instance
51        :param partition_reader: PartitionReader instance
52        """
53        self._stream_name_to_instance = {s.name: s for s in stream_instances_to_read_from}
54        self._record_counter = {}
55        self._streams_to_running_partitions: Dict[str, Set[Partition]] = {}
56        for stream in stream_instances_to_read_from:
57            self._streams_to_running_partitions[stream.name] = set()
58            self._record_counter[stream.name] = 0
59        self._thread_pool_manager = thread_pool_manager
60        self._partition_enqueuer = partition_enqueuer
61        self._stream_instances_to_start_partition_generation = stream_instances_to_read_from
62        self._streams_currently_generating_partitions: List[str] = []
63        self._logger = logger
64        self._slice_logger = slice_logger
65        self._message_repository = message_repository
66        self._partition_reader = partition_reader
67        self._streams_done: Set[str] = set()
68        self._exceptions_per_stream_name: dict[str, List[Exception]] = {}

This class is responsible for handling items from a concurrent stream read process.

Parameters
  • stream_instances_to_read_from: List of streams to read from
  • partition_enqueuer: PartitionEnqueuer instance
  • thread_pool_manager: ThreadPoolManager instance
  • logger: Logger instance
  • slice_logger: SliceLogger instance
  • message_repository: MessageRepository instance
  • partition_reader: PartitionReader instance
70    def on_partition_generation_completed(
71        self, sentinel: PartitionGenerationCompletedSentinel
72    ) -> Iterable[AirbyteMessage]:
73        """
74        This method is called when a partition generation is completed.
75        1. Remove the stream from the list of streams currently generating partitions
76        2. If the stream is done, mark it as such and return a stream status message
77        3. If there are more streams to read from, start the next partition generator
78        """
79        stream_name = sentinel.stream.name
80        self._streams_currently_generating_partitions.remove(sentinel.stream.name)
81        # It is possible for the stream to already be done if no partitions were generated
82        # If the partition generation process was completed and there are no partitions left to process, the stream is done
83        if (
84            self._is_stream_done(stream_name)
85            or len(self._streams_to_running_partitions[stream_name]) == 0
86        ):
87            yield from self._on_stream_is_done(stream_name)
88        if self._stream_instances_to_start_partition_generation:
89            yield self.start_next_partition_generator()  # type:ignore # None may be yielded

This method is called when a partition generation is completed.

  1. Remove the stream from the list of streams currently generating partitions
  2. If the stream is done, mark it as such and return a stream status message
  3. If there are more streams to read from, start the next partition generator
def on_partition( self, partition: airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition) -> None:
 91    def on_partition(self, partition: Partition) -> None:
 92        """
 93        This method is called when a partition is generated.
 94        1. Add the partition to the set of partitions for the stream
 95        2. Log the slice if necessary
 96        3. Submit the partition to the thread pool manager
 97        """
 98        stream_name = partition.stream_name()
 99        self._streams_to_running_partitions[stream_name].add(partition)
100        cursor = self._stream_name_to_instance[stream_name].cursor
101        if self._slice_logger.should_log_slice_message(self._logger):
102            self._message_repository.emit_message(
103                self._slice_logger.create_slice_log_message(partition.to_slice())
104            )
105        self._thread_pool_manager.submit(
106            self._partition_reader.process_partition, partition, cursor
107        )

This method is called when a partition is generated.

  1. Add the partition to the set of partitions for the stream
  2. Log the slice if necessary
  3. Submit the partition to the thread pool manager
def on_partition_complete_sentinel( self, sentinel: airbyte_cdk.sources.streams.concurrent.partitions.types.PartitionCompleteSentinel) -> Iterable[airbyte_cdk.AirbyteMessage]:
109    def on_partition_complete_sentinel(
110        self, sentinel: PartitionCompleteSentinel
111    ) -> Iterable[AirbyteMessage]:
112        """
113        This method is called when a partition is completed.
114        1. Close the partition
115        2. If the stream is done, mark it as such and return a stream status message
116        3. Emit messages that were added to the message repository
117        """
118        partition = sentinel.partition
119
120        partitions_running = self._streams_to_running_partitions[partition.stream_name()]
121        if partition in partitions_running:
122            partitions_running.remove(partition)
123            # If all partitions were generated and this was the last one, the stream is done
124            if (
125                partition.stream_name() not in self._streams_currently_generating_partitions
126                and len(partitions_running) == 0
127            ):
128                yield from self._on_stream_is_done(partition.stream_name())
129        yield from self._message_repository.consume_queue()

This method is called when a partition is completed.

  1. Remove the partition from the stream's set of running partitions
  2. If the stream is done, mark it as such and return a stream status message
  3. Emit messages that were added to the message repository
def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
    """
    This method is called when a record is read from a partition.
    1. Convert the record to an AirbyteMessage
    2. If the message is a RECORD and it is the first one counted for the stream,
       emit a RUNNING stream status message
    3. For RECORD messages, increment the record counter for the stream
    4. Emit the message
    5. Emit messages that were added to the message repository
    """
    # Do not pass a transformer or a schema
    # AbstractStreams are expected to return data as they are expected.
    # Any transformation on the data should be done before reaching this point
    message = stream_data_to_airbyte_message(
        stream_name=record.stream_name,
        data_or_message=record.data,
        file_reference=record.file_reference,
    )
    stream = self._stream_name_to_instance[record.stream_name]

    if message.type == MessageType.RECORD:
        # The counter doubles as a "first record" check: RUNNING is reported just
        # before the stream's first RECORD (assumes the counter starts at 0).
        if self._record_counter[stream.name] == 0:
            self._logger.info(f"Marking stream {stream.name} as RUNNING")
            yield stream_status_as_airbyte_message(
                stream.as_airbyte_stream(), AirbyteStreamStatus.RUNNING
            )
        self._record_counter[stream.name] += 1
    yield message
    yield from self._message_repository.consume_queue()

This method is called when a record is read from a partition.

  1. Convert the record to an AirbyteMessage
  2. If the message is a RECORD and it is the first record for the stream, mark the stream as RUNNING
  3. For RECORD messages, increment the record counter for the stream
  4. Emit the message
  5. Emit messages that were added to the message repository
def on_exception(self, exception: StreamThreadException) -> Iterable[AirbyteMessage]:
    """
    This method is called when an exception is raised while syncing a stream.

    `exception` pairs the original error (`exception.exception`) with the name of
    the stream it belongs to (`exception.stream_name`).
    1. Flag the exception against its stream (via `_flag_exception`)
    2. Log it, including the original exception's traceback
    3. Emit it as a trace message scoped to that stream

    Nothing is stopped or re-raised here; `is_done` raises a combined error at the
    end of the sync if any exception was recorded in `_exceptions_per_stream_name`.
    """
    self._flag_exception(exception.stream_name, exception.exception)
    self._logger.exception(
        f"Exception while syncing stream {exception.stream_name}", exc_info=exception.exception
    )

    stream_descriptor = StreamDescriptor(name=exception.stream_name)
    if isinstance(exception.exception, AirbyteTracedException):
        # Already a traced exception: emit it as-is, scoped to this stream.
        yield exception.exception.as_airbyte_message(stream_descriptor=stream_descriptor)
    else:
        # NOTE(review): the StreamThreadException wrapper (not `exception.exception`) is
        # converted here; if the wrapper was never raised, its own traceback may be
        # empty and the emitted stack trace could omit the original frames — confirm.
        yield AirbyteTracedException.from_exception(
            exception, stream_descriptor=stream_descriptor
        ).as_airbyte_message()

This method is called when an exception is raised.

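  1. Flag the exception for the stream it came from
  2. Log the exception, including its traceback
  3. Emit the exception as a trace message scoped to that stream

The exception is not re-raised here and no streams are stopped; `is_done` raises a combined error at the end of the sync if any stream recorded an exception.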
def start_next_partition_generator(self) -> Optional[AirbyteMessage]:
    """
    Start the next partition generator.
    1. Pop the next stream to read from
    2. Submit the partition generator to the thread pool manager
    3. Add the stream to the list of streams currently generating partitions
    4. Return a STARTED stream status message for that stream

    Returns None, without doing anything else, when there is no stream left to start.
    """
    if not self._stream_instances_to_start_partition_generation:
        return None

    stream = self._stream_instances_to_start_partition_generation.pop(0)
    self._thread_pool_manager.submit(self._partition_enqueuer.generate_partitions, stream)
    # While the stream is listed as generating, on_partition_complete_sentinel will
    # not mark it done, even if all partitions seen so far have completed.
    self._streams_currently_generating_partitions.append(stream.name)
    self._logger.info(f"Marking stream {stream.name} as STARTED")
    self._logger.info(f"Syncing stream: {stream.name} ")
    return stream_status_as_airbyte_message(
        stream.as_airbyte_stream(),
        AirbyteStreamStatus.STARTED,
    )

Start the next partition generator.

  1. Pop the next stream to read from
  2. Submit the partition generator to the thread pool manager
  3. Add the stream to the list of streams currently generating partitions
  4. Return a STARTED stream status message, or None if there is no stream left to start
def is_done(self) -> bool:
    """
    This method is called to check if the sync is done.
    The sync is done when `_is_stream_done` holds for every stream being read,
    which is expected to mean:
    1. There are no more streams generating partitions
    2. There are no more streams to read from
    3. All partitions for all streams are closed

    Returns True when the sync is done and no stream recorded an exception, and
    False while any stream is still in progress.

    Raises AirbyteTracedException (failure type config_error) summarizing the
    failed streams when the sync is done but at least one stream recorded an
    exception in `_exceptions_per_stream_name`.
    """
    # Generator (not a list) so all() stops at the first unfinished stream;
    # assumes `_is_stream_done` is a side-effect-free check.
    all_streams_done = all(
        self._is_stream_done(stream_name) for stream_name in self._stream_name_to_instance
    )
    if all_streams_done and self._exceptions_per_stream_name:
        error_message = generate_failed_streams_error_message(self._exceptions_per_stream_name)
        self._logger.info(error_message)
        # We still raise at least one exception when a stream raises an exception because the platform currently relies
        # on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error
        # type because this combined error isn't actionable, but rather the previously emitted individual errors.
        raise AirbyteTracedException(
            message=error_message,
            internal_message="Concurrent read failure",
            failure_type=FailureType.config_error,
        )
    return all_streams_done

This method is called to check if the sync is done. The sync is done when:

  1. There are no more streams generating partitions
  2. There are no more streams to read from
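  3. All partitions for all streams are closed

If the sync is done but one or more streams raised an exception, an AirbyteTracedException (failure type config_error) summarizing the failed streams is raised instead of returning.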