airbyte_cdk.sources.concurrent_source.concurrent_read_processor

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4import logging
  5import os
  6from typing import Dict, Iterable, List, Optional, Set
  7
  8from airbyte_cdk.exception_handler import generate_failed_streams_error_message
  9from airbyte_cdk.models import AirbyteMessage, AirbyteStreamStatus, FailureType, StreamDescriptor
 10from airbyte_cdk.models import Type as MessageType
 11from airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel import (
 12    PartitionGenerationCompletedSentinel,
 13)
 14from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException
 15from airbyte_cdk.sources.concurrent_source.thread_pool_manager import ThreadPoolManager
 16from airbyte_cdk.sources.declarative.partition_routers.grouping_partition_router import (
 17    GroupingPartitionRouter,
 18)
 19from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import (
 20    SubstreamPartitionRouter,
 21)
 22from airbyte_cdk.sources.message import MessageRepository
 23from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
 24from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
 25from airbyte_cdk.sources.streams.concurrent.partition_enqueuer import PartitionEnqueuer
 26from airbyte_cdk.sources.streams.concurrent.partition_reader import PartitionReader
 27from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
 28from airbyte_cdk.sources.streams.concurrent.partitions.types import PartitionCompleteSentinel
 29from airbyte_cdk.sources.types import Record
 30from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message
 31from airbyte_cdk.sources.utils.slice_logger import SliceLogger
 32from airbyte_cdk.utils import AirbyteTracedException
 33from airbyte_cdk.utils.stream_status_utils import (
 34    as_airbyte_message as stream_status_as_airbyte_message,
 35)
 36
 37
