airbyte_cdk.sources.declarative.incremental
class ConcurrentCursorFactory:
    """Builds ConcurrentCursor instances by delegating to an injected factory callable."""

    def __init__(self, create_function: Callable[..., ConcurrentCursor]):
        # The injected callable closes over every cursor parameter except the
        # per-partition state and lookback window supplied at creation time.
        self._create_function = create_function

    def create(
        self, stream_state: Mapping[str, Any], runtime_lookback_window: Optional[timedelta]
    ) -> ConcurrentCursor:
        """Create a cursor seeded with the given state and runtime lookback window."""
        creation_kwargs = {
            "stream_state": stream_state,
            "runtime_lookback_window": runtime_lookback_window,
        }
        return self._create_function(**creation_kwargs)
class ConcurrentPerPartitionCursor(Cursor):
    """
    Manages state per partition when a stream has many partitions, preventing data loss or duplication.

    Attributes:
        DEFAULT_MAX_PARTITIONS_NUMBER (int): Maximum number of partitions to retain in memory (default is 25,000).
            This limit needs to be higher than the number of threads we might enqueue (which is represented by
            ThreadPoolManager.DEFAULT_MAX_QUEUE_SIZE). If not, we could have partitions that have been generated
            and submitted to the ThreadPool but got deleted from the ConcurrentPerPartitionCursor and when closing
            them, it will generate KeyError.

    - **Partition Limitation Logic**
      Ensures the number of tracked partitions does not exceed the specified limit to prevent memory overuse.
      Oldest partitions are removed when the limit is reached.

    - **Global Cursor Fallback**
      New partitions use global state as the initial state to progress the state for deleted or new partitions.
      The history data added after the initial sync will be missing.

    ConcurrentPerPartitionCursor expects the state of the ConcurrentCursor to follow the format
    {cursor_field: cursor_value}.
    """

    # Hard cap on partitions kept in memory; oldest are evicted past this point.
    DEFAULT_MAX_PARTITIONS_NUMBER = 25_000
    # Past this many partitions we stop tracking per-partition state and fall back to one global cursor.
    SWITCH_TO_GLOBAL_LIMIT = 10_000
    _NO_STATE: Mapping[str, Any] = {}
    _NO_CURSOR_STATE: Mapping[str, Any] = {}
    # Keys used in the serialized stream state.
    _GLOBAL_STATE_KEY = "state"
    _PERPARTITION_STATE_KEY = "states"
    _IS_PARTITION_DUPLICATION_LOGGED = False
    # Tuple indexes into the (parent_state, generation_sequence) pairs stored in _partition_parent_state_map.
    _PARENT_STATE = 0
    _GENERATION_SEQUENCE = 1

    def __init__(
        self,
        cursor_factory: ConcurrentCursorFactory,
        partition_router: PartitionRouter,
        stream_name: str,
        stream_namespace: Optional[str],
        stream_state: Any,
        message_repository: MessageRepository,
        connector_state_manager: ConnectorStateManager,
        connector_state_converter: AbstractStreamStateConverter,
        cursor_field: CursorField,
        use_global_cursor: bool = False,
        attempt_to_create_cursor_if_not_provided: bool = False,
    ) -> None:
        self._global_cursor: Optional[StreamState] = {}
        self._stream_name = stream_name
        self._stream_namespace = stream_namespace
        self._message_repository = message_repository
        self._connector_state_manager = connector_state_manager
        self._connector_state_converter = connector_state_converter
        self._cursor_field = cursor_field

        self._cursor_factory = cursor_factory  # self._cursor_factory is flagged as private but is used in model_to_component_factory to ease pagination reset instantiation
        self._partition_router = partition_router

        # The dict is ordered to ensure that once the maximum number of partitions is reached,
        # the oldest partitions can be efficiently removed, maintaining the most recent partitions.
        self._cursor_per_partition: OrderedDict[str, ConcurrentCursor] = OrderedDict()
        # One semaphore per partition counts slices that are generated but not yet closed.
        self._semaphore_per_partition: OrderedDict[str, threading.Semaphore] = OrderedDict()

        # Parent-state tracking: store each partition's parent state in creation order
        self._partition_parent_state_map: OrderedDict[str, tuple[Mapping[str, Any], int]] = (
            OrderedDict()
        )
        self._parent_state: Optional[StreamState] = None

        # Tracks when the last slice for partition is emitted
        self._partitions_done_generating_stream_slices: set[str] = set()
        # Used to track the index of partitions that are not closed yet
        self._processing_partitions_indexes: List[int] = list()
        self._generated_partitions_count: int = 0
        # Dictionary to map partition keys to their index
        self._partition_key_to_index: dict[str, int] = {}

        self._lock = threading.Lock()
        self._lookback_window: int = 0
        self._new_global_cursor: Optional[StreamState] = None
        self._number_of_partitions: int = 0
        self._use_global_cursor: bool = use_global_cursor
        self._partition_serializer = PerPartitionKeySerializer()

        # Track the last time a state message was emitted
        self._last_emission_time: float = 0.0
        self._timer = Timer()

        self._set_initial_state(stream_state)

        # FIXME this is a temporary field the time of the migration from declarative cursors to concurrent ones
        self._attempt_to_create_cursor_if_not_provided = attempt_to_create_cursor_if_not_provided
        self._synced_some_data = False
        self._logged_regarding_datetime_format_error = False

    @property
    def cursor_field(self) -> CursorField:
        return self._cursor_field

    @property
    def state(self) -> MutableMapping[str, Any]:
        """Serialize the cursor into the connector-state format (per-partition states plus optional global/parent state)."""
        state: dict[str, Any] = {"use_global_cursor": self._use_global_cursor}
        if not self._use_global_cursor:
            states = []
            for partition_tuple, cursor in self._cursor_per_partition.items():
                if cursor.state:
                    states.append(
                        {
                            "partition": self._to_dict(partition_tuple),
                            # deepcopy so later cursor mutations cannot leak into an emitted state message
                            "cursor": copy.deepcopy(cursor.state),
                        }
                    )
            state[self._PERPARTITION_STATE_KEY] = states

        if self._global_cursor:
            state[self._GLOBAL_STATE_KEY] = self._global_cursor
        if self._lookback_window is not None:
            state["lookback_window"] = self._lookback_window
        if self._parent_state is not None:
            state["parent_state"] = self._parent_state
        return state

    def close_partition(self, partition: Partition) -> None:
        """Record completion of one slice of a partition, advancing the global cursor once the partition is fully done."""
        # Attempt to retrieve the stream slice
        stream_slice: Optional[StreamSlice] = partition.to_slice()  # type: ignore[assignment]

        # Ensure stream_slice is not None
        if stream_slice is None:
            raise ValueError("stream_slice cannot be None")

        partition_key = self._to_partition_key(stream_slice.partition)
        with self._lock:
            self._semaphore_per_partition[partition_key].acquire()
            if not self._use_global_cursor:
                cursor = self._cursor_per_partition[partition_key]
                cursor.close_partition(partition=partition)
                # NOTE: inspects Semaphore's private `_value`; zero means every generated
                # slice for this partition has been closed.
                if (
                    partition_key in self._partitions_done_generating_stream_slices
                    and self._semaphore_per_partition[partition_key]._value == 0
                ):
                    self._update_global_cursor(cursor.state[self.cursor_field.cursor_field_key])

            # Clean up the partition if it is fully processed
            self._cleanup_if_done(partition_key)

            self._check_and_update_parent_state()

        self._emit_state_message()

    def _check_and_update_parent_state(self) -> None:
        """Advance `_parent_state` to the newest parent state whose earlier partitions have all closed."""
        last_closed_state = None

        while self._partition_parent_state_map:
            earliest_key, (candidate_state, candidate_seq) = next(
                iter(self._partition_parent_state_map.items())
            )

            # if any partition that started <= candidate_seq is still open, we must wait
            if (
                self._processing_partitions_indexes
                and self._processing_partitions_indexes[0] <= candidate_seq
            ):
                break

            # safe to pop
            self._partition_parent_state_map.popitem(last=False)
            last_closed_state = candidate_state

        if last_closed_state is not None:
            self._parent_state = last_closed_state

    def ensure_at_least_one_state_emitted(self) -> None:
        """
        The platform expects at least one state message on successful syncs. Hence, whatever happens, we expect this method to be
        called.
        """
        # Only finalize if no partition still has unclosed slices (all semaphore counters are 0).
        if not any(
            semaphore_item[1]._value for semaphore_item in self._semaphore_per_partition.items()
        ):
            if self._synced_some_data:
                # we only update those if we actually synced some data
                self._global_cursor = self._new_global_cursor
                self._lookback_window = self._timer.finish()
            self._parent_state = self._partition_router.get_stream_state()
        self._emit_state_message(throttle=False)

    def _throttle_state_message(self) -> Optional[float]:
        """
        Throttles the state message emission to once every 600 seconds.
        """
        current_time = time.time()
        if current_time - self._last_emission_time <= 600:
            return None
        return current_time

    def _emit_state_message(self, throttle: bool = True) -> None:
        """Emit the current state through the message repository, optionally throttled."""
        if throttle:
            current_time = self._throttle_state_message()
            if current_time is None:
                return
            self._last_emission_time = current_time
        # Skip state emit for global cursor if parent state is empty
        if self._use_global_cursor and not self._parent_state:
            return

        self._connector_state_manager.update_state_for_stream(
            self._stream_name,
            self._stream_namespace,
            self.state,
        )
        state_message = self._connector_state_manager.create_state_message(
            self._stream_name, self._stream_namespace
        )
        self._message_repository.emit_message(state_message)

    def stream_slices(self) -> Iterable[StreamSlice]:
        """Yield slices for every partition produced by the partition router. Single-use per cursor instance."""
        if self._timer.is_running():
            raise RuntimeError("stream_slices has been executed more than once.")

        slices = self._partition_router.stream_slices()
        self._timer.start()
        for partition, last, parent_state in iterate_with_last_flag_and_state(
            slices, self._partition_router.get_stream_state
        ):
            yield from self._generate_slices_from_partition(partition, parent_state)

    def _generate_slices_from_partition(
        self, partition: StreamSlice, parent_state: Mapping[str, Any]
    ) -> Iterable[StreamSlice]:
        """Create (or reuse) the partition's cursor and yield its slices, releasing the semaphore once per slice."""
        # Ensure the maximum number of partitions is not exceeded
        self._ensure_partition_limit()

        partition_key = self._to_partition_key(partition.partition)

        cursor = self._cursor_per_partition.get(self._to_partition_key(partition.partition))
        if not cursor:
            # New partitions start from the global cursor (with lookback) so they don't re-sync history.
            cursor = self._create_cursor(
                self._global_cursor,
                self._lookback_window if self._global_cursor else 0,
            )
            with self._lock:
                self._number_of_partitions += 1
                self._cursor_per_partition[partition_key] = cursor

        if partition_key in self._semaphore_per_partition:
            # NOTE(review): assigning to self._IS_PARTITION_DUPLICATION_LOGGED shadows the
            # class attribute on this instance — presumably intentional once-per-instance logging.
            if not self._IS_PARTITION_DUPLICATION_LOGGED:
                logger.warning(f"Partition duplication detected for stream {self._stream_name}")
                self._IS_PARTITION_DUPLICATION_LOGGED = True
            return
        else:
            self._semaphore_per_partition[partition_key] = threading.Semaphore(0)

        with self._lock:
            seq = self._generated_partitions_count
            self._generated_partitions_count += 1
            self._processing_partitions_indexes.append(seq)
            self._partition_key_to_index[partition_key] = seq

            # Only record a parent-state entry when the parent state actually changed
            # since the most recently recorded one.
            if (
                len(self._partition_parent_state_map) == 0
                or self._partition_parent_state_map[
                    next(reversed(self._partition_parent_state_map))
                ][self._PARENT_STATE]
                != parent_state
            ):
                self._partition_parent_state_map[partition_key] = (deepcopy(parent_state), seq)

        for cursor_slice, is_last_slice, _ in iterate_with_last_flag_and_state(
            cursor.stream_slices(),
            lambda: None,
        ):
            # One semaphore release per generated slice; close_partition acquires them back.
            self._semaphore_per_partition[partition_key].release()
            if is_last_slice:
                self._partitions_done_generating_stream_slices.add(partition_key)
            yield StreamSlice(
                partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields
            )

    def _ensure_partition_limit(self) -> None:
        """
        Ensure the maximum number of partitions does not exceed the predefined limit.

        Steps:
        1. Attempt to remove partitions that are marked as finished in `_finished_partitions`.
           These partitions are considered processed and safe to delete.
        2. If the limit is still exceeded and no finished partitions are available for removal,
           remove the oldest partition unconditionally. We expect failed partitions to be removed.

        Logging:
        - Logs a warning each time a partition is removed, indicating whether it was finished
          or removed due to being the oldest.
        """
        if not self._use_global_cursor and self.limit_reached():
            logger.info(
                f"Exceeded the 'SWITCH_TO_GLOBAL_LIMIT' of {self.SWITCH_TO_GLOBAL_LIMIT}. "
                f"Switching to global cursor for {self._stream_name}."
            )
            self._use_global_cursor = True

        with self._lock:
            while len(self._cursor_per_partition) > self.DEFAULT_MAX_PARTITIONS_NUMBER - 1:
                # Try removing finished partitions first
                for partition_key in list(self._cursor_per_partition.keys()):
                    # A key absent from _partition_key_to_index is fully processed (see _cleanup_if_done).
                    if partition_key not in self._partition_key_to_index:
                        oldest_partition = self._cursor_per_partition.pop(
                            partition_key
                        )  # Remove the oldest partition
                        logger.debug(
                            f"The maximum number of partitions has been reached. Dropping the oldest finished partition: {oldest_partition}. Over limit: {self._number_of_partitions - self.DEFAULT_MAX_PARTITIONS_NUMBER}."
                        )
                        break
                else:
                    # If no finished partitions can be removed, fall back to removing the oldest partition
                    oldest_partition = self._cursor_per_partition.popitem(last=False)[
                        1
                    ]  # Remove the oldest partition
                    logger.warning(
                        f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}. Over limit: {self._number_of_partitions - self.DEFAULT_MAX_PARTITIONS_NUMBER}."
                    )

    def _set_initial_state(self, stream_state: StreamState) -> None:
        """
        Initialize the cursor's state using the provided `stream_state`.

        This method supports global and per-partition state initialization.

        - **Global State**: If `states` is missing, the `state` is treated as global and applied to all partitions.
          The `global state` holds a single cursor position representing the latest processed record across all partitions.

        - **Lookback Window**: Configured via `lookback_window`, it defines the period (in seconds) for reprocessing records.
          This ensures robustness in case of upstream data delays or reordering. If not specified, it defaults to 0.

        - **Per-Partition State**: If `states` is present, each partition's cursor state is initialized separately.

        - **Parent State**: (if available) Used to initialize partition routers based on parent streams.

        Args:
            stream_state (StreamState): The state of the streams to be set. The format of the stream state should be:
                {
                    "states": [
                        {
                            "partition": {
                                "partition_key": "value"
                            },
                            "cursor": {
                                "last_updated": "2023-05-27T00:00:00Z"
                            }
                        }
                    ],
                    "state": {
                        "last_updated": "2023-05-27T00:00:00Z"
                    },
                    lookback_window: 10,
                    "parent_state": {
                        "parent_stream_name": {
                            "last_updated": "2023-05-27T00:00:00Z"
                        }
                    }
                }
        """
        if not stream_state:
            return

        if (
            self._PERPARTITION_STATE_KEY not in stream_state
            and self._GLOBAL_STATE_KEY not in stream_state
        ):
            # We assume that `stream_state` is in a global format that can be applied to all partitions.
            # Example: {"global_state_format_key": "global_state_format_value"}
            self._set_global_state(stream_state)

        else:
            self._use_global_cursor = stream_state.get("use_global_cursor", False)

            self._lookback_window = int(stream_state.get("lookback_window", 0))

            for state in stream_state.get(self._PERPARTITION_STATE_KEY, []):
                self._number_of_partitions += 1
                self._cursor_per_partition[self._to_partition_key(state["partition"])] = (
                    self._create_cursor(state["cursor"])
                )

            # set default state for missing partitions if it is per partition with fallback to global
            if self._GLOBAL_STATE_KEY in stream_state:
                self._set_global_state(stream_state[self._GLOBAL_STATE_KEY])

        # Set initial parent state
        if stream_state.get("parent_state"):
            self._parent_state = stream_state["parent_state"]

    def _set_global_state(self, stream_state: Mapping[str, Any]) -> None:
        """
        Initializes the global cursor state from the provided stream state.

        If the cursor field key is present in the stream state, its value is parsed,
        formatted, and stored as the global cursor. This ensures consistency in state
        representation across partitions.
        """
        if self.cursor_field.cursor_field_key in stream_state:
            global_state_value = stream_state[self.cursor_field.cursor_field_key]
            # Round-trip through the state converter to normalize the stored representation.
            final_format_global_state_value = self._connector_state_converter.output_format(
                self._connector_state_converter.parse_value(global_state_value)
            )

            fixed_global_state = {
                self.cursor_field.cursor_field_key: final_format_global_state_value
            }

            self._global_cursor = deepcopy(fixed_global_state)
            self._new_global_cursor = deepcopy(fixed_global_state)

    def observe(self, record: Record) -> None:
        """Update global (and per-partition) cursor values from an emitted record."""
        if not record.associated_slice:
            raise ValueError(
                "Invalid state as stream slices that are emitted should refer to an existing cursor"
            )

        # if the current record has no cursor value, we cannot meaningfully update the state based on it, so there is nothing more to do
        try:
            record_cursor_value = self._cursor_field.extract_value(record)
        except ValueError:
            return

        try:
            record_cursor = self._connector_state_converter.output_format(
                self._connector_state_converter.parse_value(record_cursor_value)
            )
        except ValueError as exception:
            # Log the parse failure only once per sync to avoid flooding the logs.
            if not self._logged_regarding_datetime_format_error:
                logger.warning(
                    "Skipping cursor update for stream '%s': failed to parse cursor field '%s' value %r: %s",
                    self._stream_name,
                    self._cursor_field.cursor_field_key,
                    record_cursor_value,
                    exception,
                )
                self._logged_regarding_datetime_format_error = True
            return

        self._synced_some_data = True
        self._update_global_cursor(record_cursor)
        if not self._use_global_cursor:
            self._cursor_per_partition[
                self._to_partition_key(record.associated_slice.partition)
            ].observe(record)

    def _update_global_cursor(self, value: Any) -> None:
        """Advance the pending global cursor if `value` is greater than the current one."""
        if (
            self._new_global_cursor is None
            or self._new_global_cursor[self.cursor_field.cursor_field_key] < value
        ):
            self._new_global_cursor = {self.cursor_field.cursor_field_key: copy.deepcopy(value)}

    def _cleanup_if_done(self, partition_key: str) -> None:
        """
        Free every in-memory structure that belonged to a completed partition:
        cursor, semaphore, flag inside `_finished_partitions`
        """
        if not (
            partition_key in self._partitions_done_generating_stream_slices
            and self._semaphore_per_partition[partition_key]._value == 0
        ):
            return

        self._semaphore_per_partition.pop(partition_key, None)
        self._partitions_done_generating_stream_slices.discard(partition_key)

        seq = self._partition_key_to_index.pop(partition_key)
        self._processing_partitions_indexes.remove(seq)

        logger.debug(f"Partition {partition_key} fully processed and cleaned up.")

    def _to_partition_key(self, partition: Mapping[str, Any]) -> str:
        """Serialize a partition mapping into a hashable string key."""
        return self._partition_serializer.to_partition_key(partition)

    def _to_dict(self, partition_key: str) -> Mapping[str, Any]:
        """Inverse of `_to_partition_key`: deserialize a key back into a partition mapping."""
        return self._partition_serializer.to_partition(partition_key)

    def _create_cursor(
        self, cursor_state: Any, runtime_lookback_window: int = 0
    ) -> ConcurrentCursor:
        """Build a new ConcurrentCursor from `cursor_state` with a lookback window in seconds."""
        cursor = self._cursor_factory.create(
            stream_state=deepcopy(cursor_state),
            runtime_lookback_window=timedelta(seconds=runtime_lookback_window),
        )
        return cursor

    def should_be_synced(self, record: Record) -> bool:
        return self._get_cursor(record).should_be_synced(record)

    def _get_cursor(self, record: Record) -> ConcurrentCursor:
        """Resolve the cursor responsible for `record`, optionally creating one from the global cursor."""
        if not record.associated_slice:
            raise ValueError(
                "Invalid state as stream slices that are emitted should refer to an existing cursor"
            )

        if self._use_global_cursor:
            return self._create_cursor(
                self._global_cursor,
                self._lookback_window if self._global_cursor else 0,
            )

        partition_key = self._to_partition_key(record.associated_slice.partition)
        if (
            partition_key not in self._cursor_per_partition
            and not self._attempt_to_create_cursor_if_not_provided
        ):
            raise ValueError(
                "Invalid state as stream slices that are emitted should refer to an existing cursor"
            )
        elif partition_key not in self._cursor_per_partition:
            return self._create_cursor(
                self._global_cursor,
                self._lookback_window if self._global_cursor else 0,
            )
        else:
            return self._cursor_per_partition[partition_key]

    def limit_reached(self) -> bool:
        """True once more partitions have been seen than SWITCH_TO_GLOBAL_LIMIT allows."""
        return self._number_of_partitions > self.SWITCH_TO_GLOBAL_LIMIT

    @staticmethod
    def get_parent_state(
        stream_state: Optional[StreamState], parent_stream_name: str
    ) -> Optional[AirbyteStateMessage]:
        """Extract the state of one parent stream from `stream_state["parent_state"]` as an AirbyteStateMessage."""
        if not stream_state:
            return None

        if "parent_state" not in stream_state:
            logger.warning(
                f"Trying to get_parent_state for stream `{parent_stream_name}` when there are not parent state in the state"
            )
            return None
        elif parent_stream_name not in stream_state["parent_state"]:
            logger.info(
                f"Could not find parent state for stream `{parent_stream_name}`. On parents available are {list(stream_state['parent_state'].keys())}"
            )
            return None

        return AirbyteStateMessage(
            type=AirbyteStateType.STREAM,
            stream=AirbyteStreamState(
                stream_descriptor=StreamDescriptor(parent_stream_name, None),
                stream_state=AirbyteStateBlob(stream_state["parent_state"][parent_stream_name]),
            ),
        )

    @staticmethod
    def get_global_state(
        stream_state: Optional[StreamState], parent_stream_name: str
    ) -> Optional[AirbyteStateMessage]:
        """Wrap the global `state` entry of `stream_state` as an AirbyteStateMessage, or None if absent."""
        return (
            AirbyteStateMessage(
                type=AirbyteStateType.STREAM,
                stream=AirbyteStreamState(
                    stream_descriptor=StreamDescriptor(parent_stream_name, None),
                    stream_state=AirbyteStateBlob(stream_state["state"]),
                ),
            )
            if stream_state and "state" in stream_state
            else None
        )

    def get_cursor_datetime_from_state(
        self, stream_state: Mapping[str, Any]
    ) -> datetime.datetime | None:
        """Extract and parse the cursor datetime from the global cursor in per-partition state.

        For per-partition cursors, the global cursor is stored under the "state" key.
        This method delegates to the underlying cursor factory to parse the datetime.

        Returns None if the global cursor is not present or cannot be parsed.
        """
        global_state = stream_state.get(self._GLOBAL_STATE_KEY)
        if not global_state or not isinstance(global_state, dict):
            return None

        # Create a cursor to delegate the parsing
        cursor = self._cursor_factory.create(stream_state={}, runtime_lookback_window=None)
        return cursor.get_cursor_datetime_from_state(global_state)
