airbyte_cdk.sources.declarative.incremental
```python
class ConcurrentCursorFactory:
    def __init__(self, create_function: Callable[..., ConcurrentCursor]):
        self._create_function = create_function

    def create(
        self, stream_state: Mapping[str, Any], runtime_lookback_window: Optional[timedelta]
    ) -> ConcurrentCursor:
        return self._create_function(
            stream_state=stream_state, runtime_lookback_window=runtime_lookback_window
        )
```
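The factory simply defers cursor construction to the callable it wraps, which must accept `stream_state` and `runtime_lookback_window` keyword arguments and return a `ConcurrentCursor`. A minimal usage sketch, where `build_concurrent_cursor` is a hypothetical helper standing in for however the surrounding code binds the remaining `ConcurrentCursor` constructor arguments:

```python
from datetime import timedelta

# `build_concurrent_cursor` is hypothetical: any callable returning a
# ConcurrentCursor and accepting these two keyword arguments works.
factory = ConcurrentCursorFactory(build_concurrent_cursor)
cursor = factory.create(
    stream_state={"last_updated": "2023-05-27T00:00:00Z"},
    runtime_lookback_window=timedelta(seconds=10),
)
```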
```python
class ConcurrentPerPartitionCursor(Cursor):
    """
    Manages state per partition when a stream has many partitions, preventing data loss or duplication.

    Attributes:
        DEFAULT_MAX_PARTITIONS_NUMBER (int): Maximum number of partitions to retain in memory
            (default is 25,000). This limit needs to be higher than the number of threads we might
            enqueue (which is represented by ThreadPoolManager.DEFAULT_MAX_QUEUE_SIZE). Otherwise,
            partitions that have been generated and submitted to the ThreadPool could be deleted
            from the ConcurrentPerPartitionCursor, and closing them would raise a KeyError.
        SWITCH_TO_GLOBAL_LIMIT (int): Number of partitions after which the cursor falls back to a
            single global cursor (default is 10,000).

    - **Partition Limitation Logic**
      Ensures the number of tracked partitions does not exceed the specified limit to prevent
      memory overuse. The oldest partitions are removed when the limit is reached.

    - **Global Cursor Fallback**
      New partitions use the global state as their initial state so that state can still progress
      for deleted or new partitions. History data added after the initial sync will be missing.

    ConcurrentPerPartitionCursor expects the state of the ConcurrentCursor to follow the format
    {cursor_field: cursor_value}.
    """

    DEFAULT_MAX_PARTITIONS_NUMBER = 25_000
    SWITCH_TO_GLOBAL_LIMIT = 10_000
    _NO_STATE: Mapping[str, Any] = {}
    _NO_CURSOR_STATE: Mapping[str, Any] = {}
    _GLOBAL_STATE_KEY = "state"
    _PERPARTITION_STATE_KEY = "states"
    _IS_PARTITION_DUPLICATION_LOGGED = False
    _PARENT_STATE = 0
    _GENERATION_SEQUENCE = 1

    def __init__(
        self,
        cursor_factory: ConcurrentCursorFactory,
        partition_router: PartitionRouter,
        stream_name: str,
        stream_namespace: Optional[str],
        stream_state: Any,
        message_repository: MessageRepository,
        connector_state_manager: ConnectorStateManager,
        connector_state_converter: AbstractStreamStateConverter,
        cursor_field: CursorField,
        use_global_cursor: bool = False,
        attempt_to_create_cursor_if_not_provided: bool = False,
    ) -> None:
        self._global_cursor: Optional[StreamState] = {}
        self._stream_name = stream_name
        self._stream_namespace = stream_namespace
        self._message_repository = message_repository
        self._connector_state_manager = connector_state_manager
        self._connector_state_converter = connector_state_converter
        self._cursor_field = cursor_field

        self._cursor_factory = cursor_factory
        self._partition_router = partition_router

        # The dict is ordered to ensure that once the maximum number of partitions is reached,
        # the oldest partitions can be efficiently removed, maintaining the most recent partitions.
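        # Note: each partition is tracked by a Semaphore(0) that is released once per
        # generated slice and acquired once per closed slice, so a value of zero means
        # every slice handed out for that partition has been closed.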
        self._cursor_per_partition: OrderedDict[str, ConcurrentCursor] = OrderedDict()
        self._semaphore_per_partition: OrderedDict[str, threading.Semaphore] = OrderedDict()

        # Parent-state tracking: store each partition's parent state in creation order
        self._partition_parent_state_map: OrderedDict[str, tuple[Mapping[str, Any], int]] = (
            OrderedDict()
        )
        self._parent_state: Optional[StreamState] = None

        # Tracks the partitions for which the last slice has been emitted
        self._partitions_done_generating_stream_slices: set[str] = set()
        # Used to track the indexes of partitions that are not closed yet
        self._processing_partitions_indexes: List[int] = list()
        self._generated_partitions_count: int = 0
        # Dictionary to map partition keys to their index
        self._partition_key_to_index: dict[str, int] = {}

        self._lock = threading.Lock()
        self._lookback_window: int = 0
        self._new_global_cursor: Optional[StreamState] = None
        self._number_of_partitions: int = 0
        self._use_global_cursor: bool = use_global_cursor
        self._partition_serializer = PerPartitionKeySerializer()

        # Track the last time a state message was emitted
        self._last_emission_time: float = 0.0
        self._timer = Timer()

        self._set_initial_state(stream_state)

        # FIXME: this is a temporary field for the duration of the migration from declarative
        # cursors to concurrent ones
        self._attempt_to_create_cursor_if_not_provided = attempt_to_create_cursor_if_not_provided
        self._synced_some_data = False
        self._logged_regarding_datetime_format_error = False

    @property
    def cursor_field(self) -> CursorField:
        return self._cursor_field

    @property
    def state(self) -> MutableMapping[str, Any]:
        state: dict[str, Any] = {"use_global_cursor": self._use_global_cursor}
        if not self._use_global_cursor:
            states = []
            for partition_tuple, cursor in self._cursor_per_partition.items():
                if cursor.state:
                    states.append(
                        {
                            "partition": self._to_dict(partition_tuple),
                            "cursor": copy.deepcopy(cursor.state),
                        }
                    )
            state[self._PERPARTITION_STATE_KEY] = states

        if self._global_cursor:
            state[self._GLOBAL_STATE_KEY] = self._global_cursor
        if self._lookback_window is not None:
            state["lookback_window"] = self._lookback_window
        if self._parent_state is not None:
            state["parent_state"] = self._parent_state
        return state

    def close_partition(self, partition: Partition) -> None:
        """Indicate to the cursor that the partition has been successfully processed."""
        # Attempt to retrieve the stream slice
        stream_slice: Optional[StreamSlice] = partition.to_slice()  # type: ignore[assignment]

        # Ensure stream_slice is not None
        if stream_slice is None:
            raise ValueError("stream_slice cannot be None")

        partition_key = self._to_partition_key(stream_slice.partition)
        with self._lock:
            self._semaphore_per_partition[partition_key].acquire()
            if not self._use_global_cursor:
                cursor = self._cursor_per_partition[partition_key]
                cursor.close_partition(partition=partition)
                if (
                    partition_key in self._partitions_done_generating_stream_slices
                    and self._semaphore_per_partition[partition_key]._value == 0
                ):
                    self._update_global_cursor(cursor.state[self.cursor_field.cursor_field_key])

            # Clean up the partition if it is fully processed
            self._cleanup_if_done(partition_key)

            self._check_and_update_parent_state()

        self._emit_state_message()

    def _check_and_update_parent_state(self) -> None:
        last_closed_state = None
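        # Note: parent states were recorded in creation order; advance the checkpoint
        # only past entries whose generation sequence precedes every still-open partition.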
        while self._partition_parent_state_map:
            earliest_key, (candidate_state, candidate_seq) = next(
                iter(self._partition_parent_state_map.items())
            )

            # if any partition that started <= candidate_seq is still open, we must wait
            if (
                self._processing_partitions_indexes
                and self._processing_partitions_indexes[0] <= candidate_seq
            ):
                break

            # safe to pop
            self._partition_parent_state_map.popitem(last=False)
            last_closed_state = candidate_state

        if last_closed_state is not None:
            self._parent_state = last_closed_state

    def ensure_at_least_one_state_emitted(self) -> None:
        """
        The platform expects at least one state message on successful syncs. Hence, whatever
        happens, we expect this method to be called.
        """
        if not any(
            semaphore_item[1]._value for semaphore_item in self._semaphore_per_partition.items()
        ):
            if self._synced_some_data:
                # we only update those if we actually synced some data
                self._global_cursor = self._new_global_cursor
                self._lookback_window = self._timer.finish()
            self._parent_state = self._partition_router.get_stream_state()
        self._emit_state_message(throttle=False)

    def _throttle_state_message(self) -> Optional[float]:
        """
        Throttles the state message emission to once every 600 seconds.
        """
        current_time = time.time()
        if current_time - self._last_emission_time <= 600:
            return None
        return current_time

    def _emit_state_message(self, throttle: bool = True) -> None:
        if throttle:
            current_time = self._throttle_state_message()
            if current_time is None:
                return
            self._last_emission_time = current_time
        # Skip state emit for global cursor if parent state is empty
        if self._use_global_cursor and not self._parent_state:
            return

        self._connector_state_manager.update_state_for_stream(
            self._stream_name,
            self._stream_namespace,
            self.state,
        )
        state_message = self._connector_state_manager.create_state_message(
            self._stream_name, self._stream_namespace
        )
        self._message_repository.emit_message(state_message)

    def stream_slices(self) -> Iterable[StreamSlice]:
        if self._timer.is_running():
            raise RuntimeError("stream_slices has been executed more than once.")

        slices = self._partition_router.stream_slices()
        self._timer.start()
        for partition, last, parent_state in iterate_with_last_flag_and_state(
            slices, self._partition_router.get_stream_state
        ):
            yield from self._generate_slices_from_partition(partition, parent_state)

    def _generate_slices_from_partition(
        self, partition: StreamSlice, parent_state: Mapping[str, Any]
    ) -> Iterable[StreamSlice]:
        # Ensure the maximum number of partitions is not exceeded
        self._ensure_partition_limit()

        partition_key = self._to_partition_key(partition.partition)

        cursor = self._cursor_per_partition.get(self._to_partition_key(partition.partition))
        if not cursor:
            cursor = self._create_cursor(
                self._global_cursor,
                self._lookback_window if self._global_cursor else 0,
            )
            with self._lock:
                self._number_of_partitions += 1
                self._cursor_per_partition[partition_key] = cursor

        if partition_key in self._semaphore_per_partition:
            if not self._IS_PARTITION_DUPLICATION_LOGGED:
                logger.warning(f"Partition duplication detected for stream {self._stream_name}")
                self._IS_PARTITION_DUPLICATION_LOGGED = True
            return
        else:
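            # Note: this partition key is new, so create its semaphore and register
            # its generation index under the lock.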
            self._semaphore_per_partition[partition_key] = threading.Semaphore(0)

        with self._lock:
            seq = self._generated_partitions_count
            self._generated_partitions_count += 1
            self._processing_partitions_indexes.append(seq)
            self._partition_key_to_index[partition_key] = seq

            if (
                len(self._partition_parent_state_map) == 0
                or self._partition_parent_state_map[
                    next(reversed(self._partition_parent_state_map))
                ][self._PARENT_STATE]
                != parent_state
            ):
                self._partition_parent_state_map[partition_key] = (deepcopy(parent_state), seq)

        for cursor_slice, is_last_slice, _ in iterate_with_last_flag_and_state(
            cursor.stream_slices(),
            lambda: None,
        ):
            self._semaphore_per_partition[partition_key].release()
            if is_last_slice:
                self._partitions_done_generating_stream_slices.add(partition_key)
            yield StreamSlice(
                partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields
            )

    def _ensure_partition_limit(self) -> None:
        """
        Ensure the maximum number of partitions does not exceed the predefined limit.

        Steps:
        1. Attempt to remove partitions that are fully processed (i.e., no longer present in
           `_partition_key_to_index`). These partitions are considered safe to delete.
        2. If the limit is still exceeded and no finished partitions are available for removal,
           remove the oldest partition unconditionally. Failed partitions are expected to be
           removed this way.

        Logging:
        - Logs every removal, indicating whether the partition was finished (debug) or dropped
          while still being processed (warning).
        """
        if not self._use_global_cursor and self.limit_reached():
            logger.info(
                f"Exceeded the 'SWITCH_TO_GLOBAL_LIMIT' of {self.SWITCH_TO_GLOBAL_LIMIT}. "
                f"Switching to global cursor for {self._stream_name}."
            )
            self._use_global_cursor = True

        with self._lock:
            while len(self._cursor_per_partition) > self.DEFAULT_MAX_PARTITIONS_NUMBER - 1:
                # Try removing finished partitions first
                for partition_key in list(self._cursor_per_partition.keys()):
                    if partition_key not in self._partition_key_to_index:
                        oldest_partition = self._cursor_per_partition.pop(
                            partition_key
                        )  # Remove the oldest finished partition
                        logger.debug(
                            f"The maximum number of partitions has been reached. Dropping the oldest finished partition: {oldest_partition}. Over limit: {self._number_of_partitions - self.DEFAULT_MAX_PARTITIONS_NUMBER}."
                        )
                        break
                else:
                    # If no finished partitions can be removed, fall back to removing the oldest partition
                    oldest_partition = self._cursor_per_partition.popitem(last=False)[
                        1
                    ]  # Remove the oldest partition
                    logger.warning(
                        f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}. Over limit: {self._number_of_partitions - self.DEFAULT_MAX_PARTITIONS_NUMBER}."
                    )

    def _set_initial_state(self, stream_state: StreamState) -> None:
        """
        Initialize the cursor's state using the provided `stream_state`.

        This method supports global and per-partition state initialization.

        - **Global State**: If `states` is missing, the `state` is treated as global and applied
          to all partitions. The global state holds a single cursor position representing the
          latest processed record across all partitions.

        - **Lookback Window**: Configured via `lookback_window`, it defines the period (in
          seconds) for reprocessing records. This ensures robustness in case of upstream data
          delays or reordering. If not specified, it defaults to 0.

        - **Per-Partition State**: If `states` is present, each partition's cursor state is
          initialized separately.

        - **Parent State**: (if available) Used to initialize partition routers based on parent
          streams.

        Args:
            stream_state (StreamState): The state of the streams to be set. The format of the
                stream state should be:
                {
                    "states": [
                        {
                            "partition": {
                                "partition_key": "value"
                            },
                            "cursor": {
                                "last_updated": "2023-05-27T00:00:00Z"
                            }
                        }
                    ],
                    "state": {
                        "last_updated": "2023-05-27T00:00:00Z"
                    },
                    "lookback_window": 10,
                    "parent_state": {
                        "parent_stream_name": {
                            "last_updated": "2023-05-27T00:00:00Z"
                        }
                    }
                }
        """
        if not stream_state:
            return

        if (
            self._PERPARTITION_STATE_KEY not in stream_state
            and self._GLOBAL_STATE_KEY not in stream_state
        ):
            # We assume that `stream_state` is in a global format that can be applied to all partitions.
            # Example: {"global_state_format_key": "global_state_format_value"}
            self._set_global_state(stream_state)

        else:
            self._use_global_cursor = stream_state.get("use_global_cursor", False)

            self._lookback_window = int(stream_state.get("lookback_window", 0))

            for state in stream_state.get(self._PERPARTITION_STATE_KEY, []):
                self._number_of_partitions += 1
                self._cursor_per_partition[self._to_partition_key(state["partition"])] = (
                    self._create_cursor(state["cursor"])
                )

            # set default state for missing partitions if it is per partition with fallback to global
            if self._GLOBAL_STATE_KEY in stream_state:
                self._set_global_state(stream_state[self._GLOBAL_STATE_KEY])

        # Set initial parent state
        if stream_state.get("parent_state"):
            self._parent_state = stream_state["parent_state"]

    def _set_global_state(self, stream_state: Mapping[str, Any]) -> None:
        """
        Initializes the global cursor state from the provided stream state.

        If the cursor field key is present in the stream state, its value is parsed,
        formatted, and stored as the global cursor. This ensures consistency in state
        representation across partitions.
        """
        if self.cursor_field.cursor_field_key in stream_state:
            global_state_value = stream_state[self.cursor_field.cursor_field_key]
            final_format_global_state_value = self._connector_state_converter.output_format(
                self._connector_state_converter.parse_value(global_state_value)
            )

            fixed_global_state = {
                self.cursor_field.cursor_field_key: final_format_global_state_value
            }

            self._global_cursor = deepcopy(fixed_global_state)
            self._new_global_cursor = deepcopy(fixed_global_state)

    def observe(self, record: Record) -> None:
        """Indicate to the cursor that the record has been emitted."""
        if not record.associated_slice:
            raise ValueError(
                "Invalid state as stream slices that are emitted should refer to an existing cursor"
            )

        # if the current record has no cursor value, we cannot meaningfully update the state
        # based on it, so there is nothing more to do
        try:
            record_cursor_value = self._cursor_field.extract_value(record)
        except ValueError:
            return

        try:
            record_cursor = self._connector_state_converter.output_format(
                self._connector_state_converter.parse_value(record_cursor_value)
            )
        except ValueError as exception:
            if not self._logged_regarding_datetime_format_error:
                logger.warning(
                    "Skipping cursor update for stream '%s': failed to parse cursor field '%s' value %r: %s",
                    self._stream_name,
                    self._cursor_field.cursor_field_key,
                    record_cursor_value,
                    exception,
                )
                self._logged_regarding_datetime_format_error = True
            return

        self._synced_some_data = True
        self._update_global_cursor(record_cursor)
        if not self._use_global_cursor:
            self._cursor_per_partition[
                self._to_partition_key(record.associated_slice.partition)
            ].observe(record)

    def _update_global_cursor(self, value: Any) -> None:
        if (
            self._new_global_cursor is None
            or self._new_global_cursor[self.cursor_field.cursor_field_key] < value
        ):
            self._new_global_cursor = {self.cursor_field.cursor_field_key: copy.deepcopy(value)}

    def _cleanup_if_done(self, partition_key: str) -> None:
        """
        Free every in-memory structure that belonged to a completed partition:
        its semaphore, its "done generating stream slices" flag, and its processing index.
        """
        if not (
            partition_key in self._partitions_done_generating_stream_slices
            and self._semaphore_per_partition[partition_key]._value == 0
        ):
            return

        self._semaphore_per_partition.pop(partition_key, None)
        self._partitions_done_generating_stream_slices.discard(partition_key)

        seq = self._partition_key_to_index.pop(partition_key)
        self._processing_partitions_indexes.remove(seq)

        logger.debug(f"Partition {partition_key} fully processed and cleaned up.")

    def _to_partition_key(self, partition: Mapping[str, Any]) -> str:
        return self._partition_serializer.to_partition_key(partition)

    def _to_dict(self, partition_key: str) -> Mapping[str, Any]:
        return self._partition_serializer.to_partition(partition_key)

    def _create_cursor(
        self, cursor_state: Any, runtime_lookback_window: int = 0
    ) -> ConcurrentCursor:
        cursor = self._cursor_factory.create(
            stream_state=deepcopy(cursor_state),
            runtime_lookback_window=timedelta(seconds=runtime_lookback_window),
        )
        return cursor

    def should_be_synced(self, record: Record) -> bool:
        return self._get_cursor(record).should_be_synced(record)

    def _get_cursor(self, record: Record) -> ConcurrentCursor:
        if not record.associated_slice:
            raise ValueError(
                "Invalid state as stream slices that are emitted should refer to an existing cursor"
            )

        if self._use_global_cursor:
            return self._create_cursor(
                self._global_cursor,
                self._lookback_window if self._global_cursor else 0,
            )

        partition_key = self._to_partition_key(record.associated_slice.partition)
        if (
            partition_key not in self._cursor_per_partition
            and not self._attempt_to_create_cursor_if_not_provided
        ):
            raise ValueError(
                "Invalid state as stream slices that are emitted should refer to an existing cursor"
            )
        elif partition_key not in self._cursor_per_partition:
            return self._create_cursor(
                self._global_cursor,
                self._lookback_window if self._global_cursor else 0,
            )
        else:
            return self._cursor_per_partition[partition_key]

    def limit_reached(self) -> bool:
        return self._number_of_partitions > self.SWITCH_TO_GLOBAL_LIMIT

    @staticmethod
    def get_parent_state(
        stream_state: Optional[StreamState], parent_stream_name: str
    ) -> Optional[AirbyteStateMessage]:
        if not stream_state:
            return None

        if "parent_state" not in stream_state:
            logger.warning(
                f"Trying to get_parent_state for stream `{parent_stream_name}` when there is no parent state in the state"
            )
            return None
        elif parent_stream_name not in stream_state["parent_state"]:
            logger.info(
                f"Could not find parent state for stream `{parent_stream_name}`. The only parents available are {list(stream_state['parent_state'].keys())}"
            )
            return None

        return AirbyteStateMessage(
            type=AirbyteStateType.STREAM,
            stream=AirbyteStreamState(
                stream_descriptor=StreamDescriptor(parent_stream_name, None),
                stream_state=AirbyteStateBlob(stream_state["parent_state"][parent_stream_name]),
            ),
        )

    @staticmethod
    def get_global_state(
        stream_state: Optional[StreamState], parent_stream_name: str
    ) -> Optional[AirbyteStateMessage]:
        return (
            AirbyteStateMessage(
                type=AirbyteStateType.STREAM,
                stream=AirbyteStreamState(
                    stream_descriptor=StreamDescriptor(parent_stream_name, None),
                    stream_state=AirbyteStateBlob(stream_state["state"]),
                ),
            )
            if stream_state and "state" in stream_state
            else None
        )
```
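The state object that the `state` property emits, and that `_set_initial_state` accepts, follows the format documented in the docstring above. For orientation, an illustrative value (field contents are examples only):

```python
# Keys mirror _PERPARTITION_STATE_KEY ("states") and _GLOBAL_STATE_KEY ("state").
stream_state = {
    "use_global_cursor": False,
    "states": [
        {
            "partition": {"partition_key": "value"},
            "cursor": {"last_updated": "2023-05-27T00:00:00Z"},
        }
    ],
    "state": {"last_updated": "2023-05-27T00:00:00Z"},  # global fallback cursor
    "lookback_window": 10,  # seconds to re-read on the next sync
    "parent_state": {"parent_stream_name": {"last_updated": "2023-05-27T00:00:00Z"}},
}
```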
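Given a `stream_state` like the one above, the two static helpers extract ready-to-use state messages for parent streams and for the global cursor; both return `None` when the corresponding key is absent. A minimal sketch:

```python
# Each call returns an Optional[AirbyteStateMessage] scoped to the given stream name.
parent_message = ConcurrentPerPartitionCursor.get_parent_state(
    stream_state, parent_stream_name="parent_stream_name"
)
global_message = ConcurrentPerPartitionCursor.get_global_state(
    stream_state, parent_stream_name="parent_stream_name"
)
assert parent_message is not None and global_message is not None
```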