class ConcurrentReadProcessor:
    """Turns items produced by concurrent partition-generation / partition-reading
    threads (partitions, records, sentinels, exceptions) into AirbyteMessages while
    tracking per-stream progress.

    On top of the standard concurrent-read behavior, this class supports
    "blocking groups": a stream whose ``block_simultaneous_read`` attribute is a
    non-empty group name is never started while another stream of the same group
    (or a group-member parent stream) is active. Blocked streams are deferred to
    the end of the start queue and retried on later callbacks.
    """

    def __init__(
        self,
        stream_instances_to_read_from: List[AbstractStream],
        partition_enqueuer: PartitionEnqueuer,
        thread_pool_manager: ThreadPoolManager,
        logger: logging.Logger,
        slice_logger: SliceLogger,
        message_repository: MessageRepository,
        partition_reader: PartitionReader,
    ) -> None:
        """
        This class is responsible for handling items from a concurrent stream read process.
        :param stream_instances_to_read_from: List of streams to read from
        :param partition_enqueuer: PartitionEnqueuer instance
        :param thread_pool_manager: ThreadPoolManager instance
        :param logger: Logger instance
        :param slice_logger: SliceLogger instance
        :param message_repository: MessageRepository instance
        :param partition_reader: PartitionReader instance
        """
        self._stream_name_to_instance: Dict[str, AbstractStream] = {s.name: s for s in stream_instances_to_read_from}
        # Records emitted per stream; drives the RUNNING status transition and the final count log.
        self._record_counter: Dict[str, int] = {}
        self._streams_to_running_partitions: Dict[str, Set[Partition]] = {}
        for stream in stream_instances_to_read_from:
            self._streams_to_running_partitions[stream.name] = set()
            self._record_counter[stream.name] = 0
        self._thread_pool_manager = thread_pool_manager
        self._partition_enqueuer = partition_enqueuer
        # FIFO queue of streams whose partition generation has not started yet.
        # Deferred (blocked) streams are re-appended to the END of this list;
        # see start_next_partition_generator.
        self._stream_instances_to_start_partition_generation = stream_instances_to_read_from
        self._streams_currently_generating_partitions: List[str] = []
        self._logger = logger
        self._slice_logger = slice_logger
        self._message_repository = message_repository
        self._partition_reader = partition_reader
        self._streams_done: Set[str] = set()
        self._exceptions_per_stream_name: dict[str, List[Exception]] = {}

        # Track which streams (by name) are currently active
        # A stream is "active" if it's generating partitions or has partitions being read
        self._active_stream_names: Set[str] = set()

        # Store blocking group names for streams that require blocking simultaneous reads
        # Maps stream name -> group name (empty string means no blocking)
        self._stream_block_simultaneous_read: Dict[str, str] = {
            stream.name: stream.block_simultaneous_read for stream in stream_instances_to_read_from
        }

        # Track which groups are currently active
        # Maps group name -> set of stream names in that group
        self._active_groups: Dict[str, Set[str]] = {}

        for stream in stream_instances_to_read_from:
            if stream.block_simultaneous_read:
                self._logger.info(
                    f"Stream '{stream.name}' is in blocking group '{stream.block_simultaneous_read}'. "
                    f"Will defer starting this stream if another stream in the same group or its parents are active."
                )

    def on_partition_generation_completed(
        self, sentinel: PartitionGenerationCompletedSentinel
    ) -> Iterable[AirbyteMessage]:
        """
        This method is called when a partition generation is completed.
        1. Remove the stream from the list of streams currently generating partitions
        2. Deactivate parent streams (they were only needed for partition generation)
        3. If the stream is done, mark it as such and return a stream status message
        4. If there are more streams to read from, start the next partition generator
           (this is also where previously deferred/blocked streams get retried)
        """
        stream_name = sentinel.stream.name
        self._streams_currently_generating_partitions.remove(sentinel.stream.name)

        # Deactivate all parent streams now that partition generation is complete
        # Parents were only needed to generate slices, they can now be reused
        parent_streams = self._collect_all_parent_stream_names(stream_name)
        for parent_stream_name in parent_streams:
            if parent_stream_name in self._active_stream_names:
                self._logger.debug(f"Removing '{parent_stream_name}' from active streams")
                self._active_stream_names.discard(parent_stream_name)

                # Remove from active groups
                parent_group = self._stream_block_simultaneous_read.get(parent_stream_name, "")
                if parent_group:
                    if parent_group in self._active_groups:
                        self._active_groups[parent_group].discard(parent_stream_name)
                        # Drop empty groups so is_done's consistency check sees a clean state.
                        if not self._active_groups[parent_group]:
                            del self._active_groups[parent_group]
                    self._logger.info(
                        f"Parent stream '{parent_stream_name}' (group '{parent_group}') deactivated after "
                        f"partition generation completed for child '{stream_name}'. "
                        f"Blocked streams in the queue will be retried on next start_next_partition_generator call."
                    )

        # It is possible for the stream to already be done if no partitions were generated
        # If the partition generation process was completed and there are no partitions left to process, the stream is done
        if (
            self._is_stream_done(stream_name)
            or len(self._streams_to_running_partitions[stream_name]) == 0
        ):
            yield from self._on_stream_is_done(stream_name)
        if self._stream_instances_to_start_partition_generation:
            status_message = self.start_next_partition_generator()
            if status_message:
                yield status_message

    def on_partition(self, partition: Partition) -> None:
        """
        This method is called when a partition is generated.
        1. Add the partition to the set of partitions for the stream
        2. Log the slice if necessary
        3. Submit the partition (with the stream's cursor) to the thread pool manager
        """
        stream_name = partition.stream_name()
        self._streams_to_running_partitions[stream_name].add(partition)
        cursor = self._stream_name_to_instance[stream_name].cursor
        if self._slice_logger.should_log_slice_message(self._logger):
            self._message_repository.emit_message(
                self._slice_logger.create_slice_log_message(partition.to_slice())
            )
        self._thread_pool_manager.submit(
            self._partition_reader.process_partition, partition, cursor
        )

    def on_partition_complete_sentinel(
        self, sentinel: PartitionCompleteSentinel
    ) -> Iterable[AirbyteMessage]:
        """
        This method is called when a partition is completed.
        1. Remove the partition from the set of running partitions for its stream
        2. If the stream is done, mark it as such and return a stream status message
        3. Try to start the next (possibly previously deferred) stream in the queue
        4. Emit messages that were added to the message repository
        """
        partition = sentinel.partition

        partitions_running = self._streams_to_running_partitions[partition.stream_name()]
        if partition in partitions_running:
            partitions_running.remove(partition)
            # If all partitions were generated and this was the last one, the stream is done
            if (
                partition.stream_name() not in self._streams_currently_generating_partitions
                and len(partitions_running) == 0
            ):
                yield from self._on_stream_is_done(partition.stream_name())
                # Try to start the next stream in the queue (may be a deferred stream)
                if self._stream_instances_to_start_partition_generation:
                    status_message = self.start_next_partition_generator()
                    if status_message:
                        yield status_message
        yield from self._message_repository.consume_queue()

    def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
        """
        This method is called when a record is read from a partition.
        1. Convert the record to an AirbyteMessage
        2. If this is the first record for the stream, mark the stream as RUNNING
        3. Increment the record counter for the stream
        4. Emit the message
        5. Emit messages that were added to the message repository
        """
        # Do not pass a transformer or a schema
        # AbstractStreams are expected to return data as they are expected.
        # Any transformation on the data should be done before reaching this point
        message = stream_data_to_airbyte_message(
            stream_name=record.stream_name,
            data_or_message=record.data,
            file_reference=record.file_reference,
        )
        stream = self._stream_name_to_instance[record.stream_name]

        if message.type == MessageType.RECORD:
            if self._record_counter[stream.name] == 0:
                self._logger.info(f"Marking stream {stream.name} as RUNNING")
                yield stream_status_as_airbyte_message(
                    stream.as_airbyte_stream(), AirbyteStreamStatus.RUNNING
                )
            self._record_counter[stream.name] += 1
        yield message
        yield from self._message_repository.consume_queue()

    def on_exception(self, exception: StreamThreadException) -> Iterable[AirbyteMessage]:
        """
        This method is called when an exception is raised while reading a stream.
        1. Record the exception against the stream (so the stream is later reported INCOMPLETE
           and is_done() raises at the end of the sync)
        2. Log the exception
        3. Yield a trace message describing the error
        """
        self._flag_exception(exception.stream_name, exception.exception)
        self._logger.exception(
            f"Exception while syncing stream {exception.stream_name}", exc_info=exception.exception
        )

        stream_descriptor = StreamDescriptor(name=exception.stream_name)
        if isinstance(exception.exception, AirbyteTracedException):
            yield exception.exception.as_airbyte_message(stream_descriptor=stream_descriptor)
        else:
            yield AirbyteTracedException.from_exception(
                exception.exception,
                stream_descriptor=stream_descriptor,
                message=f"An unexpected error occurred in stream {exception.stream_name}: {type(exception.exception).__name__}",
            ).as_airbyte_message()

    def _flag_exception(self, stream_name: str, exception: Exception) -> None:
        # Accumulate per-stream exceptions; consulted by _on_stream_is_done and is_done.
        self._exceptions_per_stream_name.setdefault(stream_name, []).append(exception)

    def start_next_partition_generator(self) -> Optional[AirbyteMessage]:
        """
        Submits the next partition generator to the thread pool.

        A stream will be deferred (moved to end of queue) if:
        1. The stream itself has block_simultaneous_read=True AND is already active
        2. Any parent stream has block_simultaneous_read=True AND is currently active

        This prevents simultaneous reads of streams that shouldn't be accessed concurrently.

        :return: A status message if a partition generator was started, otherwise None
        """
        if not self._stream_instances_to_start_partition_generation:
            return None

        # Remember initial queue size to avoid infinite loops if all streams are blocked
        max_attempts = len(self._stream_instances_to_start_partition_generation)
        attempts = 0

        while self._stream_instances_to_start_partition_generation and attempts < max_attempts:
            attempts += 1

            # Pop the first stream from the queue
            stream = self._stream_instances_to_start_partition_generation.pop(0)
            stream_name = stream.name
            stream_group = self._stream_block_simultaneous_read.get(stream_name, "")

            # Check if this stream has a blocking group and is already active as parent stream
            # (i.e. being read from during partition generation for another stream)
            if stream_group and stream_name in self._active_stream_names:
                # Add back to the END of the queue for retry later
                self._stream_instances_to_start_partition_generation.append(stream)
                self._logger.info(
                    f"Deferring stream '{stream_name}' (group '{stream_group}') because it's already active. Trying next stream."
                )
                continue  # Try the next stream in the queue

            # Check if this stream's group is already active (another stream in the same group is running)
            if (
                stream_group
                and stream_group in self._active_groups
                and self._active_groups[stream_group]
            ):
                # Add back to the END of the queue for retry later
                self._stream_instances_to_start_partition_generation.append(stream)
                active_streams_in_group = self._active_groups[stream_group]
                self._logger.info(
                    f"Deferring stream '{stream_name}' (group '{stream_group}') because other stream(s) "
                    f"{active_streams_in_group} in the same group are active. Trying next stream."
                )
                continue  # Try the next stream in the queue

            # Check if any parent streams have a blocking group and are currently active
            parent_streams = self._collect_all_parent_stream_names(stream_name)
            blocked_by_parents = [
                p
                for p in parent_streams
                if self._stream_block_simultaneous_read.get(p, "")
                and p in self._active_stream_names
            ]

            if blocked_by_parents:
                # Add back to the END of the queue for retry later
                self._stream_instances_to_start_partition_generation.append(stream)
                parent_groups = {
                    self._stream_block_simultaneous_read.get(p, "") for p in blocked_by_parents
                }
                self._logger.info(
                    f"Deferring stream '{stream_name}' because parent stream(s) "
                    f"{blocked_by_parents} (groups {parent_groups}) are active. Trying next stream."
                )
                continue  # Try the next stream in the queue

            # No blocking - start this stream
            # Mark stream as active before starting
            self._active_stream_names.add(stream_name)
            self._streams_currently_generating_partitions.append(stream_name)

            # Track this stream in its group if it has one
            if stream_group:
                if stream_group not in self._active_groups:
                    self._active_groups[stream_group] = set()
                self._active_groups[stream_group].add(stream_name)
                self._logger.debug(f"Added '{stream_name}' to active group '{stream_group}'")

            # Also mark all parent streams as active (they will be read from during partition generation)
            # NOTE: only parents that belong to a blocking group are tracked here; they are
            # deactivated again in on_partition_generation_completed.
            for parent_stream_name in parent_streams:
                parent_group = self._stream_block_simultaneous_read.get(parent_stream_name, "")
                if parent_group:
                    self._active_stream_names.add(parent_stream_name)
                    if parent_group not in self._active_groups:
                        self._active_groups[parent_group] = set()
                    self._active_groups[parent_group].add(parent_stream_name)
                    self._logger.info(
                        f"Marking parent stream '{parent_stream_name}' (group '{parent_group}') as active "
                        f"(will be read during partition generation for '{stream_name}')"
                    )

            self._thread_pool_manager.submit(self._partition_enqueuer.generate_partitions, stream)
            self._logger.info(f"Marking stream {stream_name} as STARTED")
            self._logger.info(f"Syncing stream: {stream_name}")
            return stream_status_as_airbyte_message(
                stream.as_airbyte_stream(),
                AirbyteStreamStatus.STARTED,
            )

        # All streams in the queue are currently blocked
        return None

    def is_done(self) -> bool:
        """
        This method is called to check if the sync is done.
        The sync is done when:
        1. There are no more streams generating partitions
        2. There are no more streams to read from
        3. All partitions for all streams are closed

        :raises AirbyteTracedException: if internal bookkeeping is inconsistent once all
            streams are done (non-empty start queue or active groups), or if any stream
            recorded an exception during the sync.
        """
        is_done = all(
            [
                self._is_stream_done(stream_name)
                for stream_name in self._stream_name_to_instance.keys()
            ]
        )
        # Sanity check: a deferred stream left in the queue after everything completed
        # would mean it was silently never synced.
        if is_done and self._stream_instances_to_start_partition_generation:
            stuck_stream_names = [
                s.name for s in self._stream_instances_to_start_partition_generation
            ]
            raise AirbyteTracedException(
                message="Partition generation queue is not empty after all streams completed.",
                internal_message=f"Streams {stuck_stream_names} remained in the partition generation queue after all streams were marked done.",
                failure_type=FailureType.system_error,
            )
        # Sanity check: all blocking groups should have been deactivated by the time
        # every stream is done.
        if is_done and self._active_groups:
            raise AirbyteTracedException(
                message="Active stream groups are not empty after all streams completed.",
                internal_message=f"Groups {dict(self._active_groups)} still active after all streams were marked done.",
                failure_type=FailureType.system_error,
            )
        if is_done and self._exceptions_per_stream_name:
            error_message = generate_failed_streams_error_message(self._exceptions_per_stream_name)
            self._logger.info(error_message)
            # We still raise at least one exception when a stream raises an exception because the platform currently relies
            # on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error
            # type because this combined error isn't actionable, but rather the previously emitted individual errors.
            raise AirbyteTracedException(
                message=error_message,
                internal_message="Concurrent read failure",
                failure_type=FailureType.config_error,
            )
        return is_done

    def _is_stream_done(self, stream_name: str) -> bool:
        """Return True if the stream was already marked done by _on_stream_is_done."""
        return stream_name in self._streams_done

    def _collect_all_parent_stream_names(self, stream_name: str) -> Set[str]:
        """Recursively collect all parent stream names for a given stream.

        For example, if we have: epics -> issues -> comments
        Then for comments, this returns {issues, epics}.

        Only DefaultStream instances with a SubstreamPartitionRouter (possibly wrapped
        in a GroupingPartitionRouter) contribute parents; any other stream yields an
        empty set.
        """
        parent_names: Set[str] = set()
        stream = self._stream_name_to_instance.get(stream_name)

        if not stream:
            return parent_names

        partition_router = (
            stream.get_partition_router() if isinstance(stream, DefaultStream) else None
        )
        # A GroupingPartitionRouter only batches slices; unwrap it to reach the
        # substream router that actually declares the parent streams.
        if isinstance(partition_router, GroupingPartitionRouter):
            partition_router = partition_router.underlying_partition_router

        if isinstance(partition_router, SubstreamPartitionRouter):
            for parent_config in partition_router.parent_stream_configs:
                parent_name = parent_config.stream.name
                parent_names.add(parent_name)
                # Parents can themselves be substreams; walk the whole ancestry.
                parent_names.update(self._collect_all_parent_stream_names(parent_name))

        return parent_names

    def _on_stream_is_done(self, stream_name: str) -> Iterable[AirbyteMessage]:
        """Mark the stream done, flush pending messages, and yield its final status
        (INCOMPLETE if any exception was flagged for it, COMPLETE otherwise)."""
        self._logger.info(
            f"Read {self._record_counter[stream_name]} records from {stream_name} stream"
        )
        self._logger.info(f"Marking stream {stream_name} as STOPPED")
        stream = self._stream_name_to_instance[stream_name]
        stream.cursor.ensure_at_least_one_state_emitted()
        yield from self._message_repository.consume_queue()
        self._logger.info(f"Finished syncing {stream.name}")
        self._streams_done.add(stream_name)
        stream_status = (
            AirbyteStreamStatus.INCOMPLETE
            if self._exceptions_per_stream_name.get(stream_name, [])
            else AirbyteStreamStatus.COMPLETE
        )
        yield stream_status_as_airbyte_message(stream.as_airbyte_stream(), stream_status)

        # Remove only this stream from active set (NOT parents)
        # Parents are deactivated in on_partition_generation_completed instead.
        if stream_name in self._active_stream_names:
            self._active_stream_names.discard(stream_name)

            # Remove from active groups
            stream_group = self._stream_block_simultaneous_read.get(stream_name, "")
            if stream_group:
                if stream_group in self._active_groups:
                    self._active_groups[stream_group].discard(stream_name)
                    if not self._active_groups[stream_group]:
                        del self._active_groups[stream_group]
                self._logger.info(
                    f"Stream '{stream_name}' (group '{stream_group}') is no longer active. "
                    f"Blocked streams in the queue will be retried on next start_next_partition_generator call."
                )
