#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import logging
import os
from typing import Dict, Iterable, List, Optional, Set

from airbyte_cdk.exception_handler import generate_failed_streams_error_message
from airbyte_cdk.models import AirbyteMessage, AirbyteStreamStatus, FailureType, StreamDescriptor
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel import (
    PartitionGenerationCompletedSentinel,
)
from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException
from airbyte_cdk.sources.concurrent_source.thread_pool_manager import ThreadPoolManager
from airbyte_cdk.sources.declarative.partition_routers.grouping_partition_router import (
    GroupingPartitionRouter,
)
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import (
    SubstreamPartitionRouter,
)
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
from airbyte_cdk.sources.streams.concurrent.partition_enqueuer import PartitionEnqueuer
from airbyte_cdk.sources.streams.concurrent.partition_reader import PartitionReader
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.types import PartitionCompleteSentinel
from airbyte_cdk.sources.types import Record
from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message
from airbyte_cdk.sources.utils.slice_logger import SliceLogger
from airbyte_cdk.utils import AirbyteTracedException
from airbyte_cdk.utils.stream_status_utils import (
    as_airbyte_message as stream_status_as_airbyte_message,
)


class ConcurrentReadProcessor:
    def __init__(
        self,
        stream_instances_to_read_from: List[AbstractStream],
        partition_enqueuer: PartitionEnqueuer,
        thread_pool_manager: ThreadPoolManager,
        logger: logging.Logger,
        slice_logger: SliceLogger,
        message_repository: MessageRepository,
        partition_reader: PartitionReader,
    ):
        """
        This class is responsible for handling items from a concurrent stream read process.
        :param stream_instances_to_read_from: List of streams to read from
        :param partition_enqueuer: PartitionEnqueuer instance
        :param thread_pool_manager: ThreadPoolManager instance
        :param logger: Logger instance
        :param slice_logger: SliceLogger instance
        :param message_repository: MessageRepository instance
        :param partition_reader: PartitionReader instance
        """
        self._stream_name_to_instance = {s.name: s for s in stream_instances_to_read_from}
        self._record_counter: Dict[str, int] = {}
        self._streams_to_running_partitions: Dict[str, Set[Partition]] = {}
        for stream in stream_instances_to_read_from:
            self._streams_to_running_partitions[stream.name] = set()
            self._record_counter[stream.name] = 0
        self._thread_pool_manager = thread_pool_manager
        self._partition_enqueuer = partition_enqueuer
        self._stream_instances_to_start_partition_generation = stream_instances_to_read_from
        self._streams_currently_generating_partitions: List[str] = []
        self._logger = logger
        self._slice_logger = slice_logger
        self._message_repository = message_repository
        self._partition_reader = partition_reader
        self._streams_done: Set[str] = set()
        self._exceptions_per_stream_name: Dict[str, List[Exception]] = {}

        # Track which streams (by name) are currently active
        # A stream is "active" if it's generating partitions or has partitions being read
        self._active_stream_names: Set[str] = set()

        # Store blocking group names for streams that require blocking simultaneous reads
        # Maps stream name -> group name (empty string means no blocking)
        self._stream_block_simultaneous_read: Dict[str, str] = {
            stream.name: stream.block_simultaneous_read for stream in stream_instances_to_read_from
        }

        # Track which groups are currently active
        # Maps group name -> set of stream names in that group
        self._active_groups: Dict[str, Set[str]] = {}

        for stream in stream_instances_to_read_from:
            if stream.block_simultaneous_read:
                self._logger.info(
                    f"Stream '{stream.name}' is in blocking group '{stream.block_simultaneous_read}'. "
                    f"Will defer starting this stream if another stream in the same group or its parents are active."
                )

    def on_partition_generation_completed(
        self, sentinel: PartitionGenerationCompletedSentinel
    ) -> Iterable[AirbyteMessage]:
        """
        This method is called when a partition generation is completed.
        1. Remove the stream from the list of streams currently generating partitions
        2. Deactivate parent streams (they were only needed for partition generation)
        3. If the stream is done, mark it as such and return a stream status message
        4. If there are more streams to read from, start the next partition generator
        """
        stream_name = sentinel.stream.name
        self._streams_currently_generating_partitions.remove(sentinel.stream.name)

        # Deactivate all parent streams now that partition generation is complete
        # Parents were only needed to generate slices, they can now be reused
        parent_streams = self._collect_all_parent_stream_names(stream_name)
        for parent_stream_name in parent_streams:
            if parent_stream_name in self._active_stream_names:
                self._logger.debug(f"Removing '{parent_stream_name}' from active streams")
                self._active_stream_names.discard(parent_stream_name)

                # Remove from active groups
                parent_group = self._stream_block_simultaneous_read.get(parent_stream_name, "")
                if parent_group:
                    if parent_group in self._active_groups:
                        self._active_groups[parent_group].discard(parent_stream_name)
                        if not self._active_groups[parent_group]:
                            del self._active_groups[parent_group]
                    self._logger.info(
                        f"Parent stream '{parent_stream_name}' (group '{parent_group}') deactivated after "
                        f"partition generation completed for child '{stream_name}'. "
                        f"Blocked streams in the queue will be retried on next start_next_partition_generator call."
                    )

        # It is possible for the stream to already be done if no partitions were generated
        # If the partition generation process was completed and there are no partitions left to process, the stream is done
        if (
            self._is_stream_done(stream_name)
            or len(self._streams_to_running_partitions[stream_name]) == 0
        ):
            yield from self._on_stream_is_done(stream_name)
        if self._stream_instances_to_start_partition_generation:
            status_message = self.start_next_partition_generator()
            if status_message:
                yield status_message

    def on_partition(self, partition: Partition) -> None:
        """
        This method is called when a partition is generated.
        1. Add the partition to the set of partitions for the stream
        2. Log the slice if necessary
        3. Submit the partition to the thread pool manager
        """
        stream_name = partition.stream_name()
        self._streams_to_running_partitions[stream_name].add(partition)
        cursor = self._stream_name_to_instance[stream_name].cursor
        if self._slice_logger.should_log_slice_message(self._logger):
            self._message_repository.emit_message(
                self._slice_logger.create_slice_log_message(partition.to_slice())
            )
        self._thread_pool_manager.submit(
            self._partition_reader.process_partition, partition, cursor
        )

    def on_partition_complete_sentinel(
        self, sentinel: PartitionCompleteSentinel
    ) -> Iterable[AirbyteMessage]:
        """
        This method is called when a partition is completed.
        1. Close the partition
        2. If the stream is done, mark it as such and return a stream status message
        3. Emit messages that were added to the message repository
        4. If there are more streams to read from, start the next partition generator
        """
        partition = sentinel.partition

        partitions_running = self._streams_to_running_partitions[partition.stream_name()]
        if partition in partitions_running:
            partitions_running.remove(partition)
            # If all partitions were generated and this was the last one, the stream is done
            if (
                partition.stream_name() not in self._streams_currently_generating_partitions
                and len(partitions_running) == 0
            ):
                yield from self._on_stream_is_done(partition.stream_name())
                # Try to start the next stream in the queue (may be a deferred stream)
                if self._stream_instances_to_start_partition_generation:
                    status_message = self.start_next_partition_generator()
                    if status_message:
                        yield status_message
        yield from self._message_repository.consume_queue()

    def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
        """
        This method is called when a record is read from a partition.
        1. Convert the record to an AirbyteMessage
        2. If this is the first record for the stream, mark the stream as RUNNING
        3. Increment the record counter for the stream
        4. Ensures the cursor knows the record has been successfully emitted
        5. Emit the message
        6. Emit messages that were added to the message repository
        """
        # Do not pass a transformer or a schema
        # AbstractStreams are expected to return data as they are expected.
        # Any transformation on the data should be done before reaching this point
        message = stream_data_to_airbyte_message(
            stream_name=record.stream_name,
            data_or_message=record.data,
            file_reference=record.file_reference,
        )
        stream = self._stream_name_to_instance[record.stream_name]

        if message.type == MessageType.RECORD:
            if self._record_counter[stream.name] == 0:
                self._logger.info(f"Marking stream {stream.name} as RUNNING")
                yield stream_status_as_airbyte_message(
                    stream.as_airbyte_stream(), AirbyteStreamStatus.RUNNING
                )
            self._record_counter[stream.name] += 1
        yield message
        yield from self._message_repository.consume_queue()

    def on_exception(self, exception: StreamThreadException) -> Iterable[AirbyteMessage]:
        """
        This method is called when an exception is raised.
        1. Stop all running streams
        2. Raise the exception
        """
        self._flag_exception(exception.stream_name, exception.exception)
        self._logger.exception(
            f"Exception while syncing stream {exception.stream_name}", exc_info=exception.exception
        )

        stream_descriptor = StreamDescriptor(name=exception.stream_name)
        if isinstance(exception.exception, AirbyteTracedException):
            yield exception.exception.as_airbyte_message(stream_descriptor=stream_descriptor)
        else:
            yield AirbyteTracedException.from_exception(
                exception.exception,
                stream_descriptor=stream_descriptor,
                message=f"An unexpected error occurred in stream {exception.stream_name}: {type(exception.exception).__name__}",
            ).as_airbyte_message()

    def _flag_exception(self, stream_name: str, exception: Exception) -> None:
        # Record the exception so is_done() can surface a combined failure at the end of the sync
        self._exceptions_per_stream_name.setdefault(stream_name, []).append(exception)

    def start_next_partition_generator(self) -> Optional[AirbyteMessage]:
        """
        Submits the next partition generator to the thread pool.

        A stream will be deferred (moved to end of queue) if:
        1. The stream itself has block_simultaneous_read=True AND is already active
        2. Any parent stream has block_simultaneous_read=True AND is currently active

        This prevents simultaneous reads of streams that shouldn't be accessed concurrently.

        :return: A status message if a partition generator was started, otherwise None
        """
        if not self._stream_instances_to_start_partition_generation:
            return None

        # Remember initial queue size to avoid infinite loops if all streams are blocked
        max_attempts = len(self._stream_instances_to_start_partition_generation)
        attempts = 0

        while self._stream_instances_to_start_partition_generation and attempts < max_attempts:
            attempts += 1

            # Pop the first stream from the queue
            stream = self._stream_instances_to_start_partition_generation.pop(0)
            stream_name = stream.name
            stream_group = self._stream_block_simultaneous_read.get(stream_name, "")

            # Check if this stream has a blocking group and is already active as parent stream
            # (i.e. being read from during partition generation for another stream)
            if stream_group and stream_name in self._active_stream_names:
                # Add back to the END of the queue for retry later
                self._stream_instances_to_start_partition_generation.append(stream)
                self._logger.info(
                    f"Deferring stream '{stream_name}' (group '{stream_group}') because it's already active. Trying next stream."
                )
                continue  # Try the next stream in the queue

            # Check if this stream's group is already active (another stream in the same group is running)
            if (
                stream_group
                and stream_group in self._active_groups
                and self._active_groups[stream_group]
            ):
                # Add back to the END of the queue for retry later
                self._stream_instances_to_start_partition_generation.append(stream)
                active_streams_in_group = self._active_groups[stream_group]
                self._logger.info(
                    f"Deferring stream '{stream_name}' (group '{stream_group}') because other stream(s) "
                    f"{active_streams_in_group} in the same group are active. Trying next stream."
                )
                continue  # Try the next stream in the queue

            # Check if any parent streams have a blocking group and are currently active
            parent_streams = self._collect_all_parent_stream_names(stream_name)
            blocked_by_parents = [
                p
                for p in parent_streams
                if self._stream_block_simultaneous_read.get(p, "")
                and p in self._active_stream_names
            ]

            if blocked_by_parents:
                # Add back to the END of the queue for retry later
                self._stream_instances_to_start_partition_generation.append(stream)
                parent_groups = {
                    self._stream_block_simultaneous_read.get(p, "") for p in blocked_by_parents
                }
                self._logger.info(
                    f"Deferring stream '{stream_name}' because parent stream(s) "
                    f"{blocked_by_parents} (groups {parent_groups}) are active. Trying next stream."
                )
                continue  # Try the next stream in the queue

            # No blocking - start this stream
            # Mark stream as active before starting
            self._active_stream_names.add(stream_name)
            self._streams_currently_generating_partitions.append(stream_name)

            # Track this stream in its group if it has one
            if stream_group:
                if stream_group not in self._active_groups:
                    self._active_groups[stream_group] = set()
                self._active_groups[stream_group].add(stream_name)
                self._logger.debug(f"Added '{stream_name}' to active group '{stream_group}'")

            # Also mark all parent streams as active (they will be read from during partition generation)
            for parent_stream_name in parent_streams:
                parent_group = self._stream_block_simultaneous_read.get(parent_stream_name, "")
                if parent_group:
                    self._active_stream_names.add(parent_stream_name)
                    if parent_group not in self._active_groups:
                        self._active_groups[parent_group] = set()
                    self._active_groups[parent_group].add(parent_stream_name)
                    self._logger.info(
                        f"Marking parent stream '{parent_stream_name}' (group '{parent_group}') as active "
                        f"(will be read during partition generation for '{stream_name}')"
                    )

            self._thread_pool_manager.submit(self._partition_enqueuer.generate_partitions, stream)
            self._logger.info(f"Marking stream {stream_name} as STARTED")
            self._logger.info(f"Syncing stream: {stream_name}")
            return stream_status_as_airbyte_message(
                stream.as_airbyte_stream(),
                AirbyteStreamStatus.STARTED,
            )

        # All streams in the queue are currently blocked
        return None

    def is_done(self) -> bool:
        """
        This method is called to check if the sync is done.
        The sync is done when:
        1. There are no more streams generating partitions
        2. There are no more streams to read from
        3. All partitions for all streams are closed

        :raises AirbyteTracedException: if the partition generation queue or the active
            group bookkeeping is non-empty after all streams completed (invariant
            violation), or if any stream recorded an exception during the sync.
        """
        is_done = all(
            self._is_stream_done(stream_name)
            for stream_name in self._stream_name_to_instance
        )
        if is_done and self._stream_instances_to_start_partition_generation:
            stuck_stream_names = [
                s.name for s in self._stream_instances_to_start_partition_generation
            ]
            raise AirbyteTracedException(
                message="Partition generation queue is not empty after all streams completed.",
                internal_message=f"Streams {stuck_stream_names} remained in the partition generation queue after all streams were marked done.",
                failure_type=FailureType.system_error,
            )
        if is_done and self._active_groups:
            raise AirbyteTracedException(
                message="Active stream groups are not empty after all streams completed.",
                internal_message=f"Groups {dict(self._active_groups)} still active after all streams were marked done.",
                failure_type=FailureType.system_error,
            )
        if is_done and self._exceptions_per_stream_name:
            error_message = generate_failed_streams_error_message(self._exceptions_per_stream_name)
            self._logger.info(error_message)
            # We still raise at least one exception when a stream raises an exception because the platform currently relies
            # on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error
            # type because this combined error isn't actionable, but rather the previously emitted individual errors.
            raise AirbyteTracedException(
                message=error_message,
                internal_message="Concurrent read failure",
                failure_type=FailureType.config_error,
            )
        return is_done

    def _is_stream_done(self, stream_name: str) -> bool:
        # A stream is done once _on_stream_is_done has recorded it
        return stream_name in self._streams_done

    def _collect_all_parent_stream_names(self, stream_name: str) -> Set[str]:
        """Recursively collect all parent stream names for a given stream.

        For example, if we have: epics -> issues -> comments
        Then for comments, this returns {issues, epics}.
        """
        parent_names: Set[str] = set()
        stream = self._stream_name_to_instance.get(stream_name)

        if not stream:
            return parent_names

        partition_router = (
            stream.get_partition_router() if isinstance(stream, DefaultStream) else None
        )
        if isinstance(partition_router, GroupingPartitionRouter):
            partition_router = partition_router.underlying_partition_router

        if isinstance(partition_router, SubstreamPartitionRouter):
            for parent_config in partition_router.parent_stream_configs:
                parent_name = parent_config.stream.name
                parent_names.add(parent_name)
                parent_names.update(self._collect_all_parent_stream_names(parent_name))

        return parent_names

    def _on_stream_is_done(self, stream_name: str) -> Iterable[AirbyteMessage]:
        """Emit the final status for a finished stream and release its blocking-group slot."""
        self._logger.info(
            f"Read {self._record_counter[stream_name]} records from {stream_name} stream"
        )
        self._logger.info(f"Marking stream {stream_name} as STOPPED")
        stream = self._stream_name_to_instance[stream_name]
        stream.cursor.ensure_at_least_one_state_emitted()
        yield from self._message_repository.consume_queue()
        self._logger.info(f"Finished syncing {stream.name}")
        self._streams_done.add(stream_name)
        stream_status = (
            AirbyteStreamStatus.INCOMPLETE
            if self._exceptions_per_stream_name.get(stream_name, [])
            else AirbyteStreamStatus.COMPLETE
        )
        yield stream_status_as_airbyte_message(stream.as_airbyte_stream(), stream_status)

        # Remove only this stream from active set (NOT parents)
        if stream_name in self._active_stream_names:
            self._active_stream_names.discard(stream_name)

            # Remove from active groups
            stream_group = self._stream_block_simultaneous_read.get(stream_name, "")
            if stream_group:
                if stream_group in self._active_groups:
                    self._active_groups[stream_group].discard(stream_name)
                    if not self._active_groups[stream_group]:
                        del self._active_groups[stream_group]
                self._logger.info(
                    f"Stream '{stream_name}' (group '{stream_group}') is no longer active. "
                    f"Blocked streams in the queue will be retried on next start_next_partition_generator call."
                )