Manages state per partition when a stream has many partitions, preventing data loss or duplication.
Attributes:
DEFAULT_MAX_PARTITIONS_NUMBER (int): Maximum number of partitions to retain in memory (default is 25,000). This limit needs to be higher than the number of threads we might enqueue (which is represented by ThreadPoolManager.DEFAULT_MAX_QUEUE_SIZE). If not, we could have partitions that have been generated and submitted to the ThreadPool but got deleted from the ConcurrentPerPartitionCursor, and closing them would raise a KeyError.
Partition Limitation Logic Ensures the number of tracked partitions does not exceed the specified limit to prevent memory overuse. Oldest partitions are removed when the limit is reached.
Global Cursor Fallback New partitions use global state as the initial state to progress the state for deleted or new partitions. The history data added after the initial sync will be missing.
ConcurrentPerPartitionCursor expects the state of the ConcurrentCursor to follow the format {cursor_field: cursor_value}.
    def __init__(
        self,
        cursor_factory: ConcurrentCursorFactory,
        partition_router: PartitionRouter,
        stream_name: str,
        stream_namespace: Optional[str],
        stream_state: Any,
        message_repository: MessageRepository,
        connector_state_manager: ConnectorStateManager,
        connector_state_converter: AbstractStreamStateConverter,
        cursor_field: CursorField,
        use_global_cursor: bool = False,
        attempt_to_create_cursor_if_not_provided: bool = False,
    ) -> None:
        """Initialize per-partition tracking structures and apply the provided initial stream state."""
        self._global_cursor: Optional[StreamState] = {}
        self._stream_name = stream_name
        self._stream_namespace = stream_namespace
        self._message_repository = message_repository
        self._connector_state_manager = connector_state_manager
        self._connector_state_converter = connector_state_converter
        self._cursor_field = cursor_field

        self._cursor_factory = cursor_factory  # self._cursor_factory is flagged as private but is used in model_to_component_factory to ease pagination reset instantiation
        self._partition_router = partition_router

        # The dict is ordered to ensure that once the maximum number of partitions is reached,
        # the oldest partitions can be efficiently removed, maintaining the most recent partitions.
        self._cursor_per_partition: OrderedDict[str, ConcurrentCursor] = OrderedDict()
        # One semaphore per partition counts slices generated but not yet closed.
        self._semaphore_per_partition: OrderedDict[str, threading.Semaphore] = OrderedDict()

        # Parent-state tracking: store each partition's parent state in creation order
        self._partition_parent_state_map: OrderedDict[str, tuple[Mapping[str, Any], int]] = (
            OrderedDict()
        )
        self._parent_state: Optional[StreamState] = None

        # Tracks when the last slice for partition is emitted
        self._partitions_done_generating_stream_slices: set[str] = set()
        # Used to track the index of partitions that are not closed yet
        self._processing_partitions_indexes: List[int] = list()
        self._generated_partitions_count: int = 0
        # Dictionary to map partition keys to their index
        self._partition_key_to_index: dict[str, int] = {}

        self._lock = threading.Lock()
        self._lookback_window: int = 0
        self._new_global_cursor: Optional[StreamState] = None
        self._number_of_partitions: int = 0
        self._use_global_cursor: bool = use_global_cursor
        self._partition_serializer = PerPartitionKeySerializer()

        # Track the last time a state message was emitted
        self._last_emission_time: float = 0.0
        self._timer = Timer()

        self._set_initial_state(stream_state)

        # FIXME this is a temporary field the time of the migration from declarative cursors to concurrent ones
        self._attempt_to_create_cursor_if_not_provided = attempt_to_create_cursor_if_not_provided
        self._synced_some_data = False
        self._logged_regarding_datetime_format_error = False
    @property
    def state(self) -> MutableMapping[str, Any]:
        """Serialize the cursor into the connector-state format (per-partition states plus optional global/parent state)."""
        state: dict[str, Any] = {"use_global_cursor": self._use_global_cursor}
        if not self._use_global_cursor:
            states = []
            for partition_tuple, cursor in self._cursor_per_partition.items():
                if cursor.state:
                    states.append(
                        {
                            "partition": self._to_dict(partition_tuple),
                            # deepcopy so later cursor mutations cannot leak into an emitted state message
                            "cursor": copy.deepcopy(cursor.state),
                        }
                    )
            state[self._PERPARTITION_STATE_KEY] = states

        if self._global_cursor:
            state[self._GLOBAL_STATE_KEY] = self._global_cursor
        if self._lookback_window is not None:
            state["lookback_window"] = self._lookback_window
        if self._parent_state is not None:
            state["parent_state"] = self._parent_state
        return state
    def close_partition(self, partition: Partition) -> None:
        """Record completion of one slice of a partition, advancing the global cursor once the partition is fully done."""
        # Attempt to retrieve the stream slice
        stream_slice: Optional[StreamSlice] = partition.to_slice()  # type: ignore[assignment]

        # Ensure stream_slice is not None
        if stream_slice is None:
            raise ValueError("stream_slice cannot be None")

        partition_key = self._to_partition_key(stream_slice.partition)
        with self._lock:
            self._semaphore_per_partition[partition_key].acquire()
            if not self._use_global_cursor:
                cursor = self._cursor_per_partition[partition_key]
                cursor.close_partition(partition=partition)
                # NOTE: inspects Semaphore's private `_value`; zero means every generated
                # slice for this partition has been closed.
                if (
                    partition_key in self._partitions_done_generating_stream_slices
                    and self._semaphore_per_partition[partition_key]._value == 0
                ):
                    self._update_global_cursor(cursor.state[self.cursor_field.cursor_field_key])

            # Clean up the partition if it is fully processed
            self._cleanup_if_done(partition_key)

            self._check_and_update_parent_state()

        self._emit_state_message()