class ConcurrentReadProcessor:
 39class ConcurrentReadProcessor:
 40    def __init__(
 41        self,
 42        stream_instances_to_read_from: List[AbstractStream],
 43        partition_enqueuer: PartitionEnqueuer,
 44        thread_pool_manager: ThreadPoolManager,
 45        logger: logging.Logger,
 46        slice_logger: SliceLogger,
 47        message_repository: MessageRepository,
 48        partition_reader: PartitionReader,
 49    ):
 50        """
 51        This class is responsible for handling items from a concurrent stream read process.
 52        :param stream_instances_to_read_from: List of streams to read from
 53        :param partition_enqueuer: PartitionEnqueuer instance
 54        :param thread_pool_manager: ThreadPoolManager instance
 55        :param logger: Logger instance
 56        :param slice_logger: SliceLogger instance
 57        :param message_repository: MessageRepository instance
 58        :param partition_reader: PartitionReader instance
 59        """
 60        self._stream_name_to_instance = {s.name: s for s in stream_instances_to_read_from}
 61        self._record_counter = {}
 62        self._streams_to_running_partitions: Dict[str, Set[Partition]] = {}
 63        for stream in stream_instances_to_read_from:
 64            self._streams_to_running_partitions[stream.name] = set()
 65            self._record_counter[stream.name] = 0
 66        self._thread_pool_manager = thread_pool_manager
 67        self._partition_enqueuer = partition_enqueuer
 68        self._stream_instances_to_start_partition_generation = stream_instances_to_read_from
 69        self._streams_currently_generating_partitions: List[str] = []
 70        self._logger = logger
 71        self._slice_logger = slice_logger
 72        self._message_repository = message_repository
 73        self._partition_reader = partition_reader
 74        self._streams_done: Set[str] = set()
 75        self._exceptions_per_stream_name: dict[str, List[Exception]] = {}
 76
 77        # Track which streams (by name) are currently active
 78        # A stream is "active" if it's generating partitions or has partitions being read
 79        self._active_stream_names: Set[str] = set()
 80
 81        # Store blocking group names for streams that require blocking simultaneous reads
 82        # Maps stream name -> group name (empty string means no blocking)
 83        self._stream_block_simultaneous_read: Dict[str, str] = {
 84            stream.name: stream.block_simultaneous_read for stream in stream_instances_to_read_from
 85        }
 86
 87        # Track which groups are currently active
 88        # Maps group name -> set of stream names in that group
 89        self._active_groups: Dict[str, Set[str]] = {}
 90
 91        for stream in stream_instances_to_read_from:
 92            if stream.block_simultaneous_read:
 93                self._logger.info(
 94                    f"Stream '{stream.name}' is in blocking group '{stream.block_simultaneous_read}'. "
 95                    f"Will defer starting this stream if another stream in the same group or its parents are active."
 96                )
 97
 98    def on_partition_generation_completed(
 99        self, sentinel: PartitionGenerationCompletedSentinel
100    ) -> Iterable[AirbyteMessage]:
101        """
102        This method is called when a partition generation is completed.
103        1. Remove the stream from the list of streams currently generating partitions
104        2. Deactivate parent streams (they were only needed for partition generation)
105        3. If the stream is done, mark it as such and return a stream status message
106        4. If there are more streams to read from, start the next partition generator
107        """
108        stream_name = sentinel.stream.name
109        self._streams_currently_generating_partitions.remove(sentinel.stream.name)
110
111        # Deactivate all parent streams now that partition generation is complete
112        # Parents were only needed to generate slices, they can now be reused
113        parent_streams = self._collect_all_parent_stream_names(stream_name)
114        for parent_stream_name in parent_streams:
115            if parent_stream_name in self._active_stream_names:
116                self._logger.debug(f"Removing '{parent_stream_name}' from active streams")
117                self._active_stream_names.discard(parent_stream_name)
118
119                # Remove from active groups
120                parent_group = self._stream_block_simultaneous_read.get(parent_stream_name, "")
121                if parent_group:
122                    if parent_group in self._active_groups:
123                        self._active_groups[parent_group].discard(parent_stream_name)
124                        if not self._active_groups[parent_group]:
125                            del self._active_groups[parent_group]
126                    self._logger.info(
127                        f"Parent stream '{parent_stream_name}' (group '{parent_group}') deactivated after "
128                        f"partition generation completed for child '{stream_name}'. "
129                        f"Blocked streams in the queue will be retried on next start_next_partition_generator call."
130                    )
131
132        # It is possible for the stream to already be done if no partitions were generated
133        # If the partition generation process was completed and there are no partitions left to process, the stream is done
134        if (
135            self._is_stream_done(stream_name)
136            or len(self._streams_to_running_partitions[stream_name]) == 0
137        ):
138            yield from self._on_stream_is_done(stream_name)
139        if self._stream_instances_to_start_partition_generation:
140            status_message = self.start_next_partition_generator()
141            if status_message:
142                yield status_message
143
144    def on_partition(self, partition: Partition) -> None:
145        """
146        This method is called when a partition is generated.
147        1. Add the partition to the set of partitions for the stream
148        2. Log the slice if necessary
149        3. Submit the partition to the thread pool manager
150        """
151        stream_name = partition.stream_name()
152        self._streams_to_running_partitions[stream_name].add(partition)
153        cursor = self._stream_name_to_instance[stream_name].cursor
154        if self._slice_logger.should_log_slice_message(self._logger):
155            self._message_repository.emit_message(
156                self._slice_logger.create_slice_log_message(partition.to_slice())
157            )
158        self._thread_pool_manager.submit(
159            self._partition_reader.process_partition, partition, cursor
160        )
161
162    def on_partition_complete_sentinel(
163        self, sentinel: PartitionCompleteSentinel
164    ) -> Iterable[AirbyteMessage]:
165        """
166        This method is called when a partition is completed.
167        1. Close the partition
168        2. If the stream is done, mark it as such and return a stream status message
169        3. Emit messages that were added to the message repository
170        4. If there are more streams to read from, start the next partition generator
171        """
172        partition = sentinel.partition
173
174        partitions_running = self._streams_to_running_partitions[partition.stream_name()]
175        if partition in partitions_running:
176            partitions_running.remove(partition)
177            # If all partitions were generated and this was the last one, the stream is done
178            if (
179                partition.stream_name() not in self._streams_currently_generating_partitions
180                and len(partitions_running) == 0
181            ):
182                yield from self._on_stream_is_done(partition.stream_name())
183                # Try to start the next stream in the queue (may be a deferred stream)
184                if self._stream_instances_to_start_partition_generation:
185                    status_message = self.start_next_partition_generator()
186                    if status_message:
187                        yield status_message
188        yield from self._message_repository.consume_queue()
189
190    def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
191        """
192        This method is called when a record is read from a partition.
193        1. Convert the record to an AirbyteMessage
194        2. If this is the first record for the stream, mark the stream as RUNNING
195        3. Increment the record counter for the stream
196        4. Ensures the cursor knows the record has been successfully emitted
197        5. Emit the message
198        6. Emit messages that were added to the message repository
199        """
200        # Do not pass a transformer or a schema
201        # AbstractStreams are expected to return data as they are expected.
202        # Any transformation on the data should be done before reaching this point
203        message = stream_data_to_airbyte_message(
204            stream_name=record.stream_name,
205            data_or_message=record.data,
206            file_reference=record.file_reference,
207        )
208        stream = self._stream_name_to_instance[record.stream_name]
209
210        if message.type == MessageType.RECORD:
211            if self._record_counter[stream.name] == 0:
212                self._logger.info(f"Marking stream {stream.name} as RUNNING")
213                yield stream_status_as_airbyte_message(
214                    stream.as_airbyte_stream(), AirbyteStreamStatus.RUNNING
215                )
216            self._record_counter[stream.name] += 1
217        yield message
218        yield from self._message_repository.consume_queue()
219
220    def on_exception(self, exception: StreamThreadException) -> Iterable[AirbyteMessage]:
221        """
222        This method is called when an exception is raised.
223        1. Stop all running streams
224        2. Raise the exception
225        """
226        self._flag_exception(exception.stream_name, exception.exception)
227        self._logger.exception(
228            f"Exception while syncing stream {exception.stream_name}", exc_info=exception.exception
229        )
230
231        stream_descriptor = StreamDescriptor(name=exception.stream_name)
232        if isinstance(exception.exception, AirbyteTracedException):
233            yield exception.exception.as_airbyte_message(stream_descriptor=stream_descriptor)
234        else:
235            yield AirbyteTracedException.from_exception(
236                exception.exception,
237                stream_descriptor=stream_descriptor,
238                message=f"An unexpected error occurred in stream {exception.stream_name}: {type(exception.exception).__name__}",
239            ).as_airbyte_message()
240
241    def _flag_exception(self, stream_name: str, exception: Exception) -> None:
242        self._exceptions_per_stream_name.setdefault(stream_name, []).append(exception)
243
244    def start_next_partition_generator(self) -> Optional[AirbyteMessage]:
245        """
246        Submits the next partition generator to the thread pool.
247
248        A stream will be deferred (moved to end of queue) if:
249        1. The stream itself has block_simultaneous_read=True AND is already active
250        2. Any parent stream has block_simultaneous_read=True AND is currently active
251
252        This prevents simultaneous reads of streams that shouldn't be accessed concurrently.
253
254        :return: A status message if a partition generator was started, otherwise None
255        """
256        if not self._stream_instances_to_start_partition_generation:
257            return None
258
259        # Remember initial queue size to avoid infinite loops if all streams are blocked
260        max_attempts = len(self._stream_instances_to_start_partition_generation)
261        attempts = 0
262
263        while self._stream_instances_to_start_partition_generation and attempts < max_attempts:
264            attempts += 1
265
266            # Pop the first stream from the queue
267            stream = self._stream_instances_to_start_partition_generation.pop(0)
268            stream_name = stream.name
269            stream_group = self._stream_block_simultaneous_read.get(stream_name, "")
270
271            # Check if this stream has a blocking group and is already active as parent stream
272            # (i.e. being read from during partition generation for another stream)
273            if stream_group and stream_name in self._active_stream_names:
274                # Add back to the END of the queue for retry later
275                self._stream_instances_to_start_partition_generation.append(stream)
276                self._logger.info(
277                    f"Deferring stream '{stream_name}' (group '{stream_group}') because it's already active. Trying next stream."
278                )
279                continue  # Try the next stream in the queue
280
281            # Check if this stream's group is already active (another stream in the same group is running)
282            if (
283                stream_group
284                and stream_group in self._active_groups
285                and self._active_groups[stream_group]
286            ):
287                # Add back to the END of the queue for retry later
288                self._stream_instances_to_start_partition_generation.append(stream)
289                active_streams_in_group = self._active_groups[stream_group]
290                self._logger.info(
291                    f"Deferring stream '{stream_name}' (group '{stream_group}') because other stream(s) "
292                    f"{active_streams_in_group} in the same group are active. Trying next stream."
293                )
294                continue  # Try the next stream in the queue
295
296            # Check if any parent streams have a blocking group and are currently active
297            parent_streams = self._collect_all_parent_stream_names(stream_name)
298            blocked_by_parents = [
299                p
300                for p in parent_streams
301                if self._stream_block_simultaneous_read.get(p, "")
302                and p in self._active_stream_names
303            ]
304
305            if blocked_by_parents:
306                # Add back to the END of the queue for retry later
307                self._stream_instances_to_start_partition_generation.append(stream)
308                parent_groups = {
309                    self._stream_block_simultaneous_read.get(p, "") for p in blocked_by_parents
310                }
311                self._logger.info(
312                    f"Deferring stream '{stream_name}' because parent stream(s) "
313                    f"{blocked_by_parents} (groups {parent_groups}) are active. Trying next stream."
314                )
315                continue  # Try the next stream in the queue
316
317            # No blocking - start this stream
318            # Mark stream as active before starting
319            self._active_stream_names.add(stream_name)
320            self._streams_currently_generating_partitions.append(stream_name)
321
322            # Track this stream in its group if it has one
323            if stream_group:
324                if stream_group not in self._active_groups:
325                    self._active_groups[stream_group] = set()
326                self._active_groups[stream_group].add(stream_name)
327                self._logger.debug(f"Added '{stream_name}' to active group '{stream_group}'")
328
329            # Also mark all parent streams as active (they will be read from during partition generation)
330            for parent_stream_name in parent_streams:
331                parent_group = self._stream_block_simultaneous_read.get(parent_stream_name, "")
332                if parent_group:
333                    self._active_stream_names.add(parent_stream_name)
334                    if parent_group not in self._active_groups:
335                        self._active_groups[parent_group] = set()
336                    self._active_groups[parent_group].add(parent_stream_name)
337                    self._logger.info(
338                        f"Marking parent stream '{parent_stream_name}' (group '{parent_group}') as active "
339                        f"(will be read during partition generation for '{stream_name}')"
340                    )
341
342            self._thread_pool_manager.submit(self._partition_enqueuer.generate_partitions, stream)
343            self._logger.info(f"Marking stream {stream_name} as STARTED")
344            self._logger.info(f"Syncing stream: {stream_name}")
345            return stream_status_as_airbyte_message(
346                stream.as_airbyte_stream(),
347                AirbyteStreamStatus.STARTED,
348            )
349
350        # All streams in the queue are currently blocked
351        return None
352
353    def is_done(self) -> bool:
354        """
355        This method is called to check if the sync is done.
356        The sync is done when:
357        1. There are no more streams generating partitions
358        2. There are no more streams to read from
359        3. All partitions for all streams are closed
360        """
361        is_done = all(
362            [
363                self._is_stream_done(stream_name)
364                for stream_name in self._stream_name_to_instance.keys()
365            ]
366        )
367        if is_done and self._stream_instances_to_start_partition_generation:
368            stuck_stream_names = [
369                s.name for s in self._stream_instances_to_start_partition_generation
370            ]
371            raise AirbyteTracedException(
372                message="Partition generation queue is not empty after all streams completed.",
373                internal_message=f"Streams {stuck_stream_names} remained in the partition generation queue after all streams were marked done.",
374                failure_type=FailureType.system_error,
375            )
376        if is_done and self._active_groups:
377            raise AirbyteTracedException(
378                message="Active stream groups are not empty after all streams completed.",
379                internal_message=f"Groups {dict(self._active_groups)} still active after all streams were marked done.",
380                failure_type=FailureType.system_error,
381            )
382        if is_done and self._exceptions_per_stream_name:
383            error_message = generate_failed_streams_error_message(self._exceptions_per_stream_name)
384            self._logger.info(error_message)
385            # We still raise at least one exception when a stream raises an exception because the platform currently relies
386            # on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error
387            # type because this combined error isn't actionable, but rather the previously emitted individual errors.
388            raise AirbyteTracedException(
389                message=error_message,
390                internal_message="Concurrent read failure",
391                failure_type=FailureType.config_error,
392            )
393        return is_done
394
395    def _is_stream_done(self, stream_name: str) -> bool:
396        return stream_name in self._streams_done
397
398    def _collect_all_parent_stream_names(self, stream_name: str) -> Set[str]:
399        """Recursively collect all parent stream names for a given stream.
400
401        For example, if we have: epics -> issues -> comments
402        Then for comments, this returns {issues, epics}.
403        """
404        parent_names: Set[str] = set()
405        stream = self._stream_name_to_instance.get(stream_name)
406
407        if not stream:
408            return parent_names
409
410        partition_router = (
411            stream.get_partition_router() if isinstance(stream, DefaultStream) else None
412        )
413        if isinstance(partition_router, GroupingPartitionRouter):
414            partition_router = partition_router.underlying_partition_router
415
416        if isinstance(partition_router, SubstreamPartitionRouter):
417            for parent_config in partition_router.parent_stream_configs:
418                parent_name = parent_config.stream.name
419                parent_names.add(parent_name)
420                parent_names.update(self._collect_all_parent_stream_names(parent_name))
421
422        return parent_names
423
424    def _on_stream_is_done(self, stream_name: str) -> Iterable[AirbyteMessage]:
425        self._logger.info(
426            f"Read {self._record_counter[stream_name]} records from {stream_name} stream"
427        )
428        self._logger.info(f"Marking stream {stream_name} as STOPPED")
429        stream = self._stream_name_to_instance[stream_name]
430        stream.cursor.ensure_at_least_one_state_emitted()
431        yield from self._message_repository.consume_queue()
432        self._logger.info(f"Finished syncing {stream.name}")
433        self._streams_done.add(stream_name)
434        stream_status = (
435            AirbyteStreamStatus.INCOMPLETE
436            if self._exceptions_per_stream_name.get(stream_name, [])
437            else AirbyteStreamStatus.COMPLETE
438        )
439        yield stream_status_as_airbyte_message(stream.as_airbyte_stream(), stream_status)
440
441        # Remove only this stream from active set (NOT parents)
442        if stream_name in self._active_stream_names:
443            self._active_stream_names.discard(stream_name)
444
445            # Remove from active groups
446            stream_group = self._stream_block_simultaneous_read.get(stream_name, "")
447            if stream_group:
448                if stream_group in self._active_groups:
449                    self._active_groups[stream_group].discard(stream_name)
450                    if not self._active_groups[stream_group]:
451                        del self._active_groups[stream_group]
452                self._logger.info(
453                    f"Stream '{stream_name}' (group '{stream_group}') is no longer active. "
454                    f"Blocked streams in the queue will be retried on next start_next_partition_generator call."
455                )
40    def __init__(
41        self,
42        stream_instances_to_read_from: List[AbstractStream],
43        partition_enqueuer: PartitionEnqueuer,
44        thread_pool_manager: ThreadPoolManager,
45        logger: logging.Logger,
46        slice_logger: SliceLogger,
47        message_repository: MessageRepository,
48        partition_reader: PartitionReader,
49    ):
50        """
51        This class is responsible for handling items from a concurrent stream read process.
52        :param stream_instances_to_read_from: List of streams to read from
53        :param partition_enqueuer: PartitionEnqueuer instance
54        :param thread_pool_manager: ThreadPoolManager instance
55        :param logger: Logger instance
56        :param slice_logger: SliceLogger instance
57        :param message_repository: MessageRepository instance
58        :param partition_reader: PartitionReader instance
59        """
60        self._stream_name_to_instance = {s.name: s for s in stream_instances_to_read_from}
61        self._record_counter = {}
62        self._streams_to_running_partitions: Dict[str, Set[Partition]] = {}
63        for stream in stream_instances_to_read_from:
64            self._streams_to_running_partitions[stream.name] = set()
65            self._record_counter[stream.name] = 0
66        self._thread_pool_manager = thread_pool_manager
67        self._partition_enqueuer = partition_enqueuer
68        self._stream_instances_to_start_partition_generation = stream_instances_to_read_from
69        self._streams_currently_generating_partitions: List[str] = []
70        self._logger = logger
71        self._slice_logger = slice_logger
72        self._message_repository = message_repository
73        self._partition_reader = partition_reader
74        self._streams_done: Set[str] = set()
75        self._exceptions_per_stream_name: dict[str, List[Exception]] = {}
76
77        # Track which streams (by name) are currently active
78        # A stream is "active" if it's generating partitions or has partitions being read
79        self._active_stream_names: Set[str] = set()
80
81        # Store blocking group names for streams that require blocking simultaneous reads
82        # Maps stream name -> group name (empty string means no blocking)
83        self._stream_block_simultaneous_read: Dict[str, str] = {
84            stream.name: stream.block_simultaneous_read for stream in stream_instances_to_read_from
85        }
86
87        # Track which groups are currently active
88        # Maps group name -> set of stream names in that group
89        self._active_groups: Dict[str, Set[str]] = {}
90
91        for stream in stream_instances_to_read_from:
92            if stream.block_simultaneous_read:
93                self._logger.info(
94                    f"Stream '{stream.name}' is in blocking group '{stream.block_simultaneous_read}'. "
95                    f"Will defer starting this stream if another stream in the same group or its parents are active."
96                )