class
ConcurrentReadProcessor:
39class ConcurrentReadProcessor: 40 def __init__( 41 self, 42 stream_instances_to_read_from: List[AbstractStream], 43 partition_enqueuer: PartitionEnqueuer, 44 thread_pool_manager: ThreadPoolManager, 45 logger: logging.Logger, 46 slice_logger: SliceLogger, 47 message_repository: MessageRepository, 48 partition_reader: PartitionReader, 49 ): 50 """ 51 This class is responsible for handling items from a concurrent stream read process. 52 :param stream_instances_to_read_from: List of streams to read from 53 :param partition_enqueuer: PartitionEnqueuer instance 54 :param thread_pool_manager: ThreadPoolManager instance 55 :param logger: Logger instance 56 :param slice_logger: SliceLogger instance 57 :param message_repository: MessageRepository instance 58 :param partition_reader: PartitionReader instance 59 """ 60 self._stream_name_to_instance = {s.name: s for s in stream_instances_to_read_from} 61 self._record_counter = {} 62 self._streams_to_running_partitions: Dict[str, Set[Partition]] = {} 63 for stream in stream_instances_to_read_from: 64 self._streams_to_running_partitions[stream.name] = set() 65 self._record_counter[stream.name] = 0 66 self._thread_pool_manager = thread_pool_manager 67 self._partition_enqueuer = partition_enqueuer 68 self._stream_instances_to_start_partition_generation = stream_instances_to_read_from 69 self._streams_currently_generating_partitions: List[str] = [] 70 self._logger = logger 71 self._slice_logger = slice_logger 72 self._message_repository = message_repository 73 self._partition_reader = partition_reader 74 self._streams_done: Set[str] = set() 75 self._exceptions_per_stream_name: dict[str, List[Exception]] = {} 76 77 # Track which streams (by name) are currently active 78 # A stream is "active" if it's generating partitions or has partitions being read 79 self._active_stream_names: Set[str] = set() 80 81 # Store blocking group names for streams that require blocking simultaneous reads 82 # Maps stream name -> group name (empty string 
means no blocking) 83 self._stream_block_simultaneous_read: Dict[str, str] = { 84 stream.name: stream.block_simultaneous_read for stream in stream_instances_to_read_from 85 } 86 87 # Track which groups are currently active 88 # Maps group name -> set of stream names in that group 89 self._active_groups: Dict[str, Set[str]] = {} 90 91 for stream in stream_instances_to_read_from: 92 if stream.block_simultaneous_read: 93 self._logger.info( 94 f"Stream '{stream.name}' is in blocking group '{stream.block_simultaneous_read}'. " 95 f"Will defer starting this stream if another stream in the same group or its parents are active." 96 ) 97 98 def on_partition_generation_completed( 99 self, sentinel: PartitionGenerationCompletedSentinel 100 ) -> Iterable[AirbyteMessage]: 101 """ 102 This method is called when a partition generation is completed. 103 1. Remove the stream from the list of streams currently generating partitions 104 2. Deactivate parent streams (they were only needed for partition generation) 105 3. If the stream is done, mark it as such and return a stream status message 106 4. 
If there are more streams to read from, start the next partition generator 107 """ 108 stream_name = sentinel.stream.name 109 self._streams_currently_generating_partitions.remove(sentinel.stream.name) 110 111 # Deactivate all parent streams now that partition generation is complete 112 # Parents were only needed to generate slices, they can now be reused 113 parent_streams = self._collect_all_parent_stream_names(stream_name) 114 for parent_stream_name in parent_streams: 115 if parent_stream_name in self._active_stream_names: 116 self._logger.debug(f"Removing '{parent_stream_name}' from active streams") 117 self._active_stream_names.discard(parent_stream_name) 118 119 # Remove from active groups 120 parent_group = self._stream_block_simultaneous_read.get(parent_stream_name, "") 121 if parent_group: 122 if parent_group in self._active_groups: 123 self._active_groups[parent_group].discard(parent_stream_name) 124 if not self._active_groups[parent_group]: 125 del self._active_groups[parent_group] 126 self._logger.info( 127 f"Parent stream '{parent_stream_name}' (group '{parent_group}') deactivated after " 128 f"partition generation completed for child '{stream_name}'. " 129 f"Blocked streams in the queue will be retried on next start_next_partition_generator call." 130 ) 131 132 # It is possible for the stream to already be done if no partitions were generated 133 # If the partition generation process was completed and there are no partitions left to process, the stream is done 134 if ( 135 self._is_stream_done(stream_name) 136 or len(self._streams_to_running_partitions[stream_name]) == 0 137 ): 138 yield from self._on_stream_is_done(stream_name) 139 if self._stream_instances_to_start_partition_generation: 140 status_message = self.start_next_partition_generator() 141 if status_message: 142 yield status_message 143 144 def on_partition(self, partition: Partition) -> None: 145 """ 146 This method is called when a partition is generated. 147 1. 
Add the partition to the set of partitions for the stream 148 2. Log the slice if necessary 149 3. Submit the partition to the thread pool manager 150 """ 151 stream_name = partition.stream_name() 152 self._streams_to_running_partitions[stream_name].add(partition) 153 cursor = self._stream_name_to_instance[stream_name].cursor 154 if self._slice_logger.should_log_slice_message(self._logger): 155 self._message_repository.emit_message( 156 self._slice_logger.create_slice_log_message(partition.to_slice()) 157 ) 158 self._thread_pool_manager.submit( 159 self._partition_reader.process_partition, partition, cursor 160 ) 161 162 def on_partition_complete_sentinel( 163 self, sentinel: PartitionCompleteSentinel 164 ) -> Iterable[AirbyteMessage]: 165 """ 166 This method is called when a partition is completed. 167 1. Close the partition 168 2. If the stream is done, mark it as such and return a stream status message 169 3. Emit messages that were added to the message repository 170 4. If there are more streams to read from, start the next partition generator 171 """ 172 partition = sentinel.partition 173 174 partitions_running = self._streams_to_running_partitions[partition.stream_name()] 175 if partition in partitions_running: 176 partitions_running.remove(partition) 177 # If all partitions were generated and this was the last one, the stream is done 178 if ( 179 partition.stream_name() not in self._streams_currently_generating_partitions 180 and len(partitions_running) == 0 181 ): 182 yield from self._on_stream_is_done(partition.stream_name()) 183 # Try to start the next stream in the queue (may be a deferred stream) 184 if self._stream_instances_to_start_partition_generation: 185 status_message = self.start_next_partition_generator() 186 if status_message: 187 yield status_message 188 yield from self._message_repository.consume_queue() 189 190 def on_record(self, record: Record) -> Iterable[AirbyteMessage]: 191 """ 192 This method is called when a record is read from a 
partition. 193 1. Convert the record to an AirbyteMessage 194 2. If this is the first record for the stream, mark the stream as RUNNING 195 3. Increment the record counter for the stream 196 4. Ensures the cursor knows the record has been successfully emitted 197 5. Emit the message 198 6. Emit messages that were added to the message repository 199 """ 200 # Do not pass a transformer or a schema 201 # AbstractStreams are expected to return data as they are expected. 202 # Any transformation on the data should be done before reaching this point 203 message = stream_data_to_airbyte_message( 204 stream_name=record.stream_name, 205 data_or_message=record.data, 206 file_reference=record.file_reference, 207 ) 208 stream = self._stream_name_to_instance[record.stream_name] 209 210 if message.type == MessageType.RECORD: 211 if self._record_counter[stream.name] == 0: 212 self._logger.info(f"Marking stream {stream.name} as RUNNING") 213 yield stream_status_as_airbyte_message( 214 stream.as_airbyte_stream(), AirbyteStreamStatus.RUNNING 215 ) 216 self._record_counter[stream.name] += 1 217 yield message 218 yield from self._message_repository.consume_queue() 219 220 def on_exception(self, exception: StreamThreadException) -> Iterable[AirbyteMessage]: 221 """ 222 This method is called when an exception is raised. 223 1. Stop all running streams 224 2. 
    def _flag_exception(self, stream_name: str, exception: Exception) -> None:
        """Record *exception* against *stream_name* so is_done() can report per-stream failures."""
        self._exceptions_per_stream_name.setdefault(stream_name, []).append(exception)

    def start_next_partition_generator(self) -> Optional[AirbyteMessage]:
        """
        Submits the next partition generator to the thread pool.

        A stream will be deferred (moved to end of queue) if:
        1. The stream itself has block_simultaneous_read=True AND is already active
        2. Any parent stream has block_simultaneous_read=True AND is currently active

        This prevents simultaneous reads of streams that shouldn't be accessed concurrently.

        :return: A status message if a partition generator was started, otherwise None
        """
        if not self._stream_instances_to_start_partition_generation:
            return None

        # Remember initial queue size to avoid infinite loops if all streams are blocked:
        # each stream is inspected at most once per call.
        max_attempts = len(self._stream_instances_to_start_partition_generation)
        attempts = 0

        while self._stream_instances_to_start_partition_generation and attempts < max_attempts:
            attempts += 1

            # Pop the first stream from the queue (FIFO with deferral to the tail)
            stream = self._stream_instances_to_start_partition_generation.pop(0)
            stream_name = stream.name
            # Empty string means the stream has no blocking group.
            stream_group = self._stream_block_simultaneous_read.get(stream_name, "")

            # Check if this stream has a blocking group and is already active as parent stream
            # (i.e. being read from during partition generation for another stream)
            if stream_group and stream_name in self._active_stream_names:
                # Add back to the END of the queue for retry later
                self._stream_instances_to_start_partition_generation.append(stream)
                self._logger.info(
                    f"Deferring stream '{stream_name}' (group '{stream_group}') because it's already active. Trying next stream."
                )
                continue  # Try the next stream in the queue

            # Check if this stream's group is already active (another stream in the same group is running)
            if (
                stream_group
                and stream_group in self._active_groups
                and self._active_groups[stream_group]
            ):
                # Add back to the END of the queue for retry later
                self._stream_instances_to_start_partition_generation.append(stream)
                active_streams_in_group = self._active_groups[stream_group]
                self._logger.info(
                    f"Deferring stream '{stream_name}' (group '{stream_group}') because other stream(s) "
                    f"{active_streams_in_group} in the same group are active. Trying next stream."
                )
                continue  # Try the next stream in the queue

            # Check if any parent streams have a blocking group and are currently active
            parent_streams = self._collect_all_parent_stream_names(stream_name)
            blocked_by_parents = [
                p
                for p in parent_streams
                if self._stream_block_simultaneous_read.get(p, "")
                and p in self._active_stream_names
            ]

            if blocked_by_parents:
                # Add back to the END of the queue for retry later
                self._stream_instances_to_start_partition_generation.append(stream)
                parent_groups = {
                    self._stream_block_simultaneous_read.get(p, "") for p in blocked_by_parents
                }
                self._logger.info(
                    f"Deferring stream '{stream_name}' because parent stream(s) "
                    f"{blocked_by_parents} (groups {parent_groups}) are active. Trying next stream."
                )
                continue  # Try the next stream in the queue

            # No blocking - start this stream
            # Mark stream as active before starting
            self._active_stream_names.add(stream_name)
            self._streams_currently_generating_partitions.append(stream_name)

            # Track this stream in its group if it has one
            if stream_group:
                if stream_group not in self._active_groups:
                    self._active_groups[stream_group] = set()
                self._active_groups[stream_group].add(stream_name)
                self._logger.debug(f"Added '{stream_name}' to active group '{stream_group}'")

            # Also mark all parent streams as active (they will be read from during partition generation)
            for parent_stream_name in parent_streams:
                parent_group = self._stream_block_simultaneous_read.get(parent_stream_name, "")
                if parent_group:
                    self._active_stream_names.add(parent_stream_name)
                    if parent_group not in self._active_groups:
                        self._active_groups[parent_group] = set()
                    self._active_groups[parent_group].add(parent_stream_name)
                    self._logger.info(
                        f"Marking parent stream '{parent_stream_name}' (group '{parent_group}') as active "
                        f"(will be read during partition generation for '{stream_name}')"
                    )

            self._thread_pool_manager.submit(self._partition_enqueuer.generate_partitions, stream)
            self._logger.info(f"Marking stream {stream_name} as STARTED")
            self._logger.info(f"Syncing stream: {stream_name}")
            return stream_status_as_airbyte_message(
                stream.as_airbyte_stream(),
                AirbyteStreamStatus.STARTED,
            )

        # All streams in the queue are currently blocked
        return None

    def is_done(self) -> bool:
        """
        This method is called to check if the sync is done.
        The sync is done when:
        1. There are no more streams generating partitions
        2. There are no more streams to read from
        3. All partitions for all streams are closed

        :raises AirbyteTracedException: if the partition generation queue or the
            active-group bookkeeping is non-empty after all streams completed, or
            if any stream recorded an exception during the sync.
        """
        is_done = all(
            [
                self._is_stream_done(stream_name)
                for stream_name in self._stream_name_to_instance.keys()
            ]
        )
        if is_done and self._stream_instances_to_start_partition_generation:
            # Sanity check: the deferral logic should have drained the queue by now.
            stuck_stream_names = [
                s.name for s in self._stream_instances_to_start_partition_generation
            ]
            raise AirbyteTracedException(
                message="Partition generation queue is not empty after all streams completed.",
                internal_message=f"Streams {stuck_stream_names} remained in the partition generation queue after all streams were marked done.",
                failure_type=FailureType.system_error,
            )
        if is_done and self._active_groups:
            # Sanity check: leftover active groups indicate a bookkeeping bug.
            raise AirbyteTracedException(
                message="Active stream groups are not empty after all streams completed.",
                internal_message=f"Groups {dict(self._active_groups)} still active after all streams were marked done.",
                failure_type=FailureType.system_error,
            )
        if is_done and self._exceptions_per_stream_name:
            error_message = generate_failed_streams_error_message(self._exceptions_per_stream_name)
            self._logger.info(error_message)
            # We still raise at least one exception when a stream raises an exception because the platform currently relies
            # on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error
            # type because this combined error isn't actionable, but rather the previously emitted individual errors.
            raise AirbyteTracedException(
                message=error_message,
                internal_message="Concurrent read failure",
                failure_type=FailureType.config_error,
            )
        return is_done

    def _is_stream_done(self, stream_name: str) -> bool:
        """Return True if *stream_name* has already been marked as fully processed."""
        return stream_name in self._streams_done

    def _collect_all_parent_stream_names(self, stream_name: str) -> Set[str]:
        """Recursively collect all parent stream names for a given stream.

        For example, if we have: epics -> issues -> comments
        Then for comments, this returns {issues, epics}.
        """
        parent_names: Set[str] = set()
        stream = self._stream_name_to_instance.get(stream_name)

        if not stream:
            return parent_names

        # Only DefaultStream exposes a partition router; other stream types report no parents here.
        partition_router = (
            stream.get_partition_router() if isinstance(stream, DefaultStream) else None
        )
        # A GroupingPartitionRouter wraps another router; unwrap it before inspection.
        if isinstance(partition_router, GroupingPartitionRouter):
            partition_router = partition_router.underlying_partition_router

        if isinstance(partition_router, SubstreamPartitionRouter):
            for parent_config in partition_router.parent_stream_configs:
                parent_name = parent_config.stream.name
                parent_names.add(parent_name)
                # Recurse so grandparents (and beyond) are included as well.
                parent_names.update(self._collect_all_parent_stream_names(parent_name))

        return parent_names

    def _on_stream_is_done(self, stream_name: str) -> Iterable[AirbyteMessage]:
        """
        Finalize *stream_name*: flush pending state/messages, emit the terminal
        stream status (INCOMPLETE if the stream recorded exceptions, otherwise
        COMPLETE), and release the stream from the active/group bookkeeping so
        deferred streams can be started.
        """
        self._logger.info(
            f"Read {self._record_counter[stream_name]} records from {stream_name} stream"
        )
        self._logger.info(f"Marking stream {stream_name} as STOPPED")
        stream = self._stream_name_to_instance[stream_name]
        # Guarantee at least one state message is emitted for the stream.
        stream.cursor.ensure_at_least_one_state_emitted()
        yield from self._message_repository.consume_queue()
        self._logger.info(f"Finished syncing {stream.name}")
        self._streams_done.add(stream_name)
        stream_status = (
            AirbyteStreamStatus.INCOMPLETE
            if self._exceptions_per_stream_name.get(stream_name, [])
            else AirbyteStreamStatus.COMPLETE
        )
        yield stream_status_as_airbyte_message(stream.as_airbyte_stream(), stream_status)

        # Remove only this stream from active set (NOT parents)
        if stream_name in self._active_stream_names:
            self._active_stream_names.discard(stream_name)

        # Remove from active groups
        stream_group = self._stream_block_simultaneous_read.get(stream_name, "")
        if stream_group:
            if stream_group in self._active_groups:
                self._active_groups[stream_group].discard(stream_name)
                # Drop the group entirely once its last member finishes.
                if not self._active_groups[stream_group]:
                    del self._active_groups[stream_group]
            self._logger.info(
                f"Stream '{stream_name}' (group '{stream_group}') is no longer active. "
                f"Blocked streams in the queue will be retried on next start_next_partition_generator call."
            )