Indicate to the cursor that the partition has been successfully processed
    def ensure_at_least_one_state_emitted(self) -> None:
        """
        The platform expects at least one state message on successful syncs. Hence, whatever happens, we expect this method to be
        called.
        """
        # Only finalize if no partition still has unclosed slices (all semaphore counters are 0).
        if not any(
            semaphore_item[1]._value for semaphore_item in self._semaphore_per_partition.items()
        ):
            if self._synced_some_data:
                # we only update those if we actually synced some data
                self._global_cursor = self._new_global_cursor
                self._lookback_window = self._timer.finish()
            self._parent_state = self._partition_router.get_stream_state()
        self._emit_state_message(throttle=False)
The platform expects at least one state message on successful syncs. Hence, whatever happens, we expect this method to be called.
    def stream_slices(self) -> Iterable[StreamSlice]:
        """Yield slices for every partition produced by the partition router. Single-use per cursor instance."""
        if self._timer.is_running():
            raise RuntimeError("stream_slices has been executed more than once.")

        slices = self._partition_router.stream_slices()
        self._timer.start()
        for partition, last, parent_state in iterate_with_last_flag_and_state(
            slices, self._partition_router.get_stream_state
        ):
            yield from self._generate_slices_from_partition(partition, parent_state)
Generates stream slices by iterating the partition router's partitions and yielding the cursor slices produced for each partition.
    def observe(self, record: Record) -> None:
        """Update global (and per-partition) cursor values from an emitted record."""
        if not record.associated_slice:
            raise ValueError(
                "Invalid state as stream slices that are emitted should refer to an existing cursor"
            )

        # if the current record has no cursor value, we cannot meaningfully update the state based on it, so there is nothing more to do
        try:
            record_cursor_value = self._cursor_field.extract_value(record)
        except ValueError:
            return

        try:
            record_cursor = self._connector_state_converter.output_format(
                self._connector_state_converter.parse_value(record_cursor_value)
            )
        except ValueError as exception:
            # Log the parse failure only once per sync to avoid flooding the logs.
            if not self._logged_regarding_datetime_format_error:
                logger.warning(
                    "Skipping cursor update for stream '%s': failed to parse cursor field '%s' value %r: %s",
                    self._stream_name,
                    self._cursor_field.cursor_field_key,
                    record_cursor_value,
                    exception,
                )
                self._logged_regarding_datetime_format_error = True
            return

        self._synced_some_data = True
        self._update_global_cursor(record_cursor)
        if not self._use_global_cursor:
            self._cursor_per_partition[
                self._to_partition_key(record.associated_slice.partition)
            ].observe(record)