This class is responsible for handling items from a concurrent stream read process.

Parameters
  • stream_instances_to_read_from: List of streams to read from
  • partition_enqueuer: PartitionEnqueuer instance
  • thread_pool_manager: ThreadPoolManager instance
  • logger: Logger instance
  • slice_logger: SliceLogger instance
  • message_repository: MessageRepository instance
  • partition_reader: PartitionReader instance
 98    def on_partition_generation_completed(
 99        self, sentinel: PartitionGenerationCompletedSentinel
100    ) -> Iterable[AirbyteMessage]:
101        """
102        This method is called when a partition generation is completed.
103        1. Remove the stream from the list of streams currently generating partitions
104        2. Deactivate parent streams (they were only needed for partition generation)
105        3. If the stream is done, mark it as such and return a stream status message
106        4. If there are more streams to read from, start the next partition generator
107        """
108        stream_name = sentinel.stream.name
109        self._streams_currently_generating_partitions.remove(sentinel.stream.name)
110
111        # Deactivate all parent streams now that partition generation is complete
112        # Parents were only needed to generate slices, they can now be reused
113        parent_streams = self._collect_all_parent_stream_names(stream_name)
114        for parent_stream_name in parent_streams:
115            if parent_stream_name in self._active_stream_names:
116                self._logger.debug(f"Removing '{parent_stream_name}' from active streams")
117                self._active_stream_names.discard(parent_stream_name)
118
119                # Remove from active groups
120                parent_group = self._stream_block_simultaneous_read.get(parent_stream_name, "")
121                if parent_group:
122                    if parent_group in self._active_groups:
123                        self._active_groups[parent_group].discard(parent_stream_name)
124                        if not self._active_groups[parent_group]:
125                            del self._active_groups[parent_group]
126                    self._logger.info(
127                        f"Parent stream '{parent_stream_name}' (group '{parent_group}') deactivated after "
128                        f"partition generation completed for child '{stream_name}'. "
129                        f"Blocked streams in the queue will be retried on next start_next_partition_generator call."
130                    )
131
132        # It is possible for the stream to already be done if no partitions were generated
133        # If the partition generation process was completed and there are no partitions left to process, the stream is done
134        if (
135            self._is_stream_done(stream_name)
136            or len(self._streams_to_running_partitions[stream_name]) == 0
137        ):
138            yield from self._on_stream_is_done(stream_name)
139        if self._stream_instances_to_start_partition_generation:
140            status_message = self.start_next_partition_generator()
141            if status_message:
142                yield status_message

