airbyte_cdk.sources.concurrent_source.concurrent_read_processor
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import logging
import os
from typing import Dict, Iterable, List, Optional, Set

from airbyte_cdk.exception_handler import generate_failed_streams_error_message
from airbyte_cdk.models import AirbyteMessage, AirbyteStreamStatus, FailureType, StreamDescriptor
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel import (
    PartitionGenerationCompletedSentinel,
)
from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException
from airbyte_cdk.sources.concurrent_source.thread_pool_manager import ThreadPoolManager
from airbyte_cdk.sources.declarative.partition_routers.grouping_partition_router import (
    GroupingPartitionRouter,
)
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import (
    SubstreamPartitionRouter,
)
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
from airbyte_cdk.sources.streams.concurrent.partition_enqueuer import PartitionEnqueuer
from airbyte_cdk.sources.streams.concurrent.partition_reader import PartitionReader
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.types import PartitionCompleteSentinel
from airbyte_cdk.sources.types import Record
from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message
from airbyte_cdk.sources.utils.slice_logger import SliceLogger
from airbyte_cdk.utils import AirbyteTracedException
from airbyte_cdk.utils.stream_status_utils import (
    as_airbyte_message as stream_status_as_airbyte_message,
)


class ConcurrentReadProcessor:
    """
    Reacts to the items produced by a concurrent stream read (partitions, records, sentinels and
    exceptions), keeps track of which streams are generating partitions / still have running
    partitions / are done, and decides when the next partition generator may be started.
    """

    def __init__(
        self,
        stream_instances_to_read_from: List[AbstractStream],
        partition_enqueuer: PartitionEnqueuer,
        thread_pool_manager: ThreadPoolManager,
        logger: logging.Logger,
        slice_logger: SliceLogger,
        message_repository: MessageRepository,
        partition_reader: PartitionReader,
        max_concurrent_partition_generators: Optional[int] = None,
    ):
        """
        This class is responsible for handling items from a concurrent stream read process.
        :param stream_instances_to_read_from: List of streams to read from. The list is copied, so
          the caller's list is never mutated by the scheduling queue.
        :param partition_enqueuer: PartitionEnqueuer instance
        :param thread_pool_manager: ThreadPoolManager instance
        :param logger: Logger instance
        :param slice_logger: SliceLogger instance
        :param message_repository: MessageRepository instance
        :param partition_reader: PartitionReader instance
        :param max_concurrent_partition_generators: Maximum number of partition generators allowed
          to run concurrently. None means no limit. When set, should be less than the number of
          workers in multi-worker mode so at least one worker slot is always available for
          partition reading, preventing thread pool starvation. In single-threaded mode
          (num_workers=1) the value may equal num_workers; ConcurrentSource.create() handles
          this distinction. ConcurrentSource.read() passes this value explicitly.
        :raises ValueError: if max_concurrent_partition_generators is set and lower than 1
        """
        # Validate arguments first so that an invalid cap never leaves a half-built instance.
        if (
            max_concurrent_partition_generators is not None
            and max_concurrent_partition_generators < 1
        ):
            raise ValueError(
                f"max_concurrent_partition_generators must be >= 1 or None, got {max_concurrent_partition_generators}"
            )

        self._stream_name_to_instance = {s.name: s for s in stream_instances_to_read_from}
        self._record_counter: Dict[str, int] = {}
        self._streams_to_running_partitions: Dict[str, Set[Partition]] = {}
        for stream in stream_instances_to_read_from:
            self._streams_to_running_partitions[stream.name] = set()
            self._record_counter[stream.name] = 0

        self._thread_pool_manager = thread_pool_manager
        self._partition_enqueuer = partition_enqueuer
        self._max_concurrent_partition_generators = max_concurrent_partition_generators
        # This queue is consumed with pop(0) and re-filled with append() when a stream is
        # deferred. Work on a private copy so the list owned by the caller is left untouched.
        self._stream_instances_to_start_partition_generation: List[AbstractStream] = list(
            stream_instances_to_read_from
        )
        self._streams_currently_generating_partitions: List[str] = []
        self._logger = logger
        self._slice_logger = slice_logger
        self._message_repository = message_repository
        self._partition_reader = partition_reader
        self._streams_done: Set[str] = set()
        self._exceptions_per_stream_name: dict[str, List[Exception]] = {}

        # Track which streams (by name) are currently active
        # A stream is "active" if it's generating partitions or has partitions being read
        self._active_stream_names: Set[str] = set()

        # Store blocking group names for streams that require blocking simultaneous reads
        # Maps stream name -> group name (empty string means no blocking)
        self._stream_block_simultaneous_read: Dict[str, str] = {
            stream.name: stream.block_simultaneous_read for stream in stream_instances_to_read_from
        }

        # Track which groups are currently active
        # Maps group name -> set of stream names in that group
        self._active_groups: Dict[str, Set[str]] = {}

        for stream in stream_instances_to_read_from:
            if stream.block_simultaneous_read:
                self._logger.info(
                    f"Stream '{stream.name}' is in blocking group '{stream.block_simultaneous_read}'. "
                    f"Will defer starting this stream if another stream in the same group or its parents are active."
                )

    def on_partition_generation_completed(
        self, sentinel: PartitionGenerationCompletedSentinel
    ) -> Iterable[AirbyteMessage]:
        """
        This method is called when a partition generation is completed.
        1. Remove the stream from the list of streams currently generating partitions
        2. Deactivate parent streams (they were only needed for partition generation)
        3. If the stream is done, mark it as such and return a stream status message
        4. If there are more streams to read from, start the next partition generator

        :param sentinel: sentinel carrying the stream whose partition generation finished
        :return: the stream status / queued messages produced while handling the sentinel
        """
        stream_name = sentinel.stream.name
        self._streams_currently_generating_partitions.remove(stream_name)

        # Deactivate all parent streams now that partition generation is complete
        # Parents were only needed to generate slices, they can now be reused
        parent_streams = self._collect_all_parent_stream_names(stream_name)
        for parent_stream_name in parent_streams:
            if parent_stream_name in self._active_stream_names:
                self._logger.debug(f"Removing '{parent_stream_name}' from active streams")
                self._active_stream_names.discard(parent_stream_name)

                # Remove from active groups
                parent_group = self._stream_block_simultaneous_read.get(parent_stream_name, "")
                if parent_group:
                    if parent_group in self._active_groups:
                        self._active_groups[parent_group].discard(parent_stream_name)
                        # Drop empty groups so that is_done() does not see stale entries
                        if not self._active_groups[parent_group]:
                            del self._active_groups[parent_group]
                    self._logger.info(
                        f"Parent stream '{parent_stream_name}' (group '{parent_group}') deactivated after "
                        f"partition generation completed for child '{stream_name}'. "
                        f"Blocked streams in the queue will be retried on next start_next_partition_generator call."
                    )

        # It is possible for the stream to already be done if no partitions were generated
        # If the partition generation process was completed and there are no partitions left to process, the stream is done
        if (
            self._is_stream_done(stream_name)
            or len(self._streams_to_running_partitions[stream_name]) == 0
        ):
            yield from self._on_stream_is_done(stream_name)
        if self._stream_instances_to_start_partition_generation:
            status_message = self.start_next_partition_generator()
            if status_message:
                yield status_message

    def on_partition(self, partition: Partition) -> None:
        """
        This method is called when a partition is generated.
        1. Add the partition to the set of partitions for the stream
        2. Log the slice if necessary
        3. Submit the partition to the thread pool manager

        :param partition: the partition that was just generated
        """
        stream_name = partition.stream_name()
        self._streams_to_running_partitions[stream_name].add(partition)
        cursor = self._stream_name_to_instance[stream_name].cursor
        if self._slice_logger.should_log_slice_message(self._logger):
            self._message_repository.emit_message(
                self._slice_logger.create_slice_log_message(partition.to_slice())
            )
        self._thread_pool_manager.submit(
            self._partition_reader.process_partition, partition, cursor
        )

    def on_partition_complete_sentinel(
        self, sentinel: PartitionCompleteSentinel
    ) -> Iterable[AirbyteMessage]:
        """
        This method is called when a partition is completed.
        1. Remove the partition from the set of running partitions of its stream
        2. If the stream is done, mark it as such and return a stream status message
        3. If there are more streams to read from, start the next partition generator
        4. Emit messages that were added to the message repository

        :param sentinel: sentinel carrying the partition that was fully read
        :return: the stream status / queued messages produced while handling the sentinel
        """
        partition = sentinel.partition

        partitions_running = self._streams_to_running_partitions[partition.stream_name()]
        if partition in partitions_running:
            partitions_running.remove(partition)
            # If all partitions were generated and this was the last one, the stream is done
            if (
                partition.stream_name() not in self._streams_currently_generating_partitions
                and len(partitions_running) == 0
            ):
                yield from self._on_stream_is_done(partition.stream_name())
                # Try to start the next stream in the queue (may be a deferred stream)
                if self._stream_instances_to_start_partition_generation:
                    status_message = self.start_next_partition_generator()
                    if status_message:
                        yield status_message
        yield from self._message_repository.consume_queue()

    def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
        """
        This method is called when a record is read from a partition.
        1. Convert the record to an AirbyteMessage
        2. If this is the first record for the stream, mark the stream as RUNNING
        3. Increment the record counter for the stream
        4. Emit the message
        5. Emit messages that were added to the message repository

        :param record: the record read from a partition
        :return: an optional RUNNING status message, the converted message, then queued messages
        """
        # Do not pass a transformer or a schema
        # AbstractStreams are expected to return data as they are expected.
        # Any transformation on the data should be done before reaching this point
        message = stream_data_to_airbyte_message(
            stream_name=record.stream_name,
            data_or_message=record.data,
            file_reference=record.file_reference,
        )
        stream = self._stream_name_to_instance[record.stream_name]

        # Only RECORD messages count towards the per-stream record counter
        if message.type == MessageType.RECORD:
            if self._record_counter[stream.name] == 0:
                self._logger.info(f"Marking stream {stream.name} as RUNNING")
                yield stream_status_as_airbyte_message(
                    stream.as_airbyte_stream(), AirbyteStreamStatus.RUNNING
                )
            self._record_counter[stream.name] += 1
        yield message
        yield from self._message_repository.consume_queue()

    def on_exception(self, exception: StreamThreadException) -> Iterable[AirbyteMessage]:
        """
        This method is called when an exception is raised while processing a stream.
        1. Record the exception against the stream (the stream is later reported as INCOMPLETE
           and is_done() raises once every stream has finished)
        2. Log the exception
        3. Yield a trace message describing the error; nothing is raised from here

        :param exception: wrapper holding the failing stream's name and the original exception
        :return: a single trace message for the error
        """
        self._flag_exception(exception.stream_name, exception.exception)
        self._logger.exception(
            f"Exception while syncing stream {exception.stream_name}", exc_info=exception.exception
        )

        stream_descriptor = StreamDescriptor(name=exception.stream_name)
        if isinstance(exception.exception, AirbyteTracedException):
            yield exception.exception.as_airbyte_message(stream_descriptor=stream_descriptor)
        else:
            yield AirbyteTracedException.from_exception(
                exception.exception,
                stream_descriptor=stream_descriptor,
                message=f"An unexpected error occurred in stream {exception.stream_name}: {type(exception.exception).__name__}",
            ).as_airbyte_message()

    def _flag_exception(self, stream_name: str, exception: Exception) -> None:
        """Append `exception` to the list of exceptions recorded for `stream_name`."""
        self._exceptions_per_stream_name.setdefault(stream_name, []).append(exception)

    def start_next_partition_generator(self) -> Optional[AirbyteMessage]:
        """
        Submits the next partition generator to the thread pool.

        Nothing is started when the queue is empty or when the number of streams currently
        generating partitions has reached max_concurrent_partition_generators.

        A stream will be deferred (moved to end of queue) if:
        1. The stream has a blocking group (non-empty block_simultaneous_read) AND is already active
        2. The stream has a blocking group AND another stream of that group is active
        3. Any parent stream has a blocking group AND is currently active

        This prevents simultaneous reads of streams that shouldn't be accessed concurrently.

        :return: A status message if a partition generator was started, otherwise None
        """
        if not self._stream_instances_to_start_partition_generation:
            return None

        # Enforce the concurrent generator cap so at least one worker slot is always available
        # for partition reading. Recovery is guaranteed: on_partition_generation_completed
        # decrements the count before calling here, so the guard always passes there.
        if (
            self._max_concurrent_partition_generators is not None
            and len(self._streams_currently_generating_partitions)
            >= self._max_concurrent_partition_generators
        ):
            self._logger.debug(
                f"Concurrent partition generator cap ({self._max_concurrent_partition_generators}) reached "
                f"({len(self._streams_currently_generating_partitions)} active). Deferring next generator start."
            )
            return None

        # Remember initial queue size to avoid infinite loops if all streams are blocked
        max_attempts = len(self._stream_instances_to_start_partition_generation)
        attempts = 0

        while self._stream_instances_to_start_partition_generation and attempts < max_attempts:
            attempts += 1

            # Pop the first stream from the queue
            stream = self._stream_instances_to_start_partition_generation.pop(0)
            stream_name = stream.name
            stream_group = self._stream_block_simultaneous_read.get(stream_name, "")

            # Check if this stream has a blocking group and is already active as parent stream
            # (i.e. being read from during partition generation for another stream)
            if stream_group and stream_name in self._active_stream_names:
                # Add back to the END of the queue for retry later
                self._stream_instances_to_start_partition_generation.append(stream)
                self._logger.info(
                    f"Deferring stream '{stream_name}' (group '{stream_group}') because it's already active. Trying next stream."
                )
                continue  # Try the next stream in the queue

            # Check if this stream's group is already active (another stream in the same group is running)
            if (
                stream_group
                and stream_group in self._active_groups
                and self._active_groups[stream_group]
            ):
                # Add back to the END of the queue for retry later
                self._stream_instances_to_start_partition_generation.append(stream)
                active_streams_in_group = self._active_groups[stream_group]
                self._logger.info(
                    f"Deferring stream '{stream_name}' (group '{stream_group}') because other stream(s) "
                    f"{active_streams_in_group} in the same group are active. Trying next stream."
                )
                continue  # Try the next stream in the queue

            # Check if any parent streams have a blocking group and are currently active
            parent_streams = self._collect_all_parent_stream_names(stream_name)
            blocked_by_parents = [
                p
                for p in parent_streams
                if self._stream_block_simultaneous_read.get(p, "")
                and p in self._active_stream_names
            ]

            if blocked_by_parents:
                # Add back to the END of the queue for retry later
                self._stream_instances_to_start_partition_generation.append(stream)
                parent_groups = {
                    self._stream_block_simultaneous_read.get(p, "") for p in blocked_by_parents
                }
                self._logger.info(
                    f"Deferring stream '{stream_name}' because parent stream(s) "
                    f"{blocked_by_parents} (groups {parent_groups}) are active. Trying next stream."
                )
                continue  # Try the next stream in the queue

            # No blocking - start this stream
            # Mark stream as active before starting
            self._active_stream_names.add(stream_name)
            self._streams_currently_generating_partitions.append(stream_name)

            # Track this stream in its group if it has one
            if stream_group:
                self._active_groups.setdefault(stream_group, set()).add(stream_name)
                self._logger.debug(f"Added '{stream_name}' to active group '{stream_group}'")

            # Also mark all parent streams as active (they will be read from during partition generation)
            # Only parents that belong to a blocking group are tracked.
            for parent_stream_name in parent_streams:
                parent_group = self._stream_block_simultaneous_read.get(parent_stream_name, "")
                if parent_group:
                    self._active_stream_names.add(parent_stream_name)
                    self._active_groups.setdefault(parent_group, set()).add(parent_stream_name)
                    self._logger.info(
                        f"Marking parent stream '{parent_stream_name}' (group '{parent_group}') as active "
                        f"(will be read during partition generation for '{stream_name}')"
                    )

            self._thread_pool_manager.submit(self._partition_enqueuer.generate_partitions, stream)
            self._logger.info(f"Marking stream {stream_name} as STARTED")
            self._logger.info(f"Syncing stream: {stream_name}")
            return stream_status_as_airbyte_message(
                stream.as_airbyte_stream(),
                AirbyteStreamStatus.STARTED,
            )

        # All streams in the queue are currently blocked
        return None

    def is_done(self) -> bool:
        """
        This method is called to check if the sync is done.
        The sync is done when every stream to read from has been marked as done.

        :return: True when all streams are done, False otherwise
        :raises AirbyteTracedException: once all streams are done, if
          - streams are still waiting in the partition generation queue (system_error)
          - some blocking groups are still marked as active (system_error)
          - at least one stream recorded an exception during the sync (config_error)
        """
        is_done = all(
            self._is_stream_done(stream_name) for stream_name in self._stream_name_to_instance
        )
        if is_done and self._stream_instances_to_start_partition_generation:
            stuck_stream_names = [
                s.name for s in self._stream_instances_to_start_partition_generation
            ]
            raise AirbyteTracedException(
                message="Partition generation queue is not empty after all streams completed.",
                internal_message=f"Streams {stuck_stream_names} remained in the partition generation queue after all streams were marked done.",
                failure_type=FailureType.system_error,
            )
        if is_done and self._active_groups:
            raise AirbyteTracedException(
                message="Active stream groups are not empty after all streams completed.",
                internal_message=f"Groups {dict(self._active_groups)} still active after all streams were marked done.",
                failure_type=FailureType.system_error,
            )
        if is_done and self._exceptions_per_stream_name:
            error_message = generate_failed_streams_error_message(self._exceptions_per_stream_name)
            self._logger.info(error_message)
            # We still raise at least one exception when a stream raises an exception because the platform currently relies
            # on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error
            # type because this combined error isn't actionable, but rather the previously emitted individual errors.
            raise AirbyteTracedException(
                message=error_message,
                internal_message="Concurrent read failure",
                failure_type=FailureType.config_error,
            )
        return is_done

    def _is_stream_done(self, stream_name: str) -> bool:
        """Return True if `stream_name` has already been marked as done."""
        return stream_name in self._streams_done

    def _collect_all_parent_stream_names(self, stream_name: str) -> Set[str]:
        """Collect the names of all (transitive) parent streams of a given stream.

        For example, if we have: epics -> issues -> comments
        Then for comments, this returns {issues, epics}.

        Parents are discovered through the SubstreamPartitionRouter of DefaultStream instances
        (a GroupingPartitionRouter is unwrapped first). Streams that are not part of the streams
        to read from are not traversed any further. The walk is iterative and visits each name
        once, so a cyclic parent configuration terminates instead of recursing without bound.
        """
        parent_names: Set[str] = set()
        pending: List[str] = [stream_name]

        while pending:
            current_name = pending.pop()
            stream = self._stream_name_to_instance.get(current_name)
            if not stream:
                continue

            partition_router = (
                stream.get_partition_router() if isinstance(stream, DefaultStream) else None
            )
            if isinstance(partition_router, GroupingPartitionRouter):
                partition_router = partition_router.underlying_partition_router

            if isinstance(partition_router, SubstreamPartitionRouter):
                for parent_config in partition_router.parent_stream_configs:
                    parent_name = parent_config.stream.name
                    if parent_name not in parent_names:
                        parent_names.add(parent_name)
                        pending.append(parent_name)

        # Only reachable through a cyclic configuration: a stream is never its own parent.
        parent_names.discard(stream_name)
        return parent_names

    def _on_stream_is_done(self, stream_name: str) -> Iterable[AirbyteMessage]:
        """
        Finalize a stream: make sure its cursor emitted a state, flush queued messages, mark the
        stream as done, emit its final status (INCOMPLETE if an exception was recorded for it,
        COMPLETE otherwise) and release the stream from the active stream / group tracking.
        """
        self._logger.info(
            f"Read {self._record_counter[stream_name]} records from {stream_name} stream"
        )
        self._logger.info(f"Marking stream {stream_name} as STOPPED")
        stream = self._stream_name_to_instance[stream_name]
        stream.cursor.ensure_at_least_one_state_emitted()
        yield from self._message_repository.consume_queue()
        self._logger.info(f"Finished syncing {stream.name}")
        self._streams_done.add(stream_name)
        stream_status = (
            AirbyteStreamStatus.INCOMPLETE
            if self._exceptions_per_stream_name.get(stream_name, [])
            else AirbyteStreamStatus.COMPLETE
        )
        yield stream_status_as_airbyte_message(stream.as_airbyte_stream(), stream_status)

        # Remove only this stream from active set (NOT parents)
        if stream_name in self._active_stream_names:
            self._active_stream_names.discard(stream_name)

            # Remove from active groups
            stream_group = self._stream_block_simultaneous_read.get(stream_name, "")
            if stream_group:
                if stream_group in self._active_groups:
                    self._active_groups[stream_group].discard(stream_name)
                    # Drop empty groups so that is_done() does not see stale entries
                    if not self._active_groups[stream_group]:
                        del self._active_groups[stream_group]
                self._logger.info(
                    f"Stream '{stream_name}' (group '{stream_group}') is no longer active. "
                    f"Blocked streams in the queue will be retried on next start_next_partition_generator call."
                )
class
ConcurrentReadProcessor:
39class ConcurrentReadProcessor: 40 def __init__( 41 self, 42 stream_instances_to_read_from: List[AbstractStream], 43 partition_enqueuer: PartitionEnqueuer, 44 thread_pool_manager: ThreadPoolManager, 45 logger: logging.Logger, 46 slice_logger: SliceLogger, 47 message_repository: MessageRepository, 48 partition_reader: PartitionReader, 49 max_concurrent_partition_generators: Optional[int] = None, 50 ): 51 """ 52 This class is responsible for handling items from a concurrent stream read process. 53 :param stream_instances_to_read_from: List of streams to read from 54 :param partition_enqueuer: PartitionEnqueuer instance 55 :param thread_pool_manager: ThreadPoolManager instance 56 :param logger: Logger instance 57 :param slice_logger: SliceLogger instance 58 :param message_repository: MessageRepository instance 59 :param partition_reader: PartitionReader instance 60 :param max_concurrent_partition_generators: Maximum number of partition generators allowed 61 to run concurrently. None means no limit. When set, should be less than the number of 62 workers in multi-worker mode so at least one worker slot is always available for 63 partition reading, preventing thread pool starvation. In single-threaded mode 64 (num_workers=1) the value may equal num_workers; ConcurrentSource.create() handles 65 this distinction. ConcurrentSource.read() passes this value explicitly. 
66 """ 67 self._stream_name_to_instance = {s.name: s for s in stream_instances_to_read_from} 68 self._record_counter = {} 69 self._streams_to_running_partitions: Dict[str, Set[Partition]] = {} 70 for stream in stream_instances_to_read_from: 71 self._streams_to_running_partitions[stream.name] = set() 72 self._record_counter[stream.name] = 0 73 if ( 74 max_concurrent_partition_generators is not None 75 and max_concurrent_partition_generators < 1 76 ): 77 raise ValueError( 78 f"max_concurrent_partition_generators must be >= 1 or None, got {max_concurrent_partition_generators}" 79 ) 80 self._thread_pool_manager = thread_pool_manager 81 self._partition_enqueuer = partition_enqueuer 82 self._max_concurrent_partition_generators = max_concurrent_partition_generators 83 self._stream_instances_to_start_partition_generation = stream_instances_to_read_from 84 self._streams_currently_generating_partitions: List[str] = [] 85 self._logger = logger 86 self._slice_logger = slice_logger 87 self._message_repository = message_repository 88 self._partition_reader = partition_reader 89 self._streams_done: Set[str] = set() 90 self._exceptions_per_stream_name: dict[str, List[Exception]] = {} 91 92 # Track which streams (by name) are currently active 93 # A stream is "active" if it's generating partitions or has partitions being read 94 self._active_stream_names: Set[str] = set() 95 96 # Store blocking group names for streams that require blocking simultaneous reads 97 # Maps stream name -> group name (empty string means no blocking) 98 self._stream_block_simultaneous_read: Dict[str, str] = { 99 stream.name: stream.block_simultaneous_read for stream in stream_instances_to_read_from 100 } 101 102 # Track which groups are currently active 103 # Maps group name -> set of stream names in that group 104 self._active_groups: Dict[str, Set[str]] = {} 105 106 for stream in stream_instances_to_read_from: 107 if stream.block_simultaneous_read: 108 self._logger.info( 109 f"Stream '{stream.name}' is 
in blocking group '{stream.block_simultaneous_read}'. " 110 f"Will defer starting this stream if another stream in the same group or its parents are active." 111 ) 112 113 def on_partition_generation_completed( 114 self, sentinel: PartitionGenerationCompletedSentinel 115 ) -> Iterable[AirbyteMessage]: 116 """ 117 This method is called when a partition generation is completed. 118 1. Remove the stream from the list of streams currently generating partitions 119 2. Deactivate parent streams (they were only needed for partition generation) 120 3. If the stream is done, mark it as such and return a stream status message 121 4. If there are more streams to read from, start the next partition generator 122 """ 123 stream_name = sentinel.stream.name 124 self._streams_currently_generating_partitions.remove(sentinel.stream.name) 125 126 # Deactivate all parent streams now that partition generation is complete 127 # Parents were only needed to generate slices, they can now be reused 128 parent_streams = self._collect_all_parent_stream_names(stream_name) 129 for parent_stream_name in parent_streams: 130 if parent_stream_name in self._active_stream_names: 131 self._logger.debug(f"Removing '{parent_stream_name}' from active streams") 132 self._active_stream_names.discard(parent_stream_name) 133 134 # Remove from active groups 135 parent_group = self._stream_block_simultaneous_read.get(parent_stream_name, "") 136 if parent_group: 137 if parent_group in self._active_groups: 138 self._active_groups[parent_group].discard(parent_stream_name) 139 if not self._active_groups[parent_group]: 140 del self._active_groups[parent_group] 141 self._logger.info( 142 f"Parent stream '{parent_stream_name}' (group '{parent_group}') deactivated after " 143 f"partition generation completed for child '{stream_name}'. " 144 f"Blocked streams in the queue will be retried on next start_next_partition_generator call." 
145 ) 146 147 # It is possible for the stream to already be done if no partitions were generated 148 # If the partition generation process was completed and there are no partitions left to process, the stream is done 149 if ( 150 self._is_stream_done(stream_name) 151 or len(self._streams_to_running_partitions[stream_name]) == 0 152 ): 153 yield from self._on_stream_is_done(stream_name) 154 if self._stream_instances_to_start_partition_generation: 155 status_message = self.start_next_partition_generator() 156 if status_message: 157 yield status_message 158 159 def on_partition(self, partition: Partition) -> None: 160 """ 161 This method is called when a partition is generated. 162 1. Add the partition to the set of partitions for the stream 163 2. Log the slice if necessary 164 3. Submit the partition to the thread pool manager 165 """ 166 stream_name = partition.stream_name() 167 self._streams_to_running_partitions[stream_name].add(partition) 168 cursor = self._stream_name_to_instance[stream_name].cursor 169 if self._slice_logger.should_log_slice_message(self._logger): 170 self._message_repository.emit_message( 171 self._slice_logger.create_slice_log_message(partition.to_slice()) 172 ) 173 self._thread_pool_manager.submit( 174 self._partition_reader.process_partition, partition, cursor 175 ) 176 177 def on_partition_complete_sentinel( 178 self, sentinel: PartitionCompleteSentinel 179 ) -> Iterable[AirbyteMessage]: 180 """ 181 This method is called when a partition is completed. 182 1. Close the partition 183 2. If the stream is done, mark it as such and return a stream status message 184 3. Emit messages that were added to the message repository 185 4. 
If there are more streams to read from, start the next partition generator 186 """ 187 partition = sentinel.partition 188 189 partitions_running = self._streams_to_running_partitions[partition.stream_name()] 190 if partition in partitions_running: 191 partitions_running.remove(partition) 192 # If all partitions were generated and this was the last one, the stream is done 193 if ( 194 partition.stream_name() not in self._streams_currently_generating_partitions 195 and len(partitions_running) == 0 196 ): 197 yield from self._on_stream_is_done(partition.stream_name()) 198 # Try to start the next stream in the queue (may be a deferred stream) 199 if self._stream_instances_to_start_partition_generation: 200 status_message = self.start_next_partition_generator() 201 if status_message: 202 yield status_message 203 yield from self._message_repository.consume_queue() 204 205 def on_record(self, record: Record) -> Iterable[AirbyteMessage]: 206 """ 207 This method is called when a record is read from a partition. 208 1. Convert the record to an AirbyteMessage 209 2. If this is the first record for the stream, mark the stream as RUNNING 210 3. Increment the record counter for the stream 211 4. Ensures the cursor knows the record has been successfully emitted 212 5. Emit the message 213 6. Emit messages that were added to the message repository 214 """ 215 # Do not pass a transformer or a schema 216 # AbstractStreams are expected to return data as they are expected. 
217 # Any transformation on the data should be done before reaching this point 218 message = stream_data_to_airbyte_message( 219 stream_name=record.stream_name, 220 data_or_message=record.data, 221 file_reference=record.file_reference, 222 ) 223 stream = self._stream_name_to_instance[record.stream_name] 224 225 if message.type == MessageType.RECORD: 226 if self._record_counter[stream.name] == 0: 227 self._logger.info(f"Marking stream {stream.name} as RUNNING") 228 yield stream_status_as_airbyte_message( 229 stream.as_airbyte_stream(), AirbyteStreamStatus.RUNNING 230 ) 231 self._record_counter[stream.name] += 1 232 yield message 233 yield from self._message_repository.consume_queue() 234 235 def on_exception(self, exception: StreamThreadException) -> Iterable[AirbyteMessage]: 236 """ 237 This method is called when an exception is raised. 238 1. Stop all running streams 239 2. Raise the exception 240 """ 241 self._flag_exception(exception.stream_name, exception.exception) 242 self._logger.exception( 243 f"Exception while syncing stream {exception.stream_name}", exc_info=exception.exception 244 ) 245 246 stream_descriptor = StreamDescriptor(name=exception.stream_name) 247 if isinstance(exception.exception, AirbyteTracedException): 248 yield exception.exception.as_airbyte_message(stream_descriptor=stream_descriptor) 249 else: 250 yield AirbyteTracedException.from_exception( 251 exception.exception, 252 stream_descriptor=stream_descriptor, 253 message=f"An unexpected error occurred in stream {exception.stream_name}: {type(exception.exception).__name__}", 254 ).as_airbyte_message() 255 256 def _flag_exception(self, stream_name: str, exception: Exception) -> None: 257 self._exceptions_per_stream_name.setdefault(stream_name, []).append(exception) 258 259 def start_next_partition_generator(self) -> Optional[AirbyteMessage]: 260 """ 261 Submits the next partition generator to the thread pool. 262 263 A stream will be deferred (moved to end of queue) if: 264 1. 
The stream itself has block_simultaneous_read=True AND is already active 265 2. Any parent stream has block_simultaneous_read=True AND is currently active 266 267 This prevents simultaneous reads of streams that shouldn't be accessed concurrently. 268 269 :return: A status message if a partition generator was started, otherwise None 270 """ 271 if not self._stream_instances_to_start_partition_generation: 272 return None 273 274 # Enforce the concurrent generator cap so at least one worker slot is always available 275 # for partition reading. Recovery is guaranteed: on_partition_generation_completed 276 # decrements the count before calling here, so the guard always passes there. 277 if ( 278 self._max_concurrent_partition_generators is not None 279 and len(self._streams_currently_generating_partitions) 280 >= self._max_concurrent_partition_generators 281 ): 282 self._logger.debug( 283 f"Concurrent partition generator cap ({self._max_concurrent_partition_generators}) reached " 284 f"({len(self._streams_currently_generating_partitions)} active). Deferring next generator start." 285 ) 286 return None 287 288 # Remember initial queue size to avoid infinite loops if all streams are blocked 289 max_attempts = len(self._stream_instances_to_start_partition_generation) 290 attempts = 0 291 292 while self._stream_instances_to_start_partition_generation and attempts < max_attempts: 293 attempts += 1 294 295 # Pop the first stream from the queue 296 stream = self._stream_instances_to_start_partition_generation.pop(0) 297 stream_name = stream.name 298 stream_group = self._stream_block_simultaneous_read.get(stream_name, "") 299 300 # Check if this stream has a blocking group and is already active as parent stream 301 # (i.e. 
being read from during partition generation for another stream) 302 if stream_group and stream_name in self._active_stream_names: 303 # Add back to the END of the queue for retry later 304 self._stream_instances_to_start_partition_generation.append(stream) 305 self._logger.info( 306 f"Deferring stream '{stream_name}' (group '{stream_group}') because it's already active. Trying next stream." 307 ) 308 continue # Try the next stream in the queue 309 310 # Check if this stream's group is already active (another stream in the same group is running) 311 if ( 312 stream_group 313 and stream_group in self._active_groups 314 and self._active_groups[stream_group] 315 ): 316 # Add back to the END of the queue for retry later 317 self._stream_instances_to_start_partition_generation.append(stream) 318 active_streams_in_group = self._active_groups[stream_group] 319 self._logger.info( 320 f"Deferring stream '{stream_name}' (group '{stream_group}') because other stream(s) " 321 f"{active_streams_in_group} in the same group are active. Trying next stream." 322 ) 323 continue # Try the next stream in the queue 324 325 # Check if any parent streams have a blocking group and are currently active 326 parent_streams = self._collect_all_parent_stream_names(stream_name) 327 blocked_by_parents = [ 328 p 329 for p in parent_streams 330 if self._stream_block_simultaneous_read.get(p, "") 331 and p in self._active_stream_names 332 ] 333 334 if blocked_by_parents: 335 # Add back to the END of the queue for retry later 336 self._stream_instances_to_start_partition_generation.append(stream) 337 parent_groups = { 338 self._stream_block_simultaneous_read.get(p, "") for p in blocked_by_parents 339 } 340 self._logger.info( 341 f"Deferring stream '{stream_name}' because parent stream(s) " 342 f"{blocked_by_parents} (groups {parent_groups}) are active. Trying next stream." 
343 ) 344 continue # Try the next stream in the queue 345 346 # No blocking - start this stream 347 # Mark stream as active before starting 348 self._active_stream_names.add(stream_name) 349 self._streams_currently_generating_partitions.append(stream_name) 350 351 # Track this stream in its group if it has one 352 if stream_group: 353 if stream_group not in self._active_groups: 354 self._active_groups[stream_group] = set() 355 self._active_groups[stream_group].add(stream_name) 356 self._logger.debug(f"Added '{stream_name}' to active group '{stream_group}'") 357 358 # Also mark all parent streams as active (they will be read from during partition generation) 359 for parent_stream_name in parent_streams: 360 parent_group = self._stream_block_simultaneous_read.get(parent_stream_name, "") 361 if parent_group: 362 self._active_stream_names.add(parent_stream_name) 363 if parent_group not in self._active_groups: 364 self._active_groups[parent_group] = set() 365 self._active_groups[parent_group].add(parent_stream_name) 366 self._logger.info( 367 f"Marking parent stream '{parent_stream_name}' (group '{parent_group}') as active " 368 f"(will be read during partition generation for '{stream_name}')" 369 ) 370 371 self._thread_pool_manager.submit(self._partition_enqueuer.generate_partitions, stream) 372 self._logger.info(f"Marking stream {stream_name} as STARTED") 373 self._logger.info(f"Syncing stream: {stream_name}") 374 return stream_status_as_airbyte_message( 375 stream.as_airbyte_stream(), 376 AirbyteStreamStatus.STARTED, 377 ) 378 379 # All streams in the queue are currently blocked 380 return None 381 382 def is_done(self) -> bool: 383 """ 384 This method is called to check if the sync is done. 385 The sync is done when: 386 1. There are no more streams generating partitions 387 2. There are no more streams to read from 388 3. 
All partitions for all streams are closed 389 """ 390 is_done = all( 391 [ 392 self._is_stream_done(stream_name) 393 for stream_name in self._stream_name_to_instance.keys() 394 ] 395 ) 396 if is_done and self._stream_instances_to_start_partition_generation: 397 stuck_stream_names = [ 398 s.name for s in self._stream_instances_to_start_partition_generation 399 ] 400 raise AirbyteTracedException( 401 message="Partition generation queue is not empty after all streams completed.", 402 internal_message=f"Streams {stuck_stream_names} remained in the partition generation queue after all streams were marked done.", 403 failure_type=FailureType.system_error, 404 ) 405 if is_done and self._active_groups: 406 raise AirbyteTracedException( 407 message="Active stream groups are not empty after all streams completed.", 408 internal_message=f"Groups {dict(self._active_groups)} still active after all streams were marked done.", 409 failure_type=FailureType.system_error, 410 ) 411 if is_done and self._exceptions_per_stream_name: 412 error_message = generate_failed_streams_error_message(self._exceptions_per_stream_name) 413 self._logger.info(error_message) 414 # We still raise at least one exception when a stream raises an exception because the platform currently relies 415 # on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error 416 # type because this combined error isn't actionable, but rather the previously emitted individual errors. 417 raise AirbyteTracedException( 418 message=error_message, 419 internal_message="Concurrent read failure", 420 failure_type=FailureType.config_error, 421 ) 422 return is_done 423 424 def _is_stream_done(self, stream_name: str) -> bool: 425 return stream_name in self._streams_done 426 427 def _collect_all_parent_stream_names(self, stream_name: str) -> Set[str]: 428 """Recursively collect all parent stream names for a given stream. 
429 430 For example, if we have: epics -> issues -> comments 431 Then for comments, this returns {issues, epics}. 432 """ 433 parent_names: Set[str] = set() 434 stream = self._stream_name_to_instance.get(stream_name) 435 436 if not stream: 437 return parent_names 438 439 partition_router = ( 440 stream.get_partition_router() if isinstance(stream, DefaultStream) else None 441 ) 442 if isinstance(partition_router, GroupingPartitionRouter): 443 partition_router = partition_router.underlying_partition_router 444 445 if isinstance(partition_router, SubstreamPartitionRouter): 446 for parent_config in partition_router.parent_stream_configs: 447 parent_name = parent_config.stream.name 448 parent_names.add(parent_name) 449 parent_names.update(self._collect_all_parent_stream_names(parent_name)) 450 451 return parent_names 452 453 def _on_stream_is_done(self, stream_name: str) -> Iterable[AirbyteMessage]: 454 self._logger.info( 455 f"Read {self._record_counter[stream_name]} records from {stream_name} stream" 456 ) 457 self._logger.info(f"Marking stream {stream_name} as STOPPED") 458 stream = self._stream_name_to_instance[stream_name] 459 stream.cursor.ensure_at_least_one_state_emitted() 460 yield from self._message_repository.consume_queue() 461 self._logger.info(f"Finished syncing {stream.name}") 462 self._streams_done.add(stream_name) 463 stream_status = ( 464 AirbyteStreamStatus.INCOMPLETE 465 if self._exceptions_per_stream_name.get(stream_name, []) 466 else AirbyteStreamStatus.COMPLETE 467 ) 468 yield stream_status_as_airbyte_message(stream.as_airbyte_stream(), stream_status) 469 470 # Remove only this stream from active set (NOT parents) 471 if stream_name in self._active_stream_names: 472 self._active_stream_names.discard(stream_name) 473 474 # Remove from active groups 475 stream_group = self._stream_block_simultaneous_read.get(stream_name, "") 476 if stream_group: 477 if stream_group in self._active_groups: 478 self._active_groups[stream_group].discard(stream_name) 
479 if not self._active_groups[stream_group]: 480 del self._active_groups[stream_group] 481 self._logger.info( 482 f"Stream '{stream_name}' (group '{stream_group}') is no longer active. " 483 f"Blocked streams in the queue will be retried on next start_next_partition_generator call." 484 )
ConcurrentReadProcessor( stream_instances_to_read_from: List[airbyte_cdk.sources.streams.concurrent.abstract_stream.AbstractStream], partition_enqueuer: airbyte_cdk.sources.streams.concurrent.partition_enqueuer.PartitionEnqueuer, thread_pool_manager: airbyte_cdk.sources.concurrent_source.thread_pool_manager.ThreadPoolManager, logger: logging.Logger, slice_logger: airbyte_cdk.sources.utils.slice_logger.SliceLogger, message_repository: airbyte_cdk.MessageRepository, partition_reader: airbyte_cdk.sources.streams.concurrent.partition_reader.PartitionReader, max_concurrent_partition_generators: Optional[int] = None)
40 def __init__( 41 self, 42 stream_instances_to_read_from: List[AbstractStream], 43 partition_enqueuer: PartitionEnqueuer, 44 thread_pool_manager: ThreadPoolManager, 45 logger: logging.Logger, 46 slice_logger: SliceLogger, 47 message_repository: MessageRepository, 48 partition_reader: PartitionReader, 49 max_concurrent_partition_generators: Optional[int] = None, 50 ): 51 """ 52 This class is responsible for handling items from a concurrent stream read process. 53 :param stream_instances_to_read_from: List of streams to read from 54 :param partition_enqueuer: PartitionEnqueuer instance 55 :param thread_pool_manager: ThreadPoolManager instance 56 :param logger: Logger instance 57 :param slice_logger: SliceLogger instance 58 :param message_repository: MessageRepository instance 59 :param partition_reader: PartitionReader instance 60 :param max_concurrent_partition_generators: Maximum number of partition generators allowed 61 to run concurrently. None means no limit. When set, should be less than the number of 62 workers in multi-worker mode so at least one worker slot is always available for 63 partition reading, preventing thread pool starvation. In single-threaded mode 64 (num_workers=1) the value may equal num_workers; ConcurrentSource.create() handles 65 this distinction. ConcurrentSource.read() passes this value explicitly. 
66 """ 67 self._stream_name_to_instance = {s.name: s for s in stream_instances_to_read_from} 68 self._record_counter = {} 69 self._streams_to_running_partitions: Dict[str, Set[Partition]] = {} 70 for stream in stream_instances_to_read_from: 71 self._streams_to_running_partitions[stream.name] = set() 72 self._record_counter[stream.name] = 0 73 if ( 74 max_concurrent_partition_generators is not None 75 and max_concurrent_partition_generators < 1 76 ): 77 raise ValueError( 78 f"max_concurrent_partition_generators must be >= 1 or None, got {max_concurrent_partition_generators}" 79 ) 80 self._thread_pool_manager = thread_pool_manager 81 self._partition_enqueuer = partition_enqueuer 82 self._max_concurrent_partition_generators = max_concurrent_partition_generators 83 self._stream_instances_to_start_partition_generation = stream_instances_to_read_from 84 self._streams_currently_generating_partitions: List[str] = [] 85 self._logger = logger 86 self._slice_logger = slice_logger 87 self._message_repository = message_repository 88 self._partition_reader = partition_reader 89 self._streams_done: Set[str] = set() 90 self._exceptions_per_stream_name: dict[str, List[Exception]] = {} 91 92 # Track which streams (by name) are currently active 93 # A stream is "active" if it's generating partitions or has partitions being read 94 self._active_stream_names: Set[str] = set() 95 96 # Store blocking group names for streams that require blocking simultaneous reads 97 # Maps stream name -> group name (empty string means no blocking) 98 self._stream_block_simultaneous_read: Dict[str, str] = { 99 stream.name: stream.block_simultaneous_read for stream in stream_instances_to_read_from 100 } 101 102 # Track which groups are currently active 103 # Maps group name -> set of stream names in that group 104 self._active_groups: Dict[str, Set[str]] = {} 105 106 for stream in stream_instances_to_read_from: 107 if stream.block_simultaneous_read: 108 self._logger.info( 109 f"Stream '{stream.name}' is 
in blocking group '{stream.block_simultaneous_read}'. " 110 f"Will defer starting this stream if another stream in the same group or its parents are active." 111 )
This class is responsible for handling items from a concurrent stream read process.
Parameters
- stream_instances_to_read_from: List of streams to read from
- partition_enqueuer: PartitionEnqueuer instance
- thread_pool_manager: ThreadPoolManager instance
- logger: Logger instance
- slice_logger: SliceLogger instance
- message_repository: MessageRepository instance
- partition_reader: PartitionReader instance
- max_concurrent_partition_generators: Maximum number of partition generators allowed to run concurrently. None means no limit. When set, should be less than the number of workers in multi-worker mode so at least one worker slot is always available for partition reading, preventing thread pool starvation. In single-threaded mode (num_workers=1) the value may equal num_workers; ConcurrentSource.create() handles this distinction. ConcurrentSource.read() passes this value explicitly.
def
on_partition_generation_completed( self, sentinel: airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel.PartitionGenerationCompletedSentinel) -> Iterable[airbyte_cdk.AirbyteMessage]:
def on_partition_generation_completed(
    self, sentinel: PartitionGenerationCompletedSentinel
) -> Iterable[AirbyteMessage]:
    """
    Handle the sentinel emitted once a stream has finished generating its partitions.
    1. Drop the stream from the list of streams currently generating partitions
    2. Release its parent streams from the active set and from their blocking groups
       (they were only marked active for the duration of partition generation)
    3. If no partition of the stream is still running, finalize the stream and emit its messages
    4. If streams are still waiting in the queue, try to start the next partition generator

    :param sentinel: sentinel carrying the stream whose partition generation completed
    :return: messages produced while finalizing the stream and/or starting the next generator
    """
    finished_stream_name = sentinel.stream.name
    self._streams_currently_generating_partitions.remove(finished_stream_name)

    # Parents were only read to build slices for this stream; make them available again.
    for parent_name in self._collect_all_parent_stream_names(finished_stream_name):
        if parent_name not in self._active_stream_names:
            continue
        self._logger.debug(f"Removing '{parent_name}' from active streams")
        self._active_stream_names.discard(parent_name)

        group_of_parent = self._stream_block_simultaneous_read.get(parent_name, "")
        if not group_of_parent:
            continue
        group_members = self._active_groups.get(group_of_parent)
        if group_members is not None:
            group_members.discard(parent_name)
            # An empty group entry must not linger: is_done() treats leftovers as an error.
            if not group_members:
                del self._active_groups[group_of_parent]
        self._logger.info(
            f"Parent stream '{parent_name}' (group '{group_of_parent}') deactivated after "
            f"partition generation completed for child '{finished_stream_name}'. "
            f"Blocked streams in the queue will be retried on next start_next_partition_generator call."
        )

    # A stream that produced no partitions (or whose partitions all finished already) is done
    # as soon as generation completes.
    if (
        self._is_stream_done(finished_stream_name)
        or not self._streams_to_running_partitions[finished_stream_name]
    ):
        yield from self._on_stream_is_done(finished_stream_name)

    if self._stream_instances_to_start_partition_generation:
        next_status = self.start_next_partition_generator()
        if next_status:
            yield next_status
This method is called when a partition generation is completed.
- Remove the stream from the list of streams currently generating partitions
- Deactivate parent streams (they were only needed for partition generation)
- If the stream is done, mark it as such and return a stream status message
- If there are more streams to read from, start the next partition generator
def
on_partition( self, partition: airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition) -> None:
def on_partition(self, partition: Partition) -> None:
    """
    Handle a freshly generated partition.
    1. Register the partition as running for its stream
    2. Emit a slice log message through the message repository when slice logging is enabled
    3. Hand the partition, together with its stream's cursor, to the thread pool for reading

    :param partition: the partition that was just generated
    """
    owning_stream_name = partition.stream_name()
    self._streams_to_running_partitions[owning_stream_name].add(partition)
    stream_cursor = self._stream_name_to_instance[owning_stream_name].cursor

    if self._slice_logger.should_log_slice_message(self._logger):
        slice_log_message = self._slice_logger.create_slice_log_message(partition.to_slice())
        self._message_repository.emit_message(slice_log_message)

    self._thread_pool_manager.submit(
        self._partition_reader.process_partition, partition, stream_cursor
    )
This method is called when a partition is generated.
- Add the partition to the set of partitions for the stream
- Log the slice if necessary
- Submit the partition to the thread pool manager
def
on_partition_complete_sentinel( self, sentinel: airbyte_cdk.sources.streams.concurrent.partitions.types.PartitionCompleteSentinel) -> Iterable[airbyte_cdk.AirbyteMessage]:
def on_partition_complete_sentinel(
    self, sentinel: PartitionCompleteSentinel
) -> Iterable[AirbyteMessage]:
    """
    This method is called when a partition is completed.
    1. Remove the partition from the set of running partitions of its stream
    2. If the stream is done (partition generation finished and no partition left running),
       mark it as such and return a stream status message
    3. If there are more streams to read from, try to start the next partition generator
    4. Emit messages that were added to the message repository

    A sentinel for a partition that is not registered as running only drains the message
    repository.

    :param sentinel: sentinel carrying the partition that finished being read
    :return: the messages produced by the steps above
    """
    # NOTE(review): the nesting below was reconstructed from a flattened listing; confirm
    # whether the "start next generator" step is meant to run only once the stream is done
    # (as written here) or after every completed partition.
    partition = sentinel.partition

    partitions_running = self._streams_to_running_partitions[partition.stream_name()]
    if partition in partitions_running:
        partitions_running.remove(partition)
        # If all partitions were generated and this was the last one, the stream is done
        if (
            partition.stream_name() not in self._streams_currently_generating_partitions
            and len(partitions_running) == 0
        ):
            yield from self._on_stream_is_done(partition.stream_name())
            # Try to start the next stream in the queue (may be a deferred stream)
            if self._stream_instances_to_start_partition_generation:
                status_message = self.start_next_partition_generator()
                if status_message:
                    yield status_message
    yield from self._message_repository.consume_queue()
This method is called when a partition is completed.
- Close the partition
- If the stream is done, mark it as such and return a stream status message
- Emit messages that were added to the message repository
- If there are more streams to read from, start the next partition generator
def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
    """
    Handle a record read from a partition.
    1. Convert the record to an AirbyteMessage
    2. If the message is a RECORD and it is the first one for the stream, emit a RUNNING status
    3. Increment the record counter for the stream (RECORD messages only)
    4. Emit the message
    5. Emit messages that were added to the message repository

    :param record: the record produced by a partition reader
    :return: the status message (if any), the converted message and the queued repository messages
    """
    source_stream_name = record.stream_name
    # No transformer or schema is passed on purpose: AbstractStreams are expected to return
    # data as they are expected, and any transformation must happen before this point.
    airbyte_message = stream_data_to_airbyte_message(
        stream_name=source_stream_name,
        data_or_message=record.data,
        file_reference=record.file_reference,
    )
    stream = self._stream_name_to_instance[source_stream_name]

    if airbyte_message.type == MessageType.RECORD:
        counter_key = stream.name
        is_first_record = self._record_counter[counter_key] == 0
        if is_first_record:
            self._logger.info(f"Marking stream {counter_key} as RUNNING")
            yield stream_status_as_airbyte_message(
                stream.as_airbyte_stream(), AirbyteStreamStatus.RUNNING
            )
        self._record_counter[counter_key] += 1

    yield airbyte_message
    yield from self._message_repository.consume_queue()
This method is called when a record is read from a partition.
- Convert the record to an AirbyteMessage
- If this is the first record for the stream, mark the stream as RUNNING
- Increment the record counter for the stream
- Cursor bookkeeping is not performed in this method; the stream's cursor is handed to the partition reader when the partition is submitted (see on_partition)
- Emit the message
- Emit messages that were added to the message repository
def
on_exception( self, exception: airbyte_cdk.sources.concurrent_source.stream_thread_exception.StreamThreadException) -> Iterable[airbyte_cdk.AirbyteMessage]:
def on_exception(self, exception: StreamThreadException) -> Iterable[AirbyteMessage]:
    """
    Handle an exception raised while syncing a stream.
    1. Record the exception against the stream it came from
    2. Log it
    3. Emit it as an AirbyteMessage, wrapping it in an AirbyteTracedException first when it
       is not one already

    The exception is not re-raised here; is_done() raises once all streams are done and at
    least one exception was recorded.

    :param exception: wrapper holding the failing stream's name and the original exception
    :return: a single message describing the failure
    """
    failed_stream_name = exception.stream_name
    cause = exception.exception

    self._flag_exception(failed_stream_name, cause)
    self._logger.exception(
        f"Exception while syncing stream {failed_stream_name}", exc_info=cause
    )

    descriptor = StreamDescriptor(name=failed_stream_name)
    if not isinstance(cause, AirbyteTracedException):
        traced = AirbyteTracedException.from_exception(
            cause,
            stream_descriptor=descriptor,
            message=f"An unexpected error occurred in stream {failed_stream_name}: {type(cause).__name__}",
        )
        yield traced.as_airbyte_message()
    else:
        yield cause.as_airbyte_message(stream_descriptor=descriptor)
This method is called when an exception is raised.
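- Record the exception against the stream that raised it and log it
- Emit the exception as an AirbyteMessage (wrapping it in an AirbyteTracedException when it is not one already); it is not re-raised here — is_done() raises once every stream has finished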
def start_next_partition_generator(self) -> Optional[AirbyteMessage]:
    """
    Submits the next startable partition generator to the thread pool.

    Nothing is started when the queue is empty or when the number of streams currently
    generating partitions has reached max_concurrent_partition_generators.

    Otherwise the queue is scanned once. A stream is deferred (moved to the end of the queue) if:
    1. It belongs to a blocking group (non-empty block_simultaneous_read) AND is already active
    2. It belongs to a blocking group AND that group currently has active streams
    3. Any of its parent streams belongs to a blocking group AND is currently active

    The first stream that is not deferred is marked active (together with its parents that
    belong to a blocking group) and its partition generation is submitted to the thread pool.

    :return: A STARTED status message if a partition generator was started, otherwise None
    """
    pending_streams = self._stream_instances_to_start_partition_generation
    if not pending_streams:
        return None

    # Enforce the concurrent generator cap so at least one worker slot is always available
    # for partition reading. Recovery is guaranteed: on_partition_generation_completed
    # decrements the count before calling here, so the guard always passes there.
    generator_cap = self._max_concurrent_partition_generators
    generating_now = self._streams_currently_generating_partitions
    if generator_cap is not None and len(generating_now) >= generator_cap:
        self._logger.debug(
            f"Concurrent partition generator cap ({generator_cap}) reached "
            f"({len(generating_now)} active). Deferring next generator start."
        )
        return None

    # Look at each queued stream at most once: deferred streams go back to the end of the
    # queue, so bounding the scan by the initial queue size prevents an endless loop when
    # every stream is blocked.
    for _ in range(len(pending_streams)):
        candidate = pending_streams.pop(0)
        name = candidate.name
        group = self._stream_block_simultaneous_read.get(name, "")

        # The stream itself is in use, e.g. as a parent during another stream's
        # partition generation.
        if group and name in self._active_stream_names:
            pending_streams.append(candidate)
            self._logger.info(
                f"Deferring stream '{name}' (group '{group}') because it's already active. Trying next stream."
            )
            continue

        # Another stream of the same blocking group is running.
        if group and self._active_groups.get(group):
            pending_streams.append(candidate)
            busy_group_members = self._active_groups[group]
            self._logger.info(
                f"Deferring stream '{name}' (group '{group}') because other stream(s) "
                f"{busy_group_members} in the same group are active. Trying next stream."
            )
            continue

        # A parent that belongs to a blocking group is currently active.
        parents = self._collect_all_parent_stream_names(name)
        busy_parents = [
            parent
            for parent in parents
            if self._stream_block_simultaneous_read.get(parent, "")
            and parent in self._active_stream_names
        ]
        if busy_parents:
            pending_streams.append(candidate)
            busy_parent_groups = {
                self._stream_block_simultaneous_read.get(parent, "") for parent in busy_parents
            }
            self._logger.info(
                f"Deferring stream '{name}' because parent stream(s) "
                f"{busy_parents} (groups {busy_parent_groups}) are active. Trying next stream."
            )
            continue

        # Not blocked: mark the stream active before handing it to the thread pool.
        self._active_stream_names.add(name)
        generating_now.append(name)
        if group:
            self._active_groups.setdefault(group, set()).add(name)
            self._logger.debug(f"Added '{name}' to active group '{group}'")

        # Parents in a blocking group are read during partition generation, so they count
        # as active until generation completes.
        for parent in parents:
            group_of_parent = self._stream_block_simultaneous_read.get(parent, "")
            if not group_of_parent:
                continue
            self._active_stream_names.add(parent)
            self._active_groups.setdefault(group_of_parent, set()).add(parent)
            self._logger.info(
                f"Marking parent stream '{parent}' (group '{group_of_parent}') as active "
                f"(will be read during partition generation for '{name}')"
            )

        self._thread_pool_manager.submit(self._partition_enqueuer.generate_partitions, candidate)
        self._logger.info(f"Marking stream {name} as STARTED")
        self._logger.info(f"Syncing stream: {name}")
        return stream_status_as_airbyte_message(
            candidate.as_airbyte_stream(),
            AirbyteStreamStatus.STARTED,
        )

    # All streams in the queue are currently blocked
    return None
Submits the next partition generator to the thread pool.
A stream will be deferred (moved to end of queue) if:
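- The stream itself belongs to a blocking group (non-empty block_simultaneous_read) AND is already active
- Another stream in the same blocking group is currently active
- Any parent stream belongs to a blocking group (non-empty block_simultaneous_read) AND is currently active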
This prevents simultaneous reads of streams that shouldn't be accessed concurrently.
Returns
A status message if a partition generator was started, otherwise None
def
is_done(self) -> bool:
def is_done(self) -> bool:
    """
    Tell whether the sync is done, i.e. whether every known stream has been marked done.

    Once every stream is done, the remaining state is checked for consistency and failures.

    :return: True when every stream is done and nothing is left over, False otherwise
    :raises AirbyteTracedException: when every stream is done but
        - streams are still waiting in the partition generation queue (system_error), or
        - blocking groups are still marked active (system_error), or
        - at least one stream recorded an exception during the sync (config_error)
    """
    every_stream_done = all(
        self._is_stream_done(name) for name in self._stream_name_to_instance
    )
    if not every_stream_done:
        return False

    leftover_streams = self._stream_instances_to_start_partition_generation
    if leftover_streams:
        stuck_stream_names = [stream.name for stream in leftover_streams]
        raise AirbyteTracedException(
            message="Partition generation queue is not empty after all streams completed.",
            internal_message=f"Streams {stuck_stream_names} remained in the partition generation queue after all streams were marked done.",
            failure_type=FailureType.system_error,
        )

    if self._active_groups:
        raise AirbyteTracedException(
            message="Active stream groups are not empty after all streams completed.",
            internal_message=f"Groups {dict(self._active_groups)} still active after all streams were marked done.",
            failure_type=FailureType.system_error,
        )

    if self._exceptions_per_stream_name:
        failure_summary = generate_failed_streams_error_message(self._exceptions_per_stream_name)
        self._logger.info(failure_summary)
        # We still raise at least one exception when a stream raises an exception because the platform currently relies
        # on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error
        # type because this combined error isn't actionable, but rather the previously emitted individual errors.
        raise AirbyteTracedException(
            message=failure_summary,
            internal_message="Concurrent read failure",
            failure_type=FailureType.config_error,
        )

    return True
This method is called to check if the sync is done. The sync is done when:
- There are no more streams generating partitions
- There are no more streams to read from
- All partitions for all streams are closed