Indicate to the cursor that the record has been emitted
    @staticmethod
    def get_parent_state(
        stream_state: Optional[StreamState], parent_stream_name: str
    ) -> Optional[AirbyteStateMessage]:
        """Extract the state of one parent stream from `stream_state["parent_state"]` as an AirbyteStateMessage."""
        if not stream_state:
            return None

        if "parent_state" not in stream_state:
            logger.warning(
                f"Trying to get_parent_state for stream `{parent_stream_name}` when there are not parent state in the state"
            )
            return None
        elif parent_stream_name not in stream_state["parent_state"]:
            logger.info(
                f"Could not find parent state for stream `{parent_stream_name}`. On parents available are {list(stream_state['parent_state'].keys())}"
            )
            return None

        return AirbyteStateMessage(
            type=AirbyteStateType.STREAM,
            stream=AirbyteStreamState(
                stream_descriptor=StreamDescriptor(parent_stream_name, None),
                stream_state=AirbyteStateBlob(stream_state["parent_state"][parent_stream_name]),
            ),
        )
    @staticmethod
    def get_global_state(
        stream_state: Optional[StreamState], parent_stream_name: str
    ) -> Optional[AirbyteStateMessage]:
        """Wrap the global `state` entry of `stream_state` as an AirbyteStateMessage, or None if absent."""
        return (
            AirbyteStateMessage(
                type=AirbyteStateType.STREAM,
                stream=AirbyteStreamState(
                    stream_descriptor=StreamDescriptor(parent_stream_name, None),
                    stream_state=AirbyteStateBlob(stream_state["state"]),
                ),
            )
            if stream_state and "state" in stream_state
            else None
        )
    def get_cursor_datetime_from_state(
        self, stream_state: Mapping[str, Any]
    ) -> datetime.datetime | None:
        """Extract and parse the cursor datetime from the global cursor in per-partition state.

        For per-partition cursors, the global cursor is stored under the "state" key.
        This method delegates to the underlying cursor factory to parse the datetime.

        Returns None if the global cursor is not present or cannot be parsed.
        """
        global_state = stream_state.get(self._GLOBAL_STATE_KEY)
        if not global_state or not isinstance(global_state, dict):
            return None

        # Create a cursor to delegate the parsing
        cursor = self._cursor_factory.create(stream_state={}, runtime_lookback_window=None)
        return cursor.get_cursor_datetime_from_state(global_state)
Extract and parse the cursor datetime from the global cursor in per-partition state.
For per-partition cursors, the global cursor is stored under the "state" key. This method delegates to the underlying cursor factory to parse the datetime.
Returns None if the global cursor is not present or cannot be parsed.