This method is called when a partition generation is completed.

  1. Remove the stream from the list of streams currently generating partitions
  2. Deactivate parent streams (they were only needed for partition generation)
  3. If the stream is done, mark it as such and return a stream status message
  4. If there are more streams to read from, start the next partition generator
def on_partition( self, partition: airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition) -> None:
144    def on_partition(self, partition: Partition) -> None:
145        """
146        This method is called when a partition is generated.
147        1. Add the partition to the set of partitions for the stream
148        2. Log the slice if necessary
149        3. Submit the partition to the thread pool manager
150        """
151        stream_name = partition.stream_name()
152        self._streams_to_running_partitions[stream_name].add(partition)
153        cursor = self._stream_name_to_instance[stream_name].cursor
154        if self._slice_logger.should_log_slice_message(self._logger):
155            self._message_repository.emit_message(
156                self._slice_logger.create_slice_log_message(partition.to_slice())
157            )
158        self._thread_pool_manager.submit(
159            self._partition_reader.process_partition, partition, cursor
160        )

This method is called when a partition is generated.

  1. Add the partition to the set of partitions for the stream
  2. Log the slice if necessary
  3. Submit the partition to the thread pool manager
def on_partition_complete_sentinel( self, sentinel: airbyte_cdk.sources.streams.concurrent.partitions.types.PartitionCompleteSentinel) -> Iterable[airbyte_cdk.AirbyteMessage]:
162    def on_partition_complete_sentinel(
163        self, sentinel: PartitionCompleteSentinel
164    ) -> Iterable[AirbyteMessage]:
165        """
166        This method is called when a partition is completed.
167        1. Close the partition
168        2. If the stream is done, mark it as such and return a stream status message
169        3. Emit messages that were added to the message repository
170        4. If there are more streams to read from, start the next partition generator
171        """
172        partition = sentinel.partition
173
174        partitions_running = self._streams_to_running_partitions[partition.stream_name()]
175        if partition in partitions_running:
176            partitions_running.remove(partition)
177            # If all partitions were generated and this was the last one, the stream is done
178            if (
179                partition.stream_name() not in self._streams_currently_generating_partitions
180                and len(partitions_running) == 0
181            ):
182                yield from self._on_stream_is_done(partition.stream_name())
183                # Try to start the next stream in the queue (may be a deferred stream)
184                if self._stream_instances_to_start_partition_generation:
185                    status_message = self.start_next_partition_generator()
186                    if status_message:
187                        yield status_message
188        yield from self._message_repository.consume_queue()

