airbyte_cdk.sources.concurrent_source.concurrent_read_processor

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4import logging
  5import os
  6from typing import Dict, Iterable, List, Optional, Set
  7
  8from airbyte_cdk.exception_handler import generate_failed_streams_error_message
  9from airbyte_cdk.models import AirbyteMessage, AirbyteStreamStatus, FailureType, StreamDescriptor
 10from airbyte_cdk.models import Type as MessageType
 11from airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel import (
 12    PartitionGenerationCompletedSentinel,
 13)
 14from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException
 15from airbyte_cdk.sources.concurrent_source.thread_pool_manager import ThreadPoolManager
 16from airbyte_cdk.sources.declarative.partition_routers.grouping_partition_router import (
 17    GroupingPartitionRouter,
 18)
 19from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import (
 20    SubstreamPartitionRouter,
 21)
 22from airbyte_cdk.sources.message import MessageRepository
 23from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
 24from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
 25from airbyte_cdk.sources.streams.concurrent.partition_enqueuer import PartitionEnqueuer
 26from airbyte_cdk.sources.streams.concurrent.partition_reader import PartitionReader
 27from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
 28from airbyte_cdk.sources.streams.concurrent.partitions.types import PartitionCompleteSentinel
 29from airbyte_cdk.sources.types import Record
 30from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message
 31from airbyte_cdk.sources.utils.slice_logger import SliceLogger
 32from airbyte_cdk.utils import AirbyteTracedException
 33from airbyte_cdk.utils.stream_status_utils import (
 34    as_airbyte_message as stream_status_as_airbyte_message,
 35)
 36
 37
 38class ConcurrentReadProcessor:
 39    def __init__(
 40        self,
 41        stream_instances_to_read_from: List[AbstractStream],
 42        partition_enqueuer: PartitionEnqueuer,
 43        thread_pool_manager: ThreadPoolManager,
 44        logger: logging.Logger,
 45        slice_logger: SliceLogger,
 46        message_repository: MessageRepository,
 47        partition_reader: PartitionReader,
 48        max_concurrent_partition_generators: Optional[int] = None,
 49    ):
 50        """
 51        This class is responsible for handling items from a concurrent stream read process.
 52        :param stream_instances_to_read_from: List of streams to read from
 53        :param partition_enqueuer: PartitionEnqueuer instance
 54        :param thread_pool_manager: ThreadPoolManager instance
 55        :param logger: Logger instance
 56        :param slice_logger: SliceLogger instance
 57        :param message_repository: MessageRepository instance
 58        :param partition_reader: PartitionReader instance
 59        :param max_concurrent_partition_generators: Maximum number of partition generators allowed
 60            to run concurrently. None means no limit. When set, should be less than the number of
 61            workers in multi-worker mode so at least one worker slot is always available for
 62            partition reading, preventing thread pool starvation. In single-threaded mode
 63            (num_workers=1) the value may equal num_workers; ConcurrentSource.create() handles
 64            this distinction. ConcurrentSource.read() passes this value explicitly.
 65        """
 66        self._stream_name_to_instance = {s.name: s for s in stream_instances_to_read_from}
 67        self._record_counter = {}
 68        self._streams_to_running_partitions: Dict[str, Set[Partition]] = {}
 69        for stream in stream_instances_to_read_from:
 70            self._streams_to_running_partitions[stream.name] = set()
 71            self._record_counter[stream.name] = 0
 72        if (
 73            max_concurrent_partition_generators is not None
 74            and max_concurrent_partition_generators < 1
 75        ):
 76            raise ValueError(
 77                f"max_concurrent_partition_generators must be >= 1 or None, got {max_concurrent_partition_generators}"
 78            )
 79        self._thread_pool_manager = thread_pool_manager
 80        self._partition_enqueuer = partition_enqueuer
 81        self._max_concurrent_partition_generators = max_concurrent_partition_generators
 82        self._stream_instances_to_start_partition_generation = stream_instances_to_read_from
 83        self._streams_currently_generating_partitions: List[str] = []
 84        self._logger = logger
 85        self._slice_logger = slice_logger
 86        self._message_repository = message_repository
 87        self._partition_reader = partition_reader
 88        self._streams_done: Set[str] = set()
 89        self._exceptions_per_stream_name: dict[str, List[Exception]] = {}
 90
 91        # Track which streams (by name) are currently active
 92        # A stream is "active" if it's generating partitions or has partitions being read
 93        self._active_stream_names: Set[str] = set()
 94
 95        # Store blocking group names for streams that require blocking simultaneous reads
 96        # Maps stream name -> group name (empty string means no blocking)
 97        self._stream_block_simultaneous_read: Dict[str, str] = {
 98            stream.name: stream.block_simultaneous_read for stream in stream_instances_to_read_from
 99        }
100
101        # Track which groups are currently active
102        # Maps group name -> set of stream names in that group
103        self._active_groups: Dict[str, Set[str]] = {}
104
105        for stream in stream_instances_to_read_from:
106            if stream.block_simultaneous_read:
107                self._logger.info(
108                    f"Stream '{stream.name}' is in blocking group '{stream.block_simultaneous_read}'. "
109                    f"Will defer starting this stream if another stream in the same group or its parents are active."
110                )
111
112    def on_partition_generation_completed(
113        self, sentinel: PartitionGenerationCompletedSentinel
114    ) -> Iterable[AirbyteMessage]:
115        """
116        This method is called when a partition generation is completed.
117        1. Remove the stream from the list of streams currently generating partitions
118        2. Deactivate parent streams (they were only needed for partition generation)
119        3. If the stream is done, mark it as such and return a stream status message
120        4. If there are more streams to read from, start the next partition generator
121        """
122        stream_name = sentinel.stream.name
123        self._streams_currently_generating_partitions.remove(sentinel.stream.name)
124
125        # Deactivate all parent streams now that partition generation is complete
126        # Parents were only needed to generate slices, they can now be reused
127        parent_streams = self._collect_all_parent_stream_names(stream_name)
128        for parent_stream_name in parent_streams:
129            if parent_stream_name in self._active_stream_names:
130                self._logger.debug(f"Removing '{parent_stream_name}' from active streams")
131                self._active_stream_names.discard(parent_stream_name)
132
133                # Remove from active groups
134                parent_group = self._stream_block_simultaneous_read.get(parent_stream_name, "")
135                if parent_group:
136                    if parent_group in self._active_groups:
137                        self._active_groups[parent_group].discard(parent_stream_name)
138                        if not self._active_groups[parent_group]:
139                            del self._active_groups[parent_group]
140                    self._logger.info(
141                        f"Parent stream '{parent_stream_name}' (group '{parent_group}') deactivated after "
142                        f"partition generation completed for child '{stream_name}'. "
143                        f"Blocked streams in the queue will be retried on next start_next_partition_generator call."
144                    )
145
146        # It is possible for the stream to already be done if no partitions were generated
147        # If the partition generation process was completed and there are no partitions left to process, the stream is done
148        if (
149            self._is_stream_done(stream_name)
150            or len(self._streams_to_running_partitions[stream_name]) == 0
151        ):
152            yield from self._on_stream_is_done(stream_name)
153        if self._stream_instances_to_start_partition_generation:
154            status_message = self.start_next_partition_generator()
155            if status_message:
156                yield status_message
157
158    def on_partition(self, partition: Partition) -> None:
159        """
160        This method is called when a partition is generated.
161        1. Add the partition to the set of partitions for the stream
162        2. Log the slice if necessary
163        3. Submit the partition to the thread pool manager
164        """
165        stream_name = partition.stream_name()
166        self._streams_to_running_partitions[stream_name].add(partition)
167        cursor = self._stream_name_to_instance[stream_name].cursor
168        if self._slice_logger.should_log_slice_message(self._logger):
169            self._message_repository.emit_message(
170                self._slice_logger.create_slice_log_message(partition.to_slice())
171            )
172        self._thread_pool_manager.submit(
173            self._partition_reader.process_partition, partition, cursor
174        )
175
176    def on_partition_complete_sentinel(
177        self, sentinel: PartitionCompleteSentinel
178    ) -> Iterable[AirbyteMessage]:
179        """
180        This method is called when a partition is completed.
181        1. Close the partition
182        2. If the stream is done, mark it as such and return a stream status message
183        3. Emit messages that were added to the message repository
184        4. If there are more streams to read from, start the next partition generator
185        """
186        partition = sentinel.partition
187
188        partitions_running = self._streams_to_running_partitions[partition.stream_name()]
189        if partition in partitions_running:
190            partitions_running.remove(partition)
191            # If all partitions were generated and this was the last one, the stream is done
192            if (
193                partition.stream_name() not in self._streams_currently_generating_partitions
194                and len(partitions_running) == 0
195            ):
196                yield from self._on_stream_is_done(partition.stream_name())
197                # Try to start the next stream in the queue (may be a deferred stream)
198                if self._stream_instances_to_start_partition_generation:
199                    status_message = self.start_next_partition_generator()
200                    if status_message:
201                        yield status_message
202        yield from self._message_repository.consume_queue()
203
204    def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
205        """
206        This method is called when a record is read from a partition.
207        1. Convert the record to an AirbyteMessage
208        2. If this is the first record for the stream, mark the stream as RUNNING
209        3. Increment the record counter for the stream
210        4. Ensures the cursor knows the record has been successfully emitted
211        5. Emit the message
212        6. Emit messages that were added to the message repository
213        """
214        # Do not pass a transformer or a schema
215        # AbstractStreams are expected to return data as they are expected.
216        # Any transformation on the data should be done before reaching this point
217        message = stream_data_to_airbyte_message(
218            stream_name=record.stream_name,
219            data_or_message=record.data,
220            file_reference=record.file_reference,
221        )
222        stream = self._stream_name_to_instance[record.stream_name]
223
224        if message.type == MessageType.RECORD:
225            if self._record_counter[stream.name] == 0:
226                self._logger.info(f"Marking stream {stream.name} as RUNNING")
227                yield stream_status_as_airbyte_message(
228                    stream.as_airbyte_stream(), AirbyteStreamStatus.RUNNING
229                )
230            self._record_counter[stream.name] += 1
231        yield message
232        yield from self._message_repository.consume_queue()
233
234    def on_exception(self, exception: StreamThreadException) -> Iterable[AirbyteMessage]:
235        """
236        This method is called when an exception is raised.
237        1. Stop all running streams
238        2. Raise the exception
239        """
240        self._flag_exception(exception.stream_name, exception.exception)
241        self._logger.exception(
242            f"Exception while syncing stream {exception.stream_name}", exc_info=exception.exception
243        )
244
245        stream_descriptor = StreamDescriptor(name=exception.stream_name)
246        if isinstance(exception.exception, AirbyteTracedException):
247            yield exception.exception.as_airbyte_message(stream_descriptor=stream_descriptor)
248        else:
249            yield AirbyteTracedException.from_exception(
250                exception.exception,
251                stream_descriptor=stream_descriptor,
252                message=f"An unexpected error occurred in stream {exception.stream_name}: {type(exception.exception).__name__}",
253            ).as_airbyte_message()
254
255    def _flag_exception(self, stream_name: str, exception: Exception) -> None:
256        self._exceptions_per_stream_name.setdefault(stream_name, []).append(exception)
257
258    def start_next_partition_generator(self) -> Optional[AirbyteMessage]:
259        """
260        Submits the next partition generator to the thread pool.
261
262        A stream will be deferred (moved to end of queue) if:
263        1. The stream itself has block_simultaneous_read=True AND is already active
264        2. Any parent stream has block_simultaneous_read=True AND is currently active
265
266        This prevents simultaneous reads of streams that shouldn't be accessed concurrently.
267
268        :return: A status message if a partition generator was started, otherwise None
269        """
270        if not self._stream_instances_to_start_partition_generation:
271            return None
272
273        # Enforce the concurrent generator cap so at least one worker slot is always available
274        # for partition reading. Recovery is guaranteed: on_partition_generation_completed
275        # decrements the count before calling here, so the guard always passes there.
276        if (
277            self._max_concurrent_partition_generators is not None
278            and len(self._streams_currently_generating_partitions)
279            >= self._max_concurrent_partition_generators
280        ):
281            self._logger.debug(
282                f"Concurrent partition generator cap ({self._max_concurrent_partition_generators}) reached "
283                f"({len(self._streams_currently_generating_partitions)} active). Deferring next generator start."
284            )
285            return None
286
287        # Remember initial queue size to avoid infinite loops if all streams are blocked
288        max_attempts = len(self._stream_instances_to_start_partition_generation)
289        attempts = 0
290
291        while self._stream_instances_to_start_partition_generation and attempts < max_attempts:
292            attempts += 1
293
294            # Pop the first stream from the queue
295            stream = self._stream_instances_to_start_partition_generation.pop(0)
296            stream_name = stream.name
297            stream_group = self._stream_block_simultaneous_read.get(stream_name, "")
298
299            # Check if this stream has a blocking group and is already active as parent stream
300            # (i.e. being read from during partition generation for another stream)
301            if stream_group and stream_name in self._active_stream_names:
302                # Add back to the END of the queue for retry later
303                self._stream_instances_to_start_partition_generation.append(stream)
304                self._logger.info(
305                    f"Deferring stream '{stream_name}' (group '{stream_group}') because it's already active. Trying next stream."
306                )
307                continue  # Try the next stream in the queue
308
309            # Check if this stream's group is already active (another stream in the same group is running)
310            if (
311                stream_group
312                and stream_group in self._active_groups
313                and self._active_groups[stream_group]
314            ):
315                # Add back to the END of the queue for retry later
316                self._stream_instances_to_start_partition_generation.append(stream)
317                active_streams_in_group = self._active_groups[stream_group]
318                self._logger.info(
319                    f"Deferring stream '{stream_name}' (group '{stream_group}') because other stream(s) "
320                    f"{active_streams_in_group} in the same group are active. Trying next stream."
321                )
322                continue  # Try the next stream in the queue
323
324            # Check if any parent streams have a blocking group and are currently active
325            parent_streams = self._collect_all_parent_stream_names(stream_name)
326            blocked_by_parents = [
327                p
328                for p in parent_streams
329                if self._stream_block_simultaneous_read.get(p, "")
330                and p in self._active_stream_names
331            ]
332
333            if blocked_by_parents:
334                # Add back to the END of the queue for retry later
335                self._stream_instances_to_start_partition_generation.append(stream)
336                parent_groups = {
337                    self._stream_block_simultaneous_read.get(p, "") for p in blocked_by_parents
338                }
339                self._logger.info(
340                    f"Deferring stream '{stream_name}' because parent stream(s) "
341                    f"{blocked_by_parents} (groups {parent_groups}) are active. Trying next stream."
342                )
343                continue  # Try the next stream in the queue
344
345            # No blocking - start this stream
346            # Mark stream as active before starting
347            self._active_stream_names.add(stream_name)
348            self._streams_currently_generating_partitions.append(stream_name)
349
350            # Track this stream in its group if it has one
351            if stream_group:
352                if stream_group not in self._active_groups:
353                    self._active_groups[stream_group] = set()
354                self._active_groups[stream_group].add(stream_name)
355                self._logger.debug(f"Added '{stream_name}' to active group '{stream_group}'")
356
357            # Also mark all parent streams as active (they will be read from during partition generation)
358            for parent_stream_name in parent_streams:
359                parent_group = self._stream_block_simultaneous_read.get(parent_stream_name, "")
360                if parent_group:
361                    self._active_stream_names.add(parent_stream_name)
362                    if parent_group not in self._active_groups:
363                        self._active_groups[parent_group] = set()
364                    self._active_groups[parent_group].add(parent_stream_name)
365                    self._logger.info(
366                        f"Marking parent stream '{parent_stream_name}' (group '{parent_group}') as active "
367                        f"(will be read during partition generation for '{stream_name}')"
368                    )
369
370            self._thread_pool_manager.submit(self._partition_enqueuer.generate_partitions, stream)
371            self._logger.info(f"Marking stream {stream_name} as STARTED")
372            self._logger.info(f"Syncing stream: {stream_name}")
373            return stream_status_as_airbyte_message(
374                stream.as_airbyte_stream(),
375                AirbyteStreamStatus.STARTED,
376            )
377
378        # All streams in the queue are currently blocked
379        return None
380
381    def is_done(self) -> bool:
382        """
383        This method is called to check if the sync is done.
384        The sync is done when:
385        1. There are no more streams generating partitions
386        2. There are no more streams to read from
387        3. All partitions for all streams are closed
388        """
389        is_done = all(
390            [
391                self._is_stream_done(stream_name)
392                for stream_name in self._stream_name_to_instance.keys()
393            ]
394        )
395        if is_done and self._stream_instances_to_start_partition_generation:
396            stuck_stream_names = [
397                s.name for s in self._stream_instances_to_start_partition_generation
398            ]
399            raise AirbyteTracedException(
400                message="Partition generation queue is not empty after all streams completed.",
401                internal_message=f"Streams {stuck_stream_names} remained in the partition generation queue after all streams were marked done.",
402                failure_type=FailureType.system_error,
403            )
404        if is_done and self._active_groups:
405            raise AirbyteTracedException(
406                message="Active stream groups are not empty after all streams completed.",
407                internal_message=f"Groups {dict(self._active_groups)} still active after all streams were marked done.",
408                failure_type=FailureType.system_error,
409            )
410        if is_done and self._exceptions_per_stream_name:
411            error_message = generate_failed_streams_error_message(self._exceptions_per_stream_name)
412            self._logger.info(error_message)
413            # We still raise at least one exception when a stream raises an exception because the platform currently relies
414            # on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error
415            # type because this combined error isn't actionable, but rather the previously emitted individual errors.
416            raise AirbyteTracedException(
417                message=error_message,
418                internal_message="Concurrent read failure",
419                failure_type=FailureType.config_error,
420            )
421        return is_done
422
423    def _is_stream_done(self, stream_name: str) -> bool:
424        return stream_name in self._streams_done
425
426    def _collect_all_parent_stream_names(self, stream_name: str) -> Set[str]:
427        """Recursively collect all parent stream names for a given stream.
428
429        For example, if we have: epics -> issues -> comments
430        Then for comments, this returns {issues, epics}.
431        """
432        parent_names: Set[str] = set()
433        stream = self._stream_name_to_instance.get(stream_name)
434
435        if not stream:
436            return parent_names
437
438        partition_router = (
439            stream.get_partition_router() if isinstance(stream, DefaultStream) else None
440        )
441        if isinstance(partition_router, GroupingPartitionRouter):
442            partition_router = partition_router.underlying_partition_router
443
444        if isinstance(partition_router, SubstreamPartitionRouter):
445            for parent_config in partition_router.parent_stream_configs:
446                parent_name = parent_config.stream.name
447                parent_names.add(parent_name)
448                parent_names.update(self._collect_all_parent_stream_names(parent_name))
449
450        return parent_names
451
452    def _on_stream_is_done(self, stream_name: str) -> Iterable[AirbyteMessage]:
453        self._logger.info(
454            f"Read {self._record_counter[stream_name]} records from {stream_name} stream"
455        )
456        self._logger.info(f"Marking stream {stream_name} as STOPPED")
457        stream = self._stream_name_to_instance[stream_name]
458        stream.cursor.ensure_at_least_one_state_emitted()
459        yield from self._message_repository.consume_queue()
460        self._logger.info(f"Finished syncing {stream.name}")
461        self._streams_done.add(stream_name)
462        stream_status = (
463            AirbyteStreamStatus.INCOMPLETE
464            if self._exceptions_per_stream_name.get(stream_name, [])
465            else AirbyteStreamStatus.COMPLETE
466        )
467        yield stream_status_as_airbyte_message(stream.as_airbyte_stream(), stream_status)
468
469        # Remove only this stream from active set (NOT parents)
470        if stream_name in self._active_stream_names:
471            self._active_stream_names.discard(stream_name)
472
473            # Remove from active groups
474            stream_group = self._stream_block_simultaneous_read.get(stream_name, "")
475            if stream_group:
476                if stream_group in self._active_groups:
477                    self._active_groups[stream_group].discard(stream_name)
478                    if not self._active_groups[stream_group]:
479                        del self._active_groups[stream_group]
480                self._logger.info(
481                    f"Stream '{stream_name}' (group '{stream_group}') is no longer active. "
482                    f"Blocked streams in the queue will be retried on next start_next_partition_generator call."
483                )
class ConcurrentReadProcessor:
 39class ConcurrentReadProcessor:
 40    def __init__(
 41        self,
 42        stream_instances_to_read_from: List[AbstractStream],
 43        partition_enqueuer: PartitionEnqueuer,
 44        thread_pool_manager: ThreadPoolManager,
 45        logger: logging.Logger,
 46        slice_logger: SliceLogger,
 47        message_repository: MessageRepository,
 48        partition_reader: PartitionReader,
 49        max_concurrent_partition_generators: Optional[int] = None,
 50    ):
 51        """
 52        This class is responsible for handling items from a concurrent stream read process.
 53        :param stream_instances_to_read_from: List of streams to read from
 54        :param partition_enqueuer: PartitionEnqueuer instance
 55        :param thread_pool_manager: ThreadPoolManager instance
 56        :param logger: Logger instance
 57        :param slice_logger: SliceLogger instance
 58        :param message_repository: MessageRepository instance
 59        :param partition_reader: PartitionReader instance
 60        :param max_concurrent_partition_generators: Maximum number of partition generators allowed
 61            to run concurrently. None means no limit. When set, should be less than the number of
 62            workers in multi-worker mode so at least one worker slot is always available for
 63            partition reading, preventing thread pool starvation. In single-threaded mode
 64            (num_workers=1) the value may equal num_workers; ConcurrentSource.create() handles
 65            this distinction. ConcurrentSource.read() passes this value explicitly.
 66        """
 67        self._stream_name_to_instance = {s.name: s for s in stream_instances_to_read_from}
 68        self._record_counter = {}
 69        self._streams_to_running_partitions: Dict[str, Set[Partition]] = {}
 70        for stream in stream_instances_to_read_from:
 71            self._streams_to_running_partitions[stream.name] = set()
 72            self._record_counter[stream.name] = 0
 73        if (
 74            max_concurrent_partition_generators is not None
 75            and max_concurrent_partition_generators < 1
 76        ):
 77            raise ValueError(
 78                f"max_concurrent_partition_generators must be >= 1 or None, got {max_concurrent_partition_generators}"
 79            )
 80        self._thread_pool_manager = thread_pool_manager
 81        self._partition_enqueuer = partition_enqueuer
 82        self._max_concurrent_partition_generators = max_concurrent_partition_generators
 83        self._stream_instances_to_start_partition_generation = stream_instances_to_read_from
 84        self._streams_currently_generating_partitions: List[str] = []
 85        self._logger = logger
 86        self._slice_logger = slice_logger
 87        self._message_repository = message_repository
 88        self._partition_reader = partition_reader
 89        self._streams_done: Set[str] = set()
 90        self._exceptions_per_stream_name: dict[str, List[Exception]] = {}
 91
 92        # Track which streams (by name) are currently active
 93        # A stream is "active" if it's generating partitions or has partitions being read
 94        self._active_stream_names: Set[str] = set()
 95
 96        # Store blocking group names for streams that require blocking simultaneous reads
 97        # Maps stream name -> group name (empty string means no blocking)
 98        self._stream_block_simultaneous_read: Dict[str, str] = {
 99            stream.name: stream.block_simultaneous_read for stream in stream_instances_to_read_from
100        }
101
102        # Track which groups are currently active
103        # Maps group name -> set of stream names in that group
104        self._active_groups: Dict[str, Set[str]] = {}
105
106        for stream in stream_instances_to_read_from:
107            if stream.block_simultaneous_read:
108                self._logger.info(
109                    f"Stream '{stream.name}' is in blocking group '{stream.block_simultaneous_read}'. "
110                    f"Will defer starting this stream if another stream in the same group or its parents are active."
111                )
112
113    def on_partition_generation_completed(
114        self, sentinel: PartitionGenerationCompletedSentinel
115    ) -> Iterable[AirbyteMessage]:
116        """
117        This method is called when a partition generation is completed.
118        1. Remove the stream from the list of streams currently generating partitions
119        2. Deactivate parent streams (they were only needed for partition generation)
120        3. If the stream is done, mark it as such and return a stream status message
121        4. If there are more streams to read from, start the next partition generator
122        """
123        stream_name = sentinel.stream.name
124        self._streams_currently_generating_partitions.remove(sentinel.stream.name)
125
126        # Deactivate all parent streams now that partition generation is complete
127        # Parents were only needed to generate slices, they can now be reused
128        parent_streams = self._collect_all_parent_stream_names(stream_name)
129        for parent_stream_name in parent_streams:
130            if parent_stream_name in self._active_stream_names:
131                self._logger.debug(f"Removing '{parent_stream_name}' from active streams")
132                self._active_stream_names.discard(parent_stream_name)
133
134                # Remove from active groups
135                parent_group = self._stream_block_simultaneous_read.get(parent_stream_name, "")
136                if parent_group:
137                    if parent_group in self._active_groups:
138                        self._active_groups[parent_group].discard(parent_stream_name)
139                        if not self._active_groups[parent_group]:
140                            del self._active_groups[parent_group]
141                    self._logger.info(
142                        f"Parent stream '{parent_stream_name}' (group '{parent_group}') deactivated after "
143                        f"partition generation completed for child '{stream_name}'. "
144                        f"Blocked streams in the queue will be retried on next start_next_partition_generator call."
145                    )
146
147        # It is possible for the stream to already be done if no partitions were generated
148        # If the partition generation process was completed and there are no partitions left to process, the stream is done
149        if (
150            self._is_stream_done(stream_name)
151            or len(self._streams_to_running_partitions[stream_name]) == 0
152        ):
153            yield from self._on_stream_is_done(stream_name)
154        if self._stream_instances_to_start_partition_generation:
155            status_message = self.start_next_partition_generator()
156            if status_message:
157                yield status_message
158
159    def on_partition(self, partition: Partition) -> None:
160        """
161        This method is called when a partition is generated.
162        1. Add the partition to the set of partitions for the stream
163        2. Log the slice if necessary
164        3. Submit the partition to the thread pool manager
165        """
166        stream_name = partition.stream_name()
167        self._streams_to_running_partitions[stream_name].add(partition)
168        cursor = self._stream_name_to_instance[stream_name].cursor
169        if self._slice_logger.should_log_slice_message(self._logger):
170            self._message_repository.emit_message(
171                self._slice_logger.create_slice_log_message(partition.to_slice())
172            )
173        self._thread_pool_manager.submit(
174            self._partition_reader.process_partition, partition, cursor
175        )
176
177    def on_partition_complete_sentinel(
178        self, sentinel: PartitionCompleteSentinel
179    ) -> Iterable[AirbyteMessage]:
180        """
181        This method is called when a partition is completed.
182        1. Close the partition
183        2. If the stream is done, mark it as such and return a stream status message
184        3. Emit messages that were added to the message repository
185        4. If there are more streams to read from, start the next partition generator
186        """
187        partition = sentinel.partition
188
189        partitions_running = self._streams_to_running_partitions[partition.stream_name()]
190        if partition in partitions_running:
191            partitions_running.remove(partition)
192            # If all partitions were generated and this was the last one, the stream is done
193            if (
194                partition.stream_name() not in self._streams_currently_generating_partitions
195                and len(partitions_running) == 0
196            ):
197                yield from self._on_stream_is_done(partition.stream_name())
198                # Try to start the next stream in the queue (may be a deferred stream)
199                if self._stream_instances_to_start_partition_generation:
200                    status_message = self.start_next_partition_generator()
201                    if status_message:
202                        yield status_message
203        yield from self._message_repository.consume_queue()
204
205    def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
206        """
207        This method is called when a record is read from a partition.
208        1. Convert the record to an AirbyteMessage
209        2. If this is the first record for the stream, mark the stream as RUNNING
210        3. Increment the record counter for the stream
211        4. Ensures the cursor knows the record has been successfully emitted
212        5. Emit the message
213        6. Emit messages that were added to the message repository
214        """
215        # Do not pass a transformer or a schema
216        # AbstractStreams are expected to return data as they are expected.
217        # Any transformation on the data should be done before reaching this point
218        message = stream_data_to_airbyte_message(
219            stream_name=record.stream_name,
220            data_or_message=record.data,
221            file_reference=record.file_reference,
222        )
223        stream = self._stream_name_to_instance[record.stream_name]
224
225        if message.type == MessageType.RECORD:
226            if self._record_counter[stream.name] == 0:
227                self._logger.info(f"Marking stream {stream.name} as RUNNING")
228                yield stream_status_as_airbyte_message(
229                    stream.as_airbyte_stream(), AirbyteStreamStatus.RUNNING
230                )
231            self._record_counter[stream.name] += 1
232        yield message
233        yield from self._message_repository.consume_queue()
234
235    def on_exception(self, exception: StreamThreadException) -> Iterable[AirbyteMessage]:
236        """
237        This method is called when an exception is raised.
238        1. Stop all running streams
239        2. Raise the exception
240        """
241        self._flag_exception(exception.stream_name, exception.exception)
242        self._logger.exception(
243            f"Exception while syncing stream {exception.stream_name}", exc_info=exception.exception
244        )
245
246        stream_descriptor = StreamDescriptor(name=exception.stream_name)
247        if isinstance(exception.exception, AirbyteTracedException):
248            yield exception.exception.as_airbyte_message(stream_descriptor=stream_descriptor)
249        else:
250            yield AirbyteTracedException.from_exception(
251                exception.exception,
252                stream_descriptor=stream_descriptor,
253                message=f"An unexpected error occurred in stream {exception.stream_name}: {type(exception.exception).__name__}",
254            ).as_airbyte_message()
255
256    def _flag_exception(self, stream_name: str, exception: Exception) -> None:
257        self._exceptions_per_stream_name.setdefault(stream_name, []).append(exception)
258
259    def start_next_partition_generator(self) -> Optional[AirbyteMessage]:
260        """
261        Submits the next partition generator to the thread pool.
262
263        A stream will be deferred (moved to end of queue) if:
264        1. The stream itself has block_simultaneous_read=True AND is already active
265        2. Any parent stream has block_simultaneous_read=True AND is currently active
266
267        This prevents simultaneous reads of streams that shouldn't be accessed concurrently.
268
269        :return: A status message if a partition generator was started, otherwise None
270        """
271        if not self._stream_instances_to_start_partition_generation:
272            return None
273
274        # Enforce the concurrent generator cap so at least one worker slot is always available
275        # for partition reading. Recovery is guaranteed: on_partition_generation_completed
276        # decrements the count before calling here, so the guard always passes there.
277        if (
278            self._max_concurrent_partition_generators is not None
279            and len(self._streams_currently_generating_partitions)
280            >= self._max_concurrent_partition_generators
281        ):
282            self._logger.debug(
283                f"Concurrent partition generator cap ({self._max_concurrent_partition_generators}) reached "
284                f"({len(self._streams_currently_generating_partitions)} active). Deferring next generator start."
285            )
286            return None
287
288        # Remember initial queue size to avoid infinite loops if all streams are blocked
289        max_attempts = len(self._stream_instances_to_start_partition_generation)
290        attempts = 0
291
292        while self._stream_instances_to_start_partition_generation and attempts < max_attempts:
293            attempts += 1
294
295            # Pop the first stream from the queue
296            stream = self._stream_instances_to_start_partition_generation.pop(0)
297            stream_name = stream.name
298            stream_group = self._stream_block_simultaneous_read.get(stream_name, "")
299
300            # Check if this stream has a blocking group and is already active as parent stream
301            # (i.e. being read from during partition generation for another stream)
302            if stream_group and stream_name in self._active_stream_names:
303                # Add back to the END of the queue for retry later
304                self._stream_instances_to_start_partition_generation.append(stream)
305                self._logger.info(
306                    f"Deferring stream '{stream_name}' (group '{stream_group}') because it's already active. Trying next stream."
307                )
308                continue  # Try the next stream in the queue
309
310            # Check if this stream's group is already active (another stream in the same group is running)
311            if (
312                stream_group
313                and stream_group in self._active_groups
314                and self._active_groups[stream_group]
315            ):
316                # Add back to the END of the queue for retry later
317                self._stream_instances_to_start_partition_generation.append(stream)
318                active_streams_in_group = self._active_groups[stream_group]
319                self._logger.info(
320                    f"Deferring stream '{stream_name}' (group '{stream_group}') because other stream(s) "
321                    f"{active_streams_in_group} in the same group are active. Trying next stream."
322                )
323                continue  # Try the next stream in the queue
324
325            # Check if any parent streams have a blocking group and are currently active
326            parent_streams = self._collect_all_parent_stream_names(stream_name)
327            blocked_by_parents = [
328                p
329                for p in parent_streams
330                if self._stream_block_simultaneous_read.get(p, "")
331                and p in self._active_stream_names
332            ]
333
334            if blocked_by_parents:
335                # Add back to the END of the queue for retry later
336                self._stream_instances_to_start_partition_generation.append(stream)
337                parent_groups = {
338                    self._stream_block_simultaneous_read.get(p, "") for p in blocked_by_parents
339                }
340                self._logger.info(
341                    f"Deferring stream '{stream_name}' because parent stream(s) "
342                    f"{blocked_by_parents} (groups {parent_groups}) are active. Trying next stream."
343                )
344                continue  # Try the next stream in the queue
345
346            # No blocking - start this stream
347            # Mark stream as active before starting
348            self._active_stream_names.add(stream_name)
349            self._streams_currently_generating_partitions.append(stream_name)
350
351            # Track this stream in its group if it has one
352            if stream_group:
353                if stream_group not in self._active_groups:
354                    self._active_groups[stream_group] = set()
355                self._active_groups[stream_group].add(stream_name)
356                self._logger.debug(f"Added '{stream_name}' to active group '{stream_group}'")
357
358            # Also mark all parent streams as active (they will be read from during partition generation)
359            for parent_stream_name in parent_streams:
360                parent_group = self._stream_block_simultaneous_read.get(parent_stream_name, "")
361                if parent_group:
362                    self._active_stream_names.add(parent_stream_name)
363                    if parent_group not in self._active_groups:
364                        self._active_groups[parent_group] = set()
365                    self._active_groups[parent_group].add(parent_stream_name)
366                    self._logger.info(
367                        f"Marking parent stream '{parent_stream_name}' (group '{parent_group}') as active "
368                        f"(will be read during partition generation for '{stream_name}')"
369                    )
370
371            self._thread_pool_manager.submit(self._partition_enqueuer.generate_partitions, stream)
372            self._logger.info(f"Marking stream {stream_name} as STARTED")
373            self._logger.info(f"Syncing stream: {stream_name}")
374            return stream_status_as_airbyte_message(
375                stream.as_airbyte_stream(),
376                AirbyteStreamStatus.STARTED,
377            )
378
379        # All streams in the queue are currently blocked
380        return None
381
382    def is_done(self) -> bool:
383        """
384        This method is called to check if the sync is done.
385        The sync is done when:
386        1. There are no more streams generating partitions
387        2. There are no more streams to read from
388        3. All partitions for all streams are closed
389        """
390        is_done = all(
391            [
392                self._is_stream_done(stream_name)
393                for stream_name in self._stream_name_to_instance.keys()
394            ]
395        )
396        if is_done and self._stream_instances_to_start_partition_generation:
397            stuck_stream_names = [
398                s.name for s in self._stream_instances_to_start_partition_generation
399            ]
400            raise AirbyteTracedException(
401                message="Partition generation queue is not empty after all streams completed.",
402                internal_message=f"Streams {stuck_stream_names} remained in the partition generation queue after all streams were marked done.",
403                failure_type=FailureType.system_error,
404            )
405        if is_done and self._active_groups:
406            raise AirbyteTracedException(
407                message="Active stream groups are not empty after all streams completed.",
408                internal_message=f"Groups {dict(self._active_groups)} still active after all streams were marked done.",
409                failure_type=FailureType.system_error,
410            )
411        if is_done and self._exceptions_per_stream_name:
412            error_message = generate_failed_streams_error_message(self._exceptions_per_stream_name)
413            self._logger.info(error_message)
414            # We still raise at least one exception when a stream raises an exception because the platform currently relies
415            # on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error
416            # type because this combined error isn't actionable, but rather the previously emitted individual errors.
417            raise AirbyteTracedException(
418                message=error_message,
419                internal_message="Concurrent read failure",
420                failure_type=FailureType.config_error,
421            )
422        return is_done
423
424    def _is_stream_done(self, stream_name: str) -> bool:
425        return stream_name in self._streams_done
426
427    def _collect_all_parent_stream_names(self, stream_name: str) -> Set[str]:
428        """Recursively collect all parent stream names for a given stream.
429
430        For example, if we have: epics -> issues -> comments
431        Then for comments, this returns {issues, epics}.
432        """
433        parent_names: Set[str] = set()
434        stream = self._stream_name_to_instance.get(stream_name)
435
436        if not stream:
437            return parent_names
438
439        partition_router = (
440            stream.get_partition_router() if isinstance(stream, DefaultStream) else None
441        )
442        if isinstance(partition_router, GroupingPartitionRouter):
443            partition_router = partition_router.underlying_partition_router
444
445        if isinstance(partition_router, SubstreamPartitionRouter):
446            for parent_config in partition_router.parent_stream_configs:
447                parent_name = parent_config.stream.name
448                parent_names.add(parent_name)
449                parent_names.update(self._collect_all_parent_stream_names(parent_name))
450
451        return parent_names
452
453    def _on_stream_is_done(self, stream_name: str) -> Iterable[AirbyteMessage]:
454        self._logger.info(
455            f"Read {self._record_counter[stream_name]} records from {stream_name} stream"
456        )
457        self._logger.info(f"Marking stream {stream_name} as STOPPED")
458        stream = self._stream_name_to_instance[stream_name]
459        stream.cursor.ensure_at_least_one_state_emitted()
460        yield from self._message_repository.consume_queue()
461        self._logger.info(f"Finished syncing {stream.name}")
462        self._streams_done.add(stream_name)
463        stream_status = (
464            AirbyteStreamStatus.INCOMPLETE
465            if self._exceptions_per_stream_name.get(stream_name, [])
466            else AirbyteStreamStatus.COMPLETE
467        )
468        yield stream_status_as_airbyte_message(stream.as_airbyte_stream(), stream_status)
469
470        # Remove only this stream from active set (NOT parents)
471        if stream_name in self._active_stream_names:
472            self._active_stream_names.discard(stream_name)
473
474            # Remove from active groups
475            stream_group = self._stream_block_simultaneous_read.get(stream_name, "")
476            if stream_group:
477                if stream_group in self._active_groups:
478                    self._active_groups[stream_group].discard(stream_name)
479                    if not self._active_groups[stream_group]:
480                        del self._active_groups[stream_group]
481                self._logger.info(
482                    f"Stream '{stream_name}' (group '{stream_group}') is no longer active. "
483                    f"Blocked streams in the queue will be retried on next start_next_partition_generator call."
484                )
ConcurrentReadProcessor( stream_instances_to_read_from: List[airbyte_cdk.sources.streams.concurrent.abstract_stream.AbstractStream], partition_enqueuer: airbyte_cdk.sources.streams.concurrent.partition_enqueuer.PartitionEnqueuer, thread_pool_manager: airbyte_cdk.sources.concurrent_source.thread_pool_manager.ThreadPoolManager, logger: logging.Logger, slice_logger: airbyte_cdk.sources.utils.slice_logger.SliceLogger, message_repository: airbyte_cdk.MessageRepository, partition_reader: airbyte_cdk.sources.streams.concurrent.partition_reader.PartitionReader, max_concurrent_partition_generators: Optional[int] = None)
 40    def __init__(
 41        self,
 42        stream_instances_to_read_from: List[AbstractStream],
 43        partition_enqueuer: PartitionEnqueuer,
 44        thread_pool_manager: ThreadPoolManager,
 45        logger: logging.Logger,
 46        slice_logger: SliceLogger,
 47        message_repository: MessageRepository,
 48        partition_reader: PartitionReader,
 49        max_concurrent_partition_generators: Optional[int] = None,
 50    ):
 51        """
 52        This class is responsible for handling items from a concurrent stream read process.
 53        :param stream_instances_to_read_from: List of streams to read from
 54        :param partition_enqueuer: PartitionEnqueuer instance
 55        :param thread_pool_manager: ThreadPoolManager instance
 56        :param logger: Logger instance
 57        :param slice_logger: SliceLogger instance
 58        :param message_repository: MessageRepository instance
 59        :param partition_reader: PartitionReader instance
 60        :param max_concurrent_partition_generators: Maximum number of partition generators allowed
 61            to run concurrently. None means no limit. When set, should be less than the number of
 62            workers in multi-worker mode so at least one worker slot is always available for
 63            partition reading, preventing thread pool starvation. In single-threaded mode
 64            (num_workers=1) the value may equal num_workers; ConcurrentSource.create() handles
 65            this distinction. ConcurrentSource.read() passes this value explicitly.
 66        """
 67        self._stream_name_to_instance = {s.name: s for s in stream_instances_to_read_from}
 68        self._record_counter = {}
 69        self._streams_to_running_partitions: Dict[str, Set[Partition]] = {}
 70        for stream in stream_instances_to_read_from:
 71            self._streams_to_running_partitions[stream.name] = set()
 72            self._record_counter[stream.name] = 0
 73        if (
 74            max_concurrent_partition_generators is not None
 75            and max_concurrent_partition_generators < 1
 76        ):
 77            raise ValueError(
 78                f"max_concurrent_partition_generators must be >= 1 or None, got {max_concurrent_partition_generators}"
 79            )
 80        self._thread_pool_manager = thread_pool_manager
 81        self._partition_enqueuer = partition_enqueuer
 82        self._max_concurrent_partition_generators = max_concurrent_partition_generators
 83        self._stream_instances_to_start_partition_generation = stream_instances_to_read_from
 84        self._streams_currently_generating_partitions: List[str] = []
 85        self._logger = logger
 86        self._slice_logger = slice_logger
 87        self._message_repository = message_repository
 88        self._partition_reader = partition_reader
 89        self._streams_done: Set[str] = set()
 90        self._exceptions_per_stream_name: dict[str, List[Exception]] = {}
 91
 92        # Track which streams (by name) are currently active
 93        # A stream is "active" if it's generating partitions or has partitions being read
 94        self._active_stream_names: Set[str] = set()
 95
 96        # Store blocking group names for streams that require blocking simultaneous reads
 97        # Maps stream name -> group name (empty string means no blocking)
 98        self._stream_block_simultaneous_read: Dict[str, str] = {
 99            stream.name: stream.block_simultaneous_read for stream in stream_instances_to_read_from
100        }
101
102        # Track which groups are currently active
103        # Maps group name -> set of stream names in that group
104        self._active_groups: Dict[str, Set[str]] = {}
105
106        for stream in stream_instances_to_read_from:
107            if stream.block_simultaneous_read:
108                self._logger.info(
109                    f"Stream '{stream.name}' is in blocking group '{stream.block_simultaneous_read}'. "
110                    f"Will defer starting this stream if another stream in the same group or its parents are active."
111                )

This class is responsible for handling items from a concurrent stream read process.

Parameters
  • stream_instances_to_read_from: List of streams to read from
  • partition_enqueuer: PartitionEnqueuer instance
  • thread_pool_manager: ThreadPoolManager instance
  • logger: Logger instance
  • slice_logger: SliceLogger instance
  • message_repository: MessageRepository instance
  • partition_reader: PartitionReader instance
  • max_concurrent_partition_generators: Maximum number of partition generators allowed to run concurrently. None means no limit. When set, should be less than the number of workers in multi-worker mode so at least one worker slot is always available for partition reading, preventing thread pool starvation. In single-threaded mode (num_workers=1) the value may equal num_workers; ConcurrentSource.create() handles this distinction. ConcurrentSource.read() passes this value explicitly.
113    def on_partition_generation_completed(
114        self, sentinel: PartitionGenerationCompletedSentinel
115    ) -> Iterable[AirbyteMessage]:
116        """
117        This method is called when a partition generation is completed.
118        1. Remove the stream from the list of streams currently generating partitions
119        2. Deactivate parent streams (they were only needed for partition generation)
120        3. If the stream is done, mark it as such and return a stream status message
121        4. If there are more streams to read from, start the next partition generator
122        """
123        stream_name = sentinel.stream.name
124        self._streams_currently_generating_partitions.remove(sentinel.stream.name)
125
126        # Deactivate all parent streams now that partition generation is complete
127        # Parents were only needed to generate slices, they can now be reused
128        parent_streams = self._collect_all_parent_stream_names(stream_name)
129        for parent_stream_name in parent_streams:
130            if parent_stream_name in self._active_stream_names:
131                self._logger.debug(f"Removing '{parent_stream_name}' from active streams")
132                self._active_stream_names.discard(parent_stream_name)
133
134                # Remove from active groups
135                parent_group = self._stream_block_simultaneous_read.get(parent_stream_name, "")
136                if parent_group:
137                    if parent_group in self._active_groups:
138                        self._active_groups[parent_group].discard(parent_stream_name)
139                        if not self._active_groups[parent_group]:
140                            del self._active_groups[parent_group]
141                    self._logger.info(
142                        f"Parent stream '{parent_stream_name}' (group '{parent_group}') deactivated after "
143                        f"partition generation completed for child '{stream_name}'. "
144                        f"Blocked streams in the queue will be retried on next start_next_partition_generator call."
145                    )
146
147        # It is possible for the stream to already be done if no partitions were generated
148        # If the partition generation process was completed and there are no partitions left to process, the stream is done
149        if (
150            self._is_stream_done(stream_name)
151            or len(self._streams_to_running_partitions[stream_name]) == 0
152        ):
153            yield from self._on_stream_is_done(stream_name)
154        if self._stream_instances_to_start_partition_generation:
155            status_message = self.start_next_partition_generator()
156            if status_message:
157                yield status_message

This method is called when a partition generation is completed.

  1. Remove the stream from the list of streams currently generating partitions
  2. Deactivate parent streams (they were only needed for partition generation)
  3. If the stream is done, mark it as such and return a stream status message
  4. If there are more streams to read from, start the next partition generator
def on_partition( self, partition: airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition) -> None:
def on_partition(self, partition: Partition) -> None:
    """
    Register a freshly generated partition and hand it off for reading.

    The partition is tracked as running for the stream it belongs to. A slice log message
    is emitted through the message repository when the slice logger asks for one. The
    partition is then submitted to the thread pool manager, together with its stream's
    cursor, to be processed by the partition reader.
    """
    owning_stream = partition.stream_name()
    self._streams_to_running_partitions[owning_stream].add(partition)
    stream_cursor = self._stream_name_to_instance[owning_stream].cursor

    if self._slice_logger.should_log_slice_message(self._logger):
        slice_log_message = self._slice_logger.create_slice_log_message(partition.to_slice())
        self._message_repository.emit_message(slice_log_message)

    self._thread_pool_manager.submit(
        self._partition_reader.process_partition,
        partition,
        stream_cursor,
    )

This method is called when a partition is generated.

  1. Add the partition to the set of partitions for the stream
  2. Log the slice if necessary
  3. Submit the partition to the thread pool manager
def on_partition_complete_sentinel( self, sentinel: airbyte_cdk.sources.streams.concurrent.partitions.types.PartitionCompleteSentinel) -> Iterable[airbyte_cdk.AirbyteMessage]:
def on_partition_complete_sentinel(
    self, sentinel: PartitionCompleteSentinel
) -> Iterable[AirbyteMessage]:
    """
    This method is called when a partition is completed.
    1. Remove the partition from the set of running partitions for its stream
    2. If partition generation for the stream has completed and this was its last running
       partition, mark the stream as done (yielding its status message(s)) and, if streams
       are still queued, try to start the next partition generator
    3. Emit messages that were added to the message repository

    A sentinel whose partition is not tracked as running only drains the message repository.
    """
    partition = sentinel.partition
    stream_name = partition.stream_name()

    partitions_running = self._streams_to_running_partitions[stream_name]
    if partition in partitions_running:
        partitions_running.remove(partition)
        # If all partitions were generated and this was the last one, the stream is done
        if (
            stream_name not in self._streams_currently_generating_partitions
            and len(partitions_running) == 0
        ):
            yield from self._on_stream_is_done(stream_name)
            # Try to start the next stream in the queue (may be a deferred stream that was
            # waiting on the stream that just finished)
            if self._stream_instances_to_start_partition_generation:
                status_message = self.start_next_partition_generator()
                if status_message:
                    yield status_message
    yield from self._message_repository.consume_queue()

This method is called when a partition is completed.

  1. Remove the partition from the set of running partitions for its stream
  2. If the stream is done, mark it as such and return a stream status message
  3. If the stream is done and more streams are waiting, start the next partition generator
  4. Emit messages that were added to the message repository
def on_record( self, record: airbyte_cdk.Record) -> Iterable[airbyte_cdk.AirbyteMessage]:
def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
    """
    This method is called when a record is read from a partition.
    1. Convert the record to an AirbyteMessage
    2. If the message is a RECORD and it is the first one for the stream, mark the stream as RUNNING
    3. Increment the record counter for the stream (RECORD messages only)
    4. Emit the message
    5. Emit messages that were added to the message repository
    """
    # Do not pass a transformer or a schema
    # AbstractStreams are expected to return data as they are expected.
    # Any transformation on the data should be done before reaching this point
    message = stream_data_to_airbyte_message(
        stream_name=record.stream_name,
        data_or_message=record.data,
        file_reference=record.file_reference,
    )
    stream = self._stream_name_to_instance[record.stream_name]

    # `record.data` may carry a non-RECORD message (see `data_or_message`); only actual
    # RECORD messages drive the RUNNING transition and the per-stream record counter.
    if message.type == MessageType.RECORD:
        if self._record_counter[stream.name] == 0:
            self._logger.info(f"Marking stream {stream.name} as RUNNING")
            yield stream_status_as_airbyte_message(
                stream.as_airbyte_stream(), AirbyteStreamStatus.RUNNING
            )
        self._record_counter[stream.name] += 1
    yield message
    yield from self._message_repository.consume_queue()

This method is called when a record is read from a partition.

  1. Convert the record to an AirbyteMessage
  2. If the message is a RECORD and it is the first one for the stream, mark the stream as RUNNING
  3. Increment the record counter for the stream (RECORD messages only)
  4. Emit the message
  5. Emit messages that were added to the message repository
def on_exception(self, exception: StreamThreadException) -> Iterable[AirbyteMessage]:
    """
    This method is called when an exception is raised while processing a stream.
    1. Flag the exception against the stream it came from
    2. Log the exception with its traceback
    3. Yield a trace message describing the failure, scoped to that stream

    The exception is not re-raised here, and other streams are not stopped by this method.
    NOTE(review): the failure is presumably surfaced at the end of the sync by `is_done`,
    assuming `_flag_exception` records it in `_exceptions_per_stream_name` — confirm.
    """
    self._flag_exception(exception.stream_name, exception.exception)
    self._logger.exception(
        f"Exception while syncing stream {exception.stream_name}", exc_info=exception.exception
    )

    stream_descriptor = StreamDescriptor(name=exception.stream_name)
    if isinstance(exception.exception, AirbyteTracedException):
        # Traced exceptions are emitted as-is, with the stream descriptor attached.
        yield exception.exception.as_airbyte_message(stream_descriptor=stream_descriptor)
    else:
        # Any other exception is wrapped so the emitted trace is still tied to the stream.
        yield AirbyteTracedException.from_exception(
            exception.exception,
            stream_descriptor=stream_descriptor,
            message=f"An unexpected error occurred in stream {exception.stream_name}: {type(exception.exception).__name__}",
        ).as_airbyte_message()

This method is called when an exception is raised.

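  1. Flag the exception against the stream it came from
  2. Log the exception with its traceback
  3. Yield a trace message describing the failure, scoped to that stream

The exception is not re-raised, and other streams are not stopped by this method.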
def start_next_partition_generator(self) -> Optional[airbyte_cdk.AirbyteMessage]:
def start_next_partition_generator(self) -> Optional[AirbyteMessage]:
    """
    Submits the next partition generator to the thread pool.

    Nothing is started if the queue is empty or if the concurrent partition generator cap
    (when configured) is already reached.

    A stream will be deferred (moved to end of queue) if:
    1. The stream itself has a block_simultaneous_read group AND is already active
    2. Another stream in the stream's own block_simultaneous_read group is active
    3. Any parent stream has a block_simultaneous_read group AND that parent, or another
       stream in the parent's group, is currently active

    This prevents simultaneous reads of streams that shouldn't be accessed concurrently.
    At most one generator is started per call. Each queued stream is examined at most once
    per call, so None is returned when every queued stream is blocked.

    :return: A status message if a partition generator was started, otherwise None
    """
    if not self._stream_instances_to_start_partition_generation:
        return None

    # Enforce the concurrent generator cap so at least one worker slot is always available
    # for partition reading. Recovery is guaranteed: on_partition_generation_completed
    # decrements the count before calling here, so the guard always passes there.
    if (
        self._max_concurrent_partition_generators is not None
        and len(self._streams_currently_generating_partitions)
        >= self._max_concurrent_partition_generators
    ):
        self._logger.debug(
            f"Concurrent partition generator cap ({self._max_concurrent_partition_generators}) reached "
            f"({len(self._streams_currently_generating_partitions)} active). Deferring next generator start."
        )
        return None

    # Remember initial queue size to avoid infinite loops if all streams are blocked
    max_attempts = len(self._stream_instances_to_start_partition_generation)
    attempts = 0

    while self._stream_instances_to_start_partition_generation and attempts < max_attempts:
        attempts += 1

        # Pop the first stream from the queue
        stream = self._stream_instances_to_start_partition_generation.pop(0)
        stream_name = stream.name
        stream_group = self._stream_block_simultaneous_read.get(stream_name, "")

        # Check if this stream has a blocking group and is already active as parent stream
        # (i.e. being read from during partition generation for another stream)
        if stream_group and stream_name in self._active_stream_names:
            # Add back to the END of the queue for retry later
            self._stream_instances_to_start_partition_generation.append(stream)
            self._logger.info(
                f"Deferring stream '{stream_name}' (group '{stream_group}') because it's already active. Trying next stream."
            )
            continue  # Try the next stream in the queue

        # Check if this stream's group is already active (another stream in the same group is running)
        if (
            stream_group
            and stream_group in self._active_groups
            and self._active_groups[stream_group]
        ):
            # Add back to the END of the queue for retry later
            self._stream_instances_to_start_partition_generation.append(stream)
            active_streams_in_group = self._active_groups[stream_group]
            self._logger.info(
                f"Deferring stream '{stream_name}' (group '{stream_group}') because other stream(s) "
                f"{active_streams_in_group} in the same group are active. Trying next stream."
            )
            continue  # Try the next stream in the queue

        # Check if any parent stream with a blocking group would be read concurrently with
        # another stream of that group. Starting this stream marks its grouped parents active
        # (see below), so a parent blocks when it is itself active OR when any other stream
        # in the parent's group is active.
        parent_streams = self._collect_all_parent_stream_names(stream_name)
        blocked_by_parents = []
        for parent_name in parent_streams:
            parent_block_group = self._stream_block_simultaneous_read.get(parent_name, "")
            if not parent_block_group:
                continue
            if parent_name in self._active_stream_names or self._active_groups.get(
                parent_block_group
            ):
                blocked_by_parents.append(parent_name)

        if blocked_by_parents:
            # Add back to the END of the queue for retry later
            self._stream_instances_to_start_partition_generation.append(stream)
            parent_groups = {
                self._stream_block_simultaneous_read.get(p, "") for p in blocked_by_parents
            }
            self._logger.info(
                f"Deferring stream '{stream_name}' because parent stream(s) "
                f"{blocked_by_parents} (groups {parent_groups}) are active or belong to an active group. "
                f"Trying next stream."
            )
            continue  # Try the next stream in the queue

        # No blocking - start this stream
        # Mark stream as active before starting
        self._active_stream_names.add(stream_name)
        self._streams_currently_generating_partitions.append(stream_name)

        # Track this stream in its group if it has one
        if stream_group:
            if stream_group not in self._active_groups:
                self._active_groups[stream_group] = set()
            self._active_groups[stream_group].add(stream_name)
            self._logger.debug(f"Added '{stream_name}' to active group '{stream_group}'")

        # Also mark all parent streams as active (they will be read from during partition generation)
        for parent_stream_name in parent_streams:
            parent_group = self._stream_block_simultaneous_read.get(parent_stream_name, "")
            if parent_group:
                self._active_stream_names.add(parent_stream_name)
                if parent_group not in self._active_groups:
                    self._active_groups[parent_group] = set()
                self._active_groups[parent_group].add(parent_stream_name)
                self._logger.info(
                    f"Marking parent stream '{parent_stream_name}' (group '{parent_group}') as active "
                    f"(will be read during partition generation for '{stream_name}')"
                )

        self._thread_pool_manager.submit(self._partition_enqueuer.generate_partitions, stream)
        self._logger.info(f"Marking stream {stream_name} as STARTED")
        self._logger.info(f"Syncing stream: {stream_name}")
        return stream_status_as_airbyte_message(
            stream.as_airbyte_stream(),
            AirbyteStreamStatus.STARTED,
        )

    # All streams in the queue are currently blocked
    return None

Submits the next partition generator to the thread pool.

A stream will be deferred (moved to end of queue) if:

  1. The stream itself has a block_simultaneous_read group AND is already active
  2. Another stream in the same block_simultaneous_read group is active
  3. Any parent stream has a block_simultaneous_read group AND is currently active

This prevents simultaneous reads of streams that shouldn't be accessed concurrently.

Returns

A status message if a partition generator was started, otherwise None

def is_done(self) -> bool:
def is_done(self) -> bool:
    """
    Report whether the whole sync has finished.

    The sync counts as finished once `_is_stream_done` holds for every known stream. In that
    case the following checks run, in order, before True is returned:

    - a non-empty partition generation queue raises a system_error AirbyteTracedException;
    - lingering block_simultaneous_read groups raise a system_error AirbyteTracedException;
    - any exception flagged for a stream raises a config_error AirbyteTracedException that
      summarizes the failed streams.

    :return: True when every stream is done and none of the checks above fails, otherwise False
    """
    # Build the full list first so that every stream is evaluated, as before.
    every_stream_done = all(
        [self._is_stream_done(name) for name in self._stream_name_to_instance.keys()]
    )
    if not every_stream_done:
        return False

    pending_streams = self._stream_instances_to_start_partition_generation
    if pending_streams:
        stuck_stream_names = [pending.name for pending in pending_streams]
        raise AirbyteTracedException(
            message="Partition generation queue is not empty after all streams completed.",
            internal_message=f"Streams {stuck_stream_names} remained in the partition generation queue after all streams were marked done.",
            failure_type=FailureType.system_error,
        )

    if self._active_groups:
        raise AirbyteTracedException(
            message="Active stream groups are not empty after all streams completed.",
            internal_message=f"Groups {dict(self._active_groups)} still active after all streams were marked done.",
            failure_type=FailureType.system_error,
        )

    if self._exceptions_per_stream_name:
        error_message = generate_failed_streams_error_message(self._exceptions_per_stream_name)
        self._logger.info(error_message)
        # At least one exception is still raised because the platform relies on a non-zero
        # exit code to detect a failed sync attempt. It is a config_error because this combined
        # error isn't actionable on its own; the individual errors were emitted earlier.
        raise AirbyteTracedException(
            message=error_message,
            internal_message="Concurrent read failure",
            failure_type=FailureType.config_error,
        )

    return True

This method is called to check if the sync is done. The sync is done when:

  1. There are no more streams generating partitions
  2. There are no more streams to read from
  3. All partitions for all streams are closed