ConcurrentReadProcessor( stream_instances_to_read_from: List[airbyte_cdk.sources.streams.concurrent.abstract_stream.AbstractStream], partition_enqueuer: airbyte_cdk.sources.streams.concurrent.partition_enqueuer.PartitionEnqueuer, thread_pool_manager: airbyte_cdk.sources.concurrent_source.thread_pool_manager.ThreadPoolManager, logger: logging.Logger, slice_logger: airbyte_cdk.sources.utils.slice_logger.SliceLogger, message_repository: airbyte_cdk.MessageRepository, partition_reader: airbyte_cdk.sources.streams.concurrent.partition_reader.PartitionReader)
40 def __init__( 41 self, 42 stream_instances_to_read_from: List[AbstractStream], 43 partition_enqueuer: PartitionEnqueuer, 44 thread_pool_manager: ThreadPoolManager, 45 logger: logging.Logger, 46 slice_logger: SliceLogger, 47 message_repository: MessageRepository, 48 partition_reader: PartitionReader, 49 ): 50 """ 51 This class is responsible for handling items from a concurrent stream read process. 52 :param stream_instances_to_read_from: List of streams to read from 53 :param partition_enqueuer: PartitionEnqueuer instance 54 :param thread_pool_manager: ThreadPoolManager instance 55 :param logger: Logger instance 56 :param slice_logger: SliceLogger instance 57 :param message_repository: MessageRepository instance 58 :param partition_reader: PartitionReader instance 59 """ 60 self._stream_name_to_instance = {s.name: s for s in stream_instances_to_read_from} 61 self._record_counter = {} 62 self._streams_to_running_partitions: Dict[str, Set[Partition]] = {} 63 for stream in stream_instances_to_read_from: 64 self._streams_to_running_partitions[stream.name] = set() 65 self._record_counter[stream.name] = 0 66 self._thread_pool_manager = thread_pool_manager 67 self._partition_enqueuer = partition_enqueuer 68 self._stream_instances_to_start_partition_generation = stream_instances_to_read_from 69 self._streams_currently_generating_partitions: List[str] = [] 70 self._logger = logger 71 self._slice_logger = slice_logger 72 self._message_repository = message_repository 73 self._partition_reader = partition_reader 74 self._streams_done: Set[str] = set() 75 self._exceptions_per_stream_name: dict[str, List[Exception]] = {} 76 77 # Track which streams (by name) are currently active 78 # A stream is "active" if it's generating partitions or has partitions being read 79 self._active_stream_names: Set[str] = set() 80 81 # Store blocking group names for streams that require blocking simultaneous reads 82 # Maps stream name -> group name (empty string means no blocking) 83 
self._stream_block_simultaneous_read: Dict[str, str] = { 84 stream.name: stream.block_simultaneous_read for stream in stream_instances_to_read_from 85 } 86 87 # Track which groups are currently active 88 # Maps group name -> set of stream names in that group 89 self._active_groups: Dict[str, Set[str]] = {} 90 91 for stream in stream_instances_to_read_from: 92 if stream.block_simultaneous_read: 93 self._logger.info( 94 f"Stream '{stream.name}' is in blocking group '{stream.block_simultaneous_read}'. " 95 f"Will defer starting this stream if another stream in the same group or its parents are active." 96 )
This class is responsible for handling items from a concurrent stream read process.
Parameters
- stream_instances_to_read_from: List of streams to read from
- partition_enqueuer: PartitionEnqueuer instance
- thread_pool_manager: ThreadPoolManager instance
- logger: Logger instance
- slice_logger: SliceLogger instance
- message_repository: MessageRepository instance
- partition_reader: PartitionReader instance
def
on_partition_generation_completed( self, sentinel: airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel.PartitionGenerationCompletedSentinel) -> Iterable[airbyte_cdk.AirbyteMessage]:
    def on_partition_generation_completed(
        self, sentinel: PartitionGenerationCompletedSentinel
    ) -> Iterable[AirbyteMessage]:
        """
        This method is called when a partition generation is completed.
        1. Remove the stream from the list of streams currently generating partitions
        2. Deactivate parent streams (they were only needed for partition generation)
        3. If the stream is done, mark it as such and return a stream status message
        4. If there are more streams to read from, start the next partition generator
        """
        stream_name = sentinel.stream.name
        self._streams_currently_generating_partitions.remove(sentinel.stream.name)

        # Deactivate all parent streams now that partition generation is complete
        # Parents were only needed to generate slices, they can now be reused
        parent_streams = self._collect_all_parent_stream_names(stream_name)
        for parent_stream_name in parent_streams:
            if parent_stream_name in self._active_stream_names:
                self._logger.debug(f"Removing '{parent_stream_name}' from active streams")
                self._active_stream_names.discard(parent_stream_name)

                # Remove from active groups
                parent_group = self._stream_block_simultaneous_read.get(parent_stream_name, "")
                if parent_group:
                    if parent_group in self._active_groups:
                        self._active_groups[parent_group].discard(parent_stream_name)
                        # Drop the group entirely once its last member is deactivated.
                        if not self._active_groups[parent_group]:
                            del self._active_groups[parent_group]
                    self._logger.info(
                        f"Parent stream '{parent_stream_name}' (group '{parent_group}') deactivated after "
                        f"partition generation completed for child '{stream_name}'. "
                        f"Blocked streams in the queue will be retried on next start_next_partition_generator call."
                    )

        # It is possible for the stream to already be done if no partitions were generated
        # If the partition generation process was completed and there are no partitions left to process, the stream is done
        if (
            self._is_stream_done(stream_name)
            or len(self._streams_to_running_partitions[stream_name]) == 0
        ):
            yield from self._on_stream_is_done(stream_name)
        if self._stream_instances_to_start_partition_generation:
            status_message = self.start_next_partition_generator()
            if status_message:
                yield status_message