This method is called when a partition is completed.

  1. Close the partition
  2. If the stream is done, mark it as such and return a stream status message
  3. Emit messages that were added to the message repository
  4. If there are more streams to read from, start the next partition generator
def on_record( self, record: airbyte_cdk.Record) -> Iterable[airbyte_cdk.AirbyteMessage]:
190    def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
191        """
192        This method is called when a record is read from a partition.
193        1. Convert the record to an AirbyteMessage
194        2. If this is the first record for the stream, mark the stream as RUNNING
195        3. Increment the record counter for the stream
196        4. Ensures the cursor knows the record has been successfully emitted
197        5. Emit the message
198        6. Emit messages that were added to the message repository
199        """
200        # Do not pass a transformer or a schema
201        # AbstractStreams are expected to return data as they are expected.
202        # Any transformation on the data should be done before reaching this point
203        message = stream_data_to_airbyte_message(
204            stream_name=record.stream_name,
205            data_or_message=record.data,
206            file_reference=record.file_reference,
207        )
208        stream = self._stream_name_to_instance[record.stream_name]
209
210        if message.type == MessageType.RECORD:
211            if self._record_counter[stream.name] == 0:
212                self._logger.info(f"Marking stream {stream.name} as RUNNING")
213                yield stream_status_as_airbyte_message(
214                    stream.as_airbyte_stream(), AirbyteStreamStatus.RUNNING
215                )
216            self._record_counter[stream.name] += 1
217        yield message
218        yield from self._message_repository.consume_queue()

This method is called when a record is read from a partition.

  1. Convert the record to an AirbyteMessage
  2. If this is the first record for the stream, mark the stream as RUNNING
  3. Increment the record counter for the stream
  4. Ensure the cursor knows the record has been successfully emitted
  5. Emit the message
  6. Emit messages that were added to the message repository
220    def on_exception(self, exception: StreamThreadException) -> Iterable[AirbyteMessage]:
221        """
222        This method is called when an exception is raised.
223        1. Stop all running streams
224        2. Raise the exception
225        """
226        self._flag_exception(exception.stream_name, exception.exception)
227        self._logger.exception(
228            f"Exception while syncing stream {exception.stream_name}", exc_info=exception.exception
229        )
230
231        stream_descriptor = StreamDescriptor(name=exception.stream_name)
232        if isinstance(exception.exception, AirbyteTracedException):
233            yield exception.exception.as_airbyte_message(stream_descriptor=stream_descriptor)
234        else:
235            yield AirbyteTracedException.from_exception(
236                exception.exception,
237                stream_descriptor=stream_descriptor,
238                message=f"An unexpected error occurred in stream {exception.stream_name}: {type(exception.exception).__name__}",
239            ).as_airbyte_message()

This method is called when an exception is raised.

  1. Stop all running streams
  2. Raise the exception