This method is called when a partition generation is completed.
- Remove the stream from the list of streams currently generating partitions
- Deactivate parent streams (they were only needed for partition generation)
- If the stream is done, mark it as such and return a stream status message
- If there are more streams to read from, start the next partition generator
def
on_partition( self, partition: airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition) -> None:
144 def on_partition(self, partition: Partition) -> None: 145 """ 146 This method is called when a partition is generated. 147 1. Add the partition to the set of partitions for the stream 148 2. Log the slice if necessary 149 3. Submit the partition to the thread pool manager 150 """ 151 stream_name = partition.stream_name() 152 self._streams_to_running_partitions[stream_name].add(partition) 153 cursor = self._stream_name_to_instance[stream_name].cursor 154 if self._slice_logger.should_log_slice_message(self._logger): 155 self._message_repository.emit_message( 156 self._slice_logger.create_slice_log_message(partition.to_slice()) 157 ) 158 self._thread_pool_manager.submit( 159 self._partition_reader.process_partition, partition, cursor 160 )
This method is called when a partition is generated.
- Add the partition to the set of partitions for the stream
- Log the slice if necessary
- Submit the partition to the thread pool manager
def
on_partition_complete_sentinel( self, sentinel: airbyte_cdk.sources.streams.concurrent.partitions.types.PartitionCompleteSentinel) -> Iterable[airbyte_cdk.AirbyteMessage]:
162 def on_partition_complete_sentinel( 163 self, sentinel: PartitionCompleteSentinel 164 ) -> Iterable[AirbyteMessage]: 165 """ 166 This method is called when a partition is completed. 167 1. Close the partition 168 2. If the stream is done, mark it as such and return a stream status message 169 3. Emit messages that were added to the message repository 170 4. If there are more streams to read from, start the next partition generator 171 """ 172 partition = sentinel.partition 173 174 partitions_running = self._streams_to_running_partitions[partition.stream_name()] 175 if partition in partitions_running: 176 partitions_running.remove(partition) 177 # If all partitions were generated and this was the last one, the stream is done 178 if ( 179 partition.stream_name() not in self._streams_currently_generating_partitions 180 and len(partitions_running) == 0 181 ): 182 yield from self._on_stream_is_done(partition.stream_name()) 183 # Try to start the next stream in the queue (may be a deferred stream) 184 if self._stream_instances_to_start_partition_generation: 185 status_message = self.start_next_partition_generator() 186 if status_message: 187 yield status_message 188 yield from self._message_repository.consume_queue()
This method is called when a partition is completed.
- Close the partition
- If the stream is done, mark it as such and return a stream status message
- Emit messages that were added to the message repository
- If there are more streams to read from, start the next partition generator
190 def on_record(self, record: Record) -> Iterable[AirbyteMessage]: 191 """ 192 This method is called when a record is read from a partition. 193 1. Convert the record to an AirbyteMessage 194 2. If this is the first record for the stream, mark the stream as RUNNING 195 3. Increment the record counter for the stream 196 4. Ensures the cursor knows the record has been successfully emitted 197 5. Emit the message 198 6. Emit messages that were added to the message repository 199 """ 200 # Do not pass a transformer or a schema 201 # AbstractStreams are expected to return data as they are expected. 202 # Any transformation on the data should be done before reaching this point 203 message = stream_data_to_airbyte_message( 204 stream_name=record.stream_name, 205 data_or_message=record.data, 206 file_reference=record.file_reference, 207 ) 208 stream = self._stream_name_to_instance[record.stream_name] 209 210 if message.type == MessageType.RECORD: 211 if self._record_counter[stream.name] == 0: 212 self._logger.info(f"Marking stream {stream.name} as RUNNING") 213 yield stream_status_as_airbyte_message( 214 stream.as_airbyte_stream(), AirbyteStreamStatus.RUNNING 215 ) 216 self._record_counter[stream.name] += 1 217 yield message 218 yield from self._message_repository.consume_queue()
This method is called when a record is read from a partition.
- Convert the record to an AirbyteMessage
- If this is the first record for the stream, mark the stream as RUNNING
- Increment the record counter for the stream
- Ensure the cursor knows the record has been successfully emitted
- Emit the message
- Emit messages that were added to the message repository
def
on_exception( self, exception: airbyte_cdk.sources.concurrent_source.stream_thread_exception.StreamThreadException) -> Iterable[airbyte_cdk.AirbyteMessage]:
220 def on_exception(self, exception: StreamThreadException) -> Iterable[AirbyteMessage]: 221 """ 222 This method is called when an exception is raised. 223 1. Stop all running streams 224 2. Raise the exception 225 """ 226 self._flag_exception(exception.stream_name, exception.exception) 227 self._logger.exception( 228 f"Exception while syncing stream {exception.stream_name}", exc_info=exception.exception 229 ) 230 231 stream_descriptor = StreamDescriptor(name=exception.stream_name) 232 if isinstance(exception.exception, AirbyteTracedException): 233 yield exception.exception.as_airbyte_message(stream_descriptor=stream_descriptor) 234 else: 235 yield AirbyteTracedException.from_exception( 236 exception.exception, 237 stream_descriptor=stream_descriptor, 238 message=f"An unexpected error occurred in stream {exception.stream_name}: {type(exception.exception).__name__}", 239 ).as_airbyte_message()
This method is called when an exception is raised while processing a stream.
- Record the exception against the stream
- Emit an error trace message for the failed stream
    def start_next_partition_generator(self) -> Optional[AirbyteMessage]:
        """
        Submits the next partition generator to the thread pool.

        A stream will be deferred (moved to end of queue) if:
        1. The stream itself has block_simultaneous_read=True AND is already active
        2. Any parent stream has block_simultaneous_read=True AND is currently active

        This prevents simultaneous reads of streams that shouldn't be accessed concurrently.

        :return: A status message if a partition generator was started, otherwise None
        """
        if not self._stream_instances_to_start_partition_generation:
            return None

        # Remember initial queue size to avoid infinite loops if all streams are blocked:
        # each stream is inspected at most once per call.
        max_attempts = len(self._stream_instances_to_start_partition_generation)
        attempts = 0

        while self._stream_instances_to_start_partition_generation and attempts < max_attempts:
            attempts += 1

            # Pop the first stream from the queue (FIFO with deferral to the tail)
            stream = self._stream_instances_to_start_partition_generation.pop(0)
            stream_name = stream.name
            # Empty string means the stream has no blocking group.
            stream_group = self._stream_block_simultaneous_read.get(stream_name, "")

            # Check if this stream has a blocking group and is already active as parent stream
            # (i.e. being read from during partition generation for another stream)
            if stream_group and stream_name in self._active_stream_names:
                # Add back to the END of the queue for retry later
                self._stream_instances_to_start_partition_generation.append(stream)
                self._logger.info(
                    f"Deferring stream '{stream_name}' (group '{stream_group}') because it's already active. Trying next stream."
                )
                continue  # Try the next stream in the queue

            # Check if this stream's group is already active (another stream in the same group is running)
            if (
                stream_group
                and stream_group in self._active_groups
                and self._active_groups[stream_group]
            ):
                # Add back to the END of the queue for retry later
                self._stream_instances_to_start_partition_generation.append(stream)
                active_streams_in_group = self._active_groups[stream_group]
                self._logger.info(
                    f"Deferring stream '{stream_name}' (group '{stream_group}') because other stream(s) "
                    f"{active_streams_in_group} in the same group are active. Trying next stream."
                )
                continue  # Try the next stream in the queue

            # Check if any parent streams have a blocking group and are currently active
            parent_streams = self._collect_all_parent_stream_names(stream_name)
            blocked_by_parents = [
                p
                for p in parent_streams
                if self._stream_block_simultaneous_read.get(p, "")
                and p in self._active_stream_names
            ]

            if blocked_by_parents:
                # Add back to the END of the queue for retry later
                self._stream_instances_to_start_partition_generation.append(stream)
                parent_groups = {
                    self._stream_block_simultaneous_read.get(p, "") for p in blocked_by_parents
                }
                self._logger.info(
                    f"Deferring stream '{stream_name}' because parent stream(s) "
                    f"{blocked_by_parents} (groups {parent_groups}) are active. Trying next stream."
                )
                continue  # Try the next stream in the queue

            # No blocking - start this stream
            # Mark stream as active before starting
            self._active_stream_names.add(stream_name)
            self._streams_currently_generating_partitions.append(stream_name)

            # Track this stream in its group if it has one
            if stream_group:
                if stream_group not in self._active_groups:
                    self._active_groups[stream_group] = set()
                self._active_groups[stream_group].add(stream_name)
                self._logger.debug(f"Added '{stream_name}' to active group '{stream_group}'")

            # Also mark all parent streams as active (they will be read from during partition generation)
            for parent_stream_name in parent_streams:
                parent_group = self._stream_block_simultaneous_read.get(parent_stream_name, "")
                if parent_group:
                    self._active_stream_names.add(parent_stream_name)
                    if parent_group not in self._active_groups:
                        self._active_groups[parent_group] = set()
                    self._active_groups[parent_group].add(parent_stream_name)
                    self._logger.info(
                        f"Marking parent stream '{parent_stream_name}' (group '{parent_group}') as active "
                        f"(will be read during partition generation for '{stream_name}')"
                    )

            self._thread_pool_manager.submit(self._partition_enqueuer.generate_partitions, stream)
            self._logger.info(f"Marking stream {stream_name} as STARTED")
            self._logger.info(f"Syncing stream: {stream_name}")
            return stream_status_as_airbyte_message(
                stream.as_airbyte_stream(),
                AirbyteStreamStatus.STARTED,
            )

        # All streams in the queue are currently blocked
        return None
Submits the next partition generator to the thread pool.
A stream will be deferred (moved to end of queue) if:
- The stream itself has block_simultaneous_read=True AND is already active
- Any parent stream has block_simultaneous_read=True AND is currently active
This prevents simultaneous reads of streams that shouldn't be accessed concurrently.
Returns
A status message if a partition generator was started, otherwise None
def
is_done(self) -> bool:
353 def is_done(self) -> bool: 354 """ 355 This method is called to check if the sync is done. 356 The sync is done when: 357 1. There are no more streams generating partitions 358 2. There are no more streams to read from 359 3. All partitions for all streams are closed 360 """ 361 is_done = all( 362 [ 363 self._is_stream_done(stream_name) 364 for stream_name in self._stream_name_to_instance.keys() 365 ] 366 ) 367 if is_done and self._stream_instances_to_start_partition_generation: 368 stuck_stream_names = [ 369 s.name for s in self._stream_instances_to_start_partition_generation 370 ] 371 raise AirbyteTracedException( 372 message="Partition generation queue is not empty after all streams completed.", 373 internal_message=f"Streams {stuck_stream_names} remained in the partition generation queue after all streams were marked done.", 374 failure_type=FailureType.system_error, 375 ) 376 if is_done and self._active_groups: 377 raise AirbyteTracedException( 378 message="Active stream groups are not empty after all streams completed.", 379 internal_message=f"Groups {dict(self._active_groups)} still active after all streams were marked done.", 380 failure_type=FailureType.system_error, 381 ) 382 if is_done and self._exceptions_per_stream_name: 383 error_message = generate_failed_streams_error_message(self._exceptions_per_stream_name) 384 self._logger.info(error_message) 385 # We still raise at least one exception when a stream raises an exception because the platform currently relies 386 # on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error 387 # type because this combined error isn't actionable, but rather the previously emitted individual errors. 388 raise AirbyteTracedException( 389 message=error_message, 390 internal_message="Concurrent read failure", 391 failure_type=FailureType.config_error, 392 ) 393 return is_done
This method is called to check if the sync is done. The sync is done when:
- There are no more streams generating partitions
- There are no more streams to read from
- All partitions for all streams are closed