def start_next_partition_generator(self) -> Optional[airbyte_cdk.AirbyteMessage]:
244    def start_next_partition_generator(self) -> Optional[AirbyteMessage]:
245        """
246        Submits the next partition generator to the thread pool.
247
248        A stream will be deferred (moved to end of queue) if:
249        1. The stream itself has block_simultaneous_read=True AND is already active
250        2. Any parent stream has block_simultaneous_read=True AND is currently active
251
252        This prevents simultaneous reads of streams that shouldn't be accessed concurrently.
253
254        :return: A status message if a partition generator was started, otherwise None
255        """
256        if not self._stream_instances_to_start_partition_generation:
257            return None
258
259        # Remember initial queue size to avoid infinite loops if all streams are blocked
260        max_attempts = len(self._stream_instances_to_start_partition_generation)
261        attempts = 0
262
263        while self._stream_instances_to_start_partition_generation and attempts < max_attempts:
264            attempts += 1
265
266            # Pop the first stream from the queue
267            stream = self._stream_instances_to_start_partition_generation.pop(0)
268            stream_name = stream.name
269            stream_group = self._stream_block_simultaneous_read.get(stream_name, "")
270
271            # Check if this stream has a blocking group and is already active as parent stream
272            # (i.e. being read from during partition generation for another stream)
273            if stream_group and stream_name in self._active_stream_names:
274                # Add back to the END of the queue for retry later
275                self._stream_instances_to_start_partition_generation.append(stream)
276                self._logger.info(
277                    f"Deferring stream '{stream_name}' (group '{stream_group}') because it's already active. Trying next stream."
278                )
279                continue  # Try the next stream in the queue
280
281            # Check if this stream's group is already active (another stream in the same group is running)
282            if (
283                stream_group
284                and stream_group in self._active_groups
285                and self._active_groups[stream_group]
286            ):
287                # Add back to the END of the queue for retry later
288                self._stream_instances_to_start_partition_generation.append(stream)
289                active_streams_in_group = self._active_groups[stream_group]
290                self._logger.info(
291                    f"Deferring stream '{stream_name}' (group '{stream_group}') because other stream(s) "
292                    f"{active_streams_in_group} in the same group are active. Trying next stream."
293                )
294                continue  # Try the next stream in the queue
295
296            # Check if any parent streams have a blocking group and are currently active
297            parent_streams = self._collect_all_parent_stream_names(stream_name)
298            blocked_by_parents = [
299                p
300                for p in parent_streams
301                if self._stream_block_simultaneous_read.get(p, "")
302                and p in self._active_stream_names
303            ]
304
305            if blocked_by_parents:
306                # Add back to the END of the queue for retry later
307                self._stream_instances_to_start_partition_generation.append(stream)
308                parent_groups = {
309                    self._stream_block_simultaneous_read.get(p, "") for p in blocked_by_parents
310                }
311                self._logger.info(
312                    f"Deferring stream '{stream_name}' because parent stream(s) "
313                    f"{blocked_by_parents} (groups {parent_groups}) are active. Trying next stream."
314                )
315                continue  # Try the next stream in the queue
316
317            # No blocking - start this stream
318            # Mark stream as active before starting
319            self._active_stream_names.add(stream_name)
320            self._streams_currently_generating_partitions.append(stream_name)
321
322            # Track this stream in its group if it has one
323            if stream_group:
324                if stream_group not in self._active_groups:
325                    self._active_groups[stream_group] = set()
326                self._active_groups[stream_group].add(stream_name)
327                self._logger.debug(f"Added '{stream_name}' to active group '{stream_group}'")
328
329            # Also mark all parent streams as active (they will be read from during partition generation)
330            for parent_stream_name in parent_streams:
331                parent_group = self._stream_block_simultaneous_read.get(parent_stream_name, "")
332                if parent_group:
333                    self._active_stream_names.add(parent_stream_name)
334                    if parent_group not in self._active_groups:
335                        self._active_groups[parent_group] = set()
336                    self._active_groups[parent_group].add(parent_stream_name)
337                    self._logger.info(
338                        f"Marking parent stream '{parent_stream_name}' (group '{parent_group}') as active "
339                        f"(will be read during partition generation for '{stream_name}')"
340                    )
341
342            self._thread_pool_manager.submit(self._partition_enqueuer.generate_partitions, stream)
343            self._logger.info(f"Marking stream {stream_name} as STARTED")
344            self._logger.info(f"Syncing stream: {stream_name}")
345            return stream_status_as_airbyte_message(
346                stream.as_airbyte_stream(),
347                AirbyteStreamStatus.STARTED,
348            )
349
350        # All streams in the queue are currently blocked
351        return None

Submits the next partition generator to the thread pool.

A stream will be deferred (moved to end of queue) if:

  1. The stream itself has block_simultaneous_read=True AND is already active
  2. Any parent stream has block_simultaneous_read=True AND is currently active

This prevents simultaneous reads of streams that shouldn't be accessed concurrently.

Returns

A status message if a partition generator was started, otherwise None

def is_done(self) -> bool:
353    def is_done(self) -> bool:
354        """
355        This method is called to check if the sync is done.
356        The sync is done when:
357        1. There are no more streams generating partitions
358        2. There are no more streams to read from
359        3. All partitions for all streams are closed
360        """
361        is_done = all(
362            [
363                self._is_stream_done(stream_name)
364                for stream_name in self._stream_name_to_instance.keys()
365            ]
366        )
367        if is_done and self._stream_instances_to_start_partition_generation:
368            stuck_stream_names = [
369                s.name for s in self._stream_instances_to_start_partition_generation
370            ]
371            raise AirbyteTracedException(
372                message="Partition generation queue is not empty after all streams completed.",
373                internal_message=f"Streams {stuck_stream_names} remained in the partition generation queue after all streams were marked done.",
374                failure_type=FailureType.system_error,
375            )
376        if is_done and self._active_groups:
377            raise AirbyteTracedException(
378                message="Active stream groups are not empty after all streams completed.",
379                internal_message=f"Groups {dict(self._active_groups)} still active after all streams were marked done.",
380                failure_type=FailureType.system_error,
381            )
382        if is_done and self._exceptions_per_stream_name:
383            error_message = generate_failed_streams_error_message(self._exceptions_per_stream_name)
384            self._logger.info(error_message)
385            # We still raise at least one exception when a stream raises an exception because the platform currently relies
386            # on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error
387            # type because this combined error isn't actionable, but rather the previously emitted individual errors.
388            raise AirbyteTracedException(
389                message=error_message,
390                internal_message="Concurrent read failure",
391                failure_type=FailureType.config_error,
392            )
393        return is_done

This method is called to check if the sync is done. The sync is done when:

  1. There are no more streams generating partitions
  2. There are no more streams to read from
  3. All partitions for all streams are closed