airbyte_cdk.sources.streams.concurrent.cursor
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import datetime
import functools
import logging
import threading
from abc import ABC, abstractmethod
from typing import (
    Any,
    Callable,
    Iterable,
    List,
    Mapping,
    MutableMapping,
    Optional,
    Tuple,
    Union,
)

from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.message import MessageRepository, NoopMessageRepository
from airbyte_cdk.sources.streams import NO_CURSOR_STATE_KEY
from airbyte_cdk.sources.streams.concurrent.clamping import ClampingStrategy, NoClamping
from airbyte_cdk.sources.streams.concurrent.cursor_types import CursorValueType, GapType
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.stream_slicer import StreamSlicer
from airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter import (
    AbstractStreamStateConverter,
)
from airbyte_cdk.sources.types import Record, StreamSlice

LOGGER = logging.getLogger("airbyte")


def _extract_value(mapping: Mapping[str, Any], path: List[str]) -> Any:
    """Walk *path* (a list of keys) into a nested mapping and return the value it points at."""
    return functools.reduce(lambda a, b: a[b], path, mapping)


class CursorField:
    """Describes the record field that carries the cursor value for incremental syncs."""

    def __init__(
        self, cursor_field_key: str, supports_catalog_defined_cursor_field: bool = False
    ) -> None:
        self.cursor_field_key = cursor_field_key
        self.supports_catalog_defined_cursor_field = supports_catalog_defined_cursor_field

    def extract_value(self, record: Record) -> Any:
        """Return the cursor value from *record*.

        Raises ValueError if the cursor field is absent (or None) in the record data.
        """
        cursor_value = record.data.get(self.cursor_field_key)
        if cursor_value is None:
            raise ValueError(f"Could not find cursor field {self.cursor_field_key} in record")
        return cursor_value  # type: ignore  # we assume that the value the path points at is a comparable


class Cursor(StreamSlicer, ABC):
    @property
    @abstractmethod
    def state(self) -> MutableMapping[str, Any]: ...

    @abstractmethod
    def observe(self, record: Record) -> None:
        """
        Indicate to the cursor that the record has been emitted
        """
        raise NotImplementedError()

    @abstractmethod
    def close_partition(self, partition: Partition) -> None:
        """
        Indicate to the cursor that the partition has been successfully processed
        """
        raise NotImplementedError()

    @abstractmethod
    def ensure_at_least_one_state_emitted(self) -> None:
        """
        State messages are emitted when a partition is closed. However, the platform expects at least one state to be emitted per sync per
        stream. Hence, if no partitions are generated, this method needs to be called.
        """
        raise NotImplementedError()

    @abstractmethod
    def should_be_synced(self, record: Record) -> bool:
        pass

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Default placeholder implementation of stream_slices.
        Subclasses can override this method to provide actual behavior.
        """
        yield StreamSlice(partition={}, cursor_slice={})

    def get_cursor_datetime_from_state(
        self, stream_state: Mapping[str, Any]
    ) -> datetime.datetime | None:
        """Extract and parse the cursor datetime from the given stream state.

        This method is used by StateDelegatingStream to validate cursor age against
        an API's data retention period. Subclasses should implement this method to
        extract the cursor value from their specific state structure and parse it
        into a datetime object.

        Returns None if the cursor cannot be extracted or parsed, which will cause
        StateDelegatingStream to fall back to full refresh (safe default).

        Raises NotImplementedError by default - subclasses must implement this method
        if they want to support cursor age validation with api_retention_period.
        """
        raise NotImplementedError(
            f"{self.__class__.__name__} does not implement get_cursor_datetime_from_state. "
            f"Cursor age validation with api_retention_period is not supported for this cursor type."
        )


class FinalStateCursor(Cursor):
    """Cursor that is used to guarantee at least one state message is emitted for a concurrent stream."""

    def __init__(
        self,
        stream_name: str,
        stream_namespace: Optional[str],
        message_repository: MessageRepository,
    ) -> None:
        self._stream_name = stream_name
        self._stream_namespace = stream_namespace
        self._message_repository = message_repository
        # Normally the connector state manager operates at the source-level. However, we only need it to write the sentinel
        # state message rather than manage overall source state. This is also only temporary as we move to the resumable
        # full refresh world where every stream uses a FileBasedConcurrentCursor with incremental state.
        self._connector_state_manager = ConnectorStateManager()
        self._has_closed_at_least_one_slice = False

    @property
    def state(self) -> MutableMapping[str, Any]:
        return {NO_CURSOR_STATE_KEY: True}

    def observe(self, record: Record) -> None:
        pass

    def close_partition(self, partition: Partition) -> None:
        pass

    def ensure_at_least_one_state_emitted(self) -> None:
        """
        Used primarily for full refresh syncs that do not have a valid cursor value to emit at the end of a sync
        """

        self._connector_state_manager.update_state_for_stream(
            self._stream_name, self._stream_namespace, self.state
        )
        state_message = self._connector_state_manager.create_state_message(
            self._stream_name, self._stream_namespace
        )
        self._message_repository.emit_message(state_message)

    def should_be_synced(self, record: Record) -> bool:
        return True

    def get_cursor_datetime_from_state(
        self, stream_state: Mapping[str, Any]
    ) -> datetime.datetime | None:
        """Return now() if state indicates a completed full refresh, else None.

        When the state has NO_CURSOR_STATE_KEY: True, it means the previous sync was a
        completed full refresh. Returning now() indicates the cursor is "current" and
        within any retention period, so we should use incremental sync.

        For any other state format, return None to indicate this cursor cannot parse it,
        allowing the incremental cursor to handle the state instead.
        """
        if stream_state.get(NO_CURSOR_STATE_KEY):
            return datetime.datetime.now(datetime.timezone.utc)
        return None


class ConcurrentCursor(Cursor):
    # Indexes into the (start, end) slice-boundary field tuple.
    _START_BOUNDARY = 0
    _END_BOUNDARY = 1

    def copy_without_state(self) -> "ConcurrentCursor":
        """Return a fresh cursor with the same configuration but empty state and no-op messaging."""
        return self.__class__(
            stream_name=self._stream_name,
            stream_namespace=self._stream_namespace,
            stream_state={},
            message_repository=NoopMessageRepository(),
            connector_state_manager=ConnectorStateManager(),
            connector_state_converter=self._connector_state_converter,
            cursor_field=self._cursor_field,
            slice_boundary_fields=self._slice_boundary_fields,
            start=self._start,
            end_provider=self._end_provider,
            lookback_window=self._lookback_window,
            slice_range=self._slice_range,
            cursor_granularity=self._cursor_granularity,
            clamping_strategy=self._clamping_strategy,
        )

    def __init__(
        self,
        stream_name: str,
        stream_namespace: Optional[str],
        stream_state: Any,
        message_repository: MessageRepository,
        connector_state_manager: ConnectorStateManager,
        connector_state_converter: AbstractStreamStateConverter,
        cursor_field: CursorField,
        slice_boundary_fields: Optional[Tuple[str, str]],
        start: Optional[CursorValueType],
        end_provider: Callable[[], CursorValueType],
        lookback_window: Optional[GapType] = None,
        slice_range: Optional[GapType] = None,
        cursor_granularity: Optional[GapType] = None,
        clamping_strategy: ClampingStrategy = NoClamping(),
    ) -> None:
        self._stream_name = stream_name
        self._stream_namespace = stream_namespace
        self._message_repository = message_repository
        self._connector_state_converter = connector_state_converter
        self._connector_state_manager = connector_state_manager
        self._cursor_field = cursor_field
        # To see some example where the slice boundaries might not be defined, check https://github.com/airbytehq/airbyte/blob/1ce84d6396e446e1ac2377362446e3fb94509461/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py#L363-L379
        self._slice_boundary_fields = slice_boundary_fields
        self._start = start
        self._end_provider = end_provider
        self.start, self._concurrent_state = self._get_concurrent_state(stream_state)
        self._lookback_window = lookback_window
        self._slice_range = slice_range
        self._most_recent_cursor_value_per_partition: MutableMapping[
            Union[StreamSlice, Mapping[str, Any], None], Any
        ] = {}
        self._has_closed_at_least_one_slice = False
        self._cursor_granularity = cursor_granularity
        # Flag to track if the logger has been triggered (per stream)
        self._should_be_synced_logger_triggered = False
        self._clamping_strategy = clamping_strategy
        self._is_ascending_order = True

        # A lock is required when closing a partition because updating the cursor's concurrent_state is
        # not thread safe. When multiple partitions are being closed by the cursor at the same time, it is
        # possible for one partition to update concurrent_state after a second partition has already read
        # the previous state. This can lead to the second partition overwriting the previous one's state.
        self._lock = threading.Lock()

    @property
    def state(self) -> MutableMapping[str, Any]:
        return self._connector_state_converter.convert_to_state_message(
            self.cursor_field, self._concurrent_state
        )

    @property
    def cursor_field(self) -> CursorField:
        return self._cursor_field

    @property
    def _slice_boundary_fields_wrapper(self) -> Tuple[str, str]:
        # Fall back to the converter's START/END keys when no explicit boundary fields were given.
        return (
            self._slice_boundary_fields
            if self._slice_boundary_fields
            else (
                self._connector_state_converter.START_KEY,
                self._connector_state_converter.END_KEY,
            )
        )

    def _get_concurrent_state(
        self, state: MutableMapping[str, Any]
    ) -> Tuple[CursorValueType, MutableMapping[str, Any]]:
        """Return the (start value, concurrent state) pair derived from incoming *state*.

        Accepts either the concurrent (partitioned) state format or the sequential/legacy
        format, converting the latter via the state converter.
        """
        if self._connector_state_converter.is_state_message_compatible(state):
            partitioned_state = self._connector_state_converter.deserialize(state)
            slices_from_partitioned_state = partitioned_state.get("slices", [])

            value_from_partitioned_state = None
            if slices_from_partitioned_state:
                # We assume here that the slices have been already merged
                first_slice = slices_from_partitioned_state[0]
                value_from_partitioned_state = (
                    first_slice[self._connector_state_converter.MOST_RECENT_RECORD_KEY]
                    if self._connector_state_converter.MOST_RECENT_RECORD_KEY in first_slice
                    else first_slice[self._connector_state_converter.END_KEY]
                )
            return (
                value_from_partitioned_state
                or self._start
                or self._connector_state_converter.zero_value,
                partitioned_state,
            )
        return self._connector_state_converter.convert_from_sequential_state(
            self._cursor_field, state, self._start
        )

    def observe(self, record: Record) -> None:
        # Because observe writes to the most_recent_cursor_value_per_partition mapping,
        # it is not thread-safe. However, this shouldn't lead to concurrency issues because
        # observe() is only invoked by PartitionReader.process_partition(). Since the map is
        # broken down according to partition, concurrent threads processing only read/write
        # from different keys which avoids any conflicts.
        #
        # If we were to add thread safety, we should implement a lock per-partition
        # which is instantiated during stream_slices()
        most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
            record.associated_slice
        )
        try:
            cursor_value = self._extract_cursor_value(record)

            if most_recent_cursor_value is None or most_recent_cursor_value < cursor_value:
                self._most_recent_cursor_value_per_partition[record.associated_slice] = cursor_value
            elif most_recent_cursor_value > cursor_value:
                self._is_ascending_order = False
        except ValueError:
            self._log_for_record_without_cursor_value()

    def _extract_cursor_value(self, record: Record) -> Any:
        return self._connector_state_converter.parse_value(self._cursor_field.extract_value(record))

    def close_partition(self, partition: Partition) -> None:
        with self._lock:
            slice_count_before = len(self._concurrent_state.get("slices", []))
            self._add_slice_to_state(partition)
            if slice_count_before < len(
                self._concurrent_state["slices"]
            ):  # only emit if at least one slice has been processed
                self._merge_partitions()
                self._emit_state_message()
            self._has_closed_at_least_one_slice = True

    def _add_slice_to_state(self, partition: Partition) -> None:
        most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
            partition.to_slice()
        )

        if self._slice_boundary_fields:
            if "slices" not in self._concurrent_state:
                raise RuntimeError(
                    f"The state for stream {self._stream_name} should have at least one slice to delineate the sync start time, but no slices are present. This is unexpected. Please contact Support."
                )
            self._concurrent_state["slices"].append(
                {
                    self._connector_state_converter.START_KEY: self._extract_from_slice(
                        partition, self._slice_boundary_fields[self._START_BOUNDARY]
                    ),
                    self._connector_state_converter.END_KEY: self._extract_from_slice(
                        partition, self._slice_boundary_fields[self._END_BOUNDARY]
                    ),
                    self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value,
                }
            )
        elif most_recent_cursor_value:
            if self._has_closed_at_least_one_slice:
                # If we track state value using records cursor field, we can only do that if there is one partition. This is because we save
                # the state every time we close a partition. We assume that if there are multiple slices, they need to be providing
                # boundaries. There are cases where partitions could not have boundaries:
                # * The cursor should be per-partition
                # * The stream state is actually the parent stream state
                # There might be other cases not listed above. Those are not supported today hence the stream should not use this cursor for
                # state management. For the specific user that was affected with this issue, we need to:
                # * Fix state tracking (which is currently broken)
                # * Make the new version available
                # * (Probably) ask the user to reset the stream to avoid data loss
                raise ValueError(
                    "Given that slice_boundary_fields is not defined and that per-partition state is not supported, only one slice is "
                    "expected. Please contact the Airbyte team."
                )

            self._concurrent_state["slices"].append(
                {
                    self._connector_state_converter.START_KEY: self.start,
                    self._connector_state_converter.END_KEY: most_recent_cursor_value,
                    self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value,
                }
            )

    def _emit_state_message(self) -> None:
        self._connector_state_manager.update_state_for_stream(
            self._stream_name,
            self._stream_namespace,
            self.state,
        )
        state_message = self._connector_state_manager.create_state_message(
            self._stream_name, self._stream_namespace
        )
        self._message_repository.emit_message(state_message)

    def _merge_partitions(self) -> None:
        self._concurrent_state["slices"] = self._connector_state_converter.merge_intervals(
            self._concurrent_state["slices"]
        )

    def _extract_from_slice(self, partition: Partition, key: str) -> CursorValueType:
        try:
            _slice = partition.to_slice()
            if not _slice:
                raise KeyError(f"Could not find key `{key}` in empty slice")
            return self._connector_state_converter.parse_value(_slice[key])  # type: ignore # we expect the devs to specify a key that would return a CursorValueType
        except KeyError as exception:
            raise KeyError(
                f"Partition is expected to have key `{key}` but could not be found"
            ) from exception

    def ensure_at_least_one_state_emitted(self) -> None:
        """
        The platform expect to have at least one state message on successful syncs. Hence, whatever happens, we expect this method to be
        called.
        """
        self._emit_state_message()

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Generating slices based on a few parameters:
        * lookback_window: Buffer to remove from END_KEY of the highest slice
        * slice_range: Max difference between two slices. If the difference between two slices is greater, multiple slices will be created
        * start: `_split_per_slice_range` will clip any value to `self._start which means that:
          * if upper is less than self._start, no slices will be generated
          * if lower is less than self._start, self._start will be used as the lower boundary (lookback_window will not be considered in that case)

        Note that the slices will overlap at their boundaries. We therefore expect to have at least the lower or the upper boundary to be
        inclusive in the API that is queried.
        """
        self._merge_partitions()

        if self._start is not None and self._is_start_before_first_slice():
            yield from self._split_per_slice_range(
                self._start,
                self._concurrent_state["slices"][0][self._connector_state_converter.START_KEY],
                False,
            )

        if len(self._concurrent_state["slices"]) == 1:
            yield from self._split_per_slice_range(
                self._calculate_lower_boundary_of_last_slice(
                    self._concurrent_state["slices"][0][self._connector_state_converter.END_KEY]
                ),
                self._end_provider(),
                True,
            )
        elif len(self._concurrent_state["slices"]) > 1:
            for i in range(len(self._concurrent_state["slices"]) - 1):
                if self._cursor_granularity:
                    yield from self._split_per_slice_range(
                        self._concurrent_state["slices"][i][self._connector_state_converter.END_KEY]
                        + self._cursor_granularity,
                        self._concurrent_state["slices"][i + 1][
                            self._connector_state_converter.START_KEY
                        ],
                        False,
                    )
                else:
                    yield from self._split_per_slice_range(
                        self._concurrent_state["slices"][i][
                            self._connector_state_converter.END_KEY
                        ],
                        self._concurrent_state["slices"][i + 1][
                            self._connector_state_converter.START_KEY
                        ],
                        False,
                    )
            yield from self._split_per_slice_range(
                self._calculate_lower_boundary_of_last_slice(
                    self._concurrent_state["slices"][-1][self._connector_state_converter.END_KEY]
                ),
                self._end_provider(),
                True,
            )
        else:
            raise ValueError("Expected at least one slice")

    def _is_start_before_first_slice(self) -> bool:
        return (
            self._start is not None
            and self._start
            < self._concurrent_state["slices"][0][self._connector_state_converter.START_KEY]
        )

    def _calculate_lower_boundary_of_last_slice(
        self, lower_boundary: CursorValueType
    ) -> CursorValueType:
        if self._lookback_window:
            return lower_boundary - self._lookback_window
        return lower_boundary

    def _split_per_slice_range(
        self, lower: CursorValueType, upper: CursorValueType, upper_is_end: bool
    ) -> Iterable[StreamSlice]:
        if lower >= upper:
            return

        if self._start and upper < self._start:
            return

        lower = max(lower, self._start) if self._start else lower
        if not self._slice_range or self._evaluate_upper_safely(lower, self._slice_range) >= upper:
            clamped_lower = self._clamping_strategy.clamp(lower)
            clamped_upper = self._clamping_strategy.clamp(upper)
            start_value, end_value = (
                (clamped_lower, clamped_upper - self._cursor_granularity)
                if self._cursor_granularity and not upper_is_end
                else (clamped_lower, clamped_upper)
            )
            yield StreamSlice(
                partition={},
                cursor_slice={
                    self._slice_boundary_fields_wrapper[
                        self._START_BOUNDARY
                    ]: self._connector_state_converter.output_format(start_value),
                    self._slice_boundary_fields_wrapper[
                        self._END_BOUNDARY
                    ]: self._connector_state_converter.output_format(end_value),
                },
            )
        else:
            stop_processing = False
            current_lower_boundary = lower
            while not stop_processing:
                current_upper_boundary = min(
                    self._evaluate_upper_safely(current_lower_boundary, self._slice_range), upper
                )
                has_reached_upper_boundary = current_upper_boundary >= upper

                clamped_upper = (
                    self._clamping_strategy.clamp(current_upper_boundary)
                    if current_upper_boundary != upper
                    else current_upper_boundary
                )
                clamped_lower = self._clamping_strategy.clamp(current_lower_boundary)
                if clamped_lower >= clamped_upper:
                    # clamping collapsed both values which means that it is time to stop processing
                    # FIXME should this be replace by proper end_provider
                    break
                start_value, end_value = (
                    (clamped_lower, clamped_upper - self._cursor_granularity)
                    if self._cursor_granularity
                    and (not upper_is_end or not has_reached_upper_boundary)
                    else (clamped_lower, clamped_upper)
                )
                yield StreamSlice(
                    partition={},
                    cursor_slice={
                        self._slice_boundary_fields_wrapper[
                            self._START_BOUNDARY
                        ]: self._connector_state_converter.output_format(start_value),
                        self._slice_boundary_fields_wrapper[
                            self._END_BOUNDARY
                        ]: self._connector_state_converter.output_format(end_value),
                    },
                )
                current_lower_boundary = clamped_upper
                if current_upper_boundary >= upper:
                    stop_processing = True

    def _evaluate_upper_safely(self, lower: CursorValueType, step: GapType) -> CursorValueType:
        """
        Given that we set the default step at datetime.timedelta.max, we will generate an OverflowError when evaluating the next start_date
        This method assumes that users would never enter a step that would generate an overflow. Given that would be the case, the code
        would have broken anyway.
        """
        try:
            return lower + step
        except OverflowError:
            return self._end_provider()

    def should_be_synced(self, record: Record) -> bool:
        """
        Determines if a record should be synced based on its cursor value.
        :param record: The record to evaluate

        :return: True if the record's cursor value falls within the sync boundaries
        """
        try:
            record_cursor_value: CursorValueType = self._extract_cursor_value(record)
        except ValueError:
            self._log_for_record_without_cursor_value()
            return True
        return self.start <= record_cursor_value <= self._end_provider()

    def _log_for_record_without_cursor_value(self) -> None:
        if not self._should_be_synced_logger_triggered:
            LOGGER.warning(
                f"Could not find cursor field `{self.cursor_field.cursor_field_key}` in record for stream {self._stream_name}. The incremental sync will assume it needs to be synced"
            )
            self._should_be_synced_logger_triggered = True

    def reduce_slice_range(self, stream_slice: StreamSlice) -> StreamSlice:
        # In theory, we might be more flexible here meaning that it doesn't need to be in ascending order but it just
        # needs to be ordered. For now though, we will only support ascending order.
        if not self._is_ascending_order:
            LOGGER.warning(
                "Attempting to reduce slice while records are not returned in incremental order might lead to missing records"
            )

        if stream_slice in self._most_recent_cursor_value_per_partition:
            return StreamSlice(
                partition=stream_slice.partition,
                cursor_slice={
                    self._slice_boundary_fields_wrapper[
                        self._START_BOUNDARY
                    ]: self._connector_state_converter.output_format(
                        self._most_recent_cursor_value_per_partition[stream_slice]
                    ),
                    self._slice_boundary_fields_wrapper[
                        self._END_BOUNDARY
                    ]: stream_slice.cursor_slice[
                        self._slice_boundary_fields_wrapper[self._END_BOUNDARY]
                    ],
                },
                extra_fields=stream_slice.extra_fields,
            )
        else:
            return stream_slice

    def _parse_datetime_or_none(self, cursor_value: Any) -> datetime.datetime | None:
        """Parse a raw cursor value via the state converter; return it only if it is a datetime.

        Returns None when the value cannot be parsed or does not parse to a datetime.
        """
        try:
            parsed_value = self._connector_state_converter.parse_value(cursor_value)
        except (ValueError, TypeError):
            return None
        return parsed_value if isinstance(parsed_value, datetime.datetime) else None

    def get_cursor_datetime_from_state(
        self, stream_state: Mapping[str, Any]
    ) -> datetime.datetime | None:
        """Extract and parse the cursor datetime from the given stream state.

        For concurrent cursors, the state can be in two formats:
        1. Sequential/legacy format: {cursor_field: cursor_value}
        2. Concurrent format: {state_type: "date-range", slices: [...]}

        Returns the cursor datetime if present and parseable, otherwise returns None.
        """
        # Check if state is in concurrent format (need to convert to dict for type compatibility)
        mutable_state: MutableMapping[str, Any] = dict(stream_state)
        if self._connector_state_converter.is_state_message_compatible(mutable_state):
            slices = stream_state.get("slices", [])
            if not slices:
                return None
            # Get the most recent cursor value from the first slice (after merging)
            first_slice = slices[0]
            cursor_value = first_slice.get(
                self._connector_state_converter.MOST_RECENT_RECORD_KEY
            ) or first_slice.get(self._connector_state_converter.END_KEY)
            if not cursor_value:
                return None
            return self._parse_datetime_or_none(cursor_value)

        # Sequential/legacy format: {cursor_field: cursor_value}
        cursor_value = stream_state.get(self._cursor_field.cursor_field_key)
        if not cursor_value:
            return None
        return self._parse_datetime_or_none(cursor_value)
class CursorField:
    """Describes the record field that carries the cursor value for incremental syncs."""

    def __init__(
        self, cursor_field_key: str, supports_catalog_defined_cursor_field: bool = False
    ) -> None:
        self.cursor_field_key = cursor_field_key
        self.supports_catalog_defined_cursor_field = supports_catalog_defined_cursor_field

    def extract_value(self, record: Record) -> Any:
        """Return the cursor value from *record*.

        Raises ValueError if the cursor field is absent (or None) in the record data.
        """
        cursor_value = record.data.get(self.cursor_field_key)
        if cursor_value is None:
            raise ValueError(f"Could not find cursor field {self.cursor_field_key} in record")
        return cursor_value  # type: ignore  # we assume that the value the path points at is a comparable
def extract_value(self, record: Record) -> Any:
    """Return the cursor value from *record*.

    Raises ValueError if the cursor field is absent (or None) in the record data.
    """
    cursor_value = record.data.get(self.cursor_field_key)
    if cursor_value is None:
        raise ValueError(f"Could not find cursor field {self.cursor_field_key} in record")
    return cursor_value  # type: ignore  # we assume that the value the path points at is a comparable
class Cursor(StreamSlicer, ABC):
    @property
    @abstractmethod
    def state(self) -> MutableMapping[str, Any]: ...

    @abstractmethod
    def observe(self, record: Record) -> None:
        """
        Indicate to the cursor that the record has been emitted
        """
        raise NotImplementedError()

    @abstractmethod
    def close_partition(self, partition: Partition) -> None:
        """
        Indicate to the cursor that the partition has been successfully processed
        """
        raise NotImplementedError()

    @abstractmethod
    def ensure_at_least_one_state_emitted(self) -> None:
        """
        State messages are emitted when a partition is closed. However, the platform expects at least one state to be emitted per sync per
        stream. Hence, if no partitions are generated, this method needs to be called.
        """
        raise NotImplementedError()

    @abstractmethod
    def should_be_synced(self, record: Record) -> bool:
        pass

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Default placeholder implementation of stream_slices.
        Subclasses can override this method to provide actual behavior.
        """
        yield StreamSlice(partition={}, cursor_slice={})

    def get_cursor_datetime_from_state(
        self, stream_state: Mapping[str, Any]
    ) -> datetime.datetime | None:
        """Extract and parse the cursor datetime from the given stream state.

        This method is used by StateDelegatingStream to validate cursor age against
        an API's data retention period. Subclasses should implement this method to
        extract the cursor value from their specific state structure and parse it
        into a datetime object.

        Returns None if the cursor cannot be extracted or parsed, which will cause
        StateDelegatingStream to fall back to full refresh (safe default).

        Raises NotImplementedError by default - subclasses must implement this method
        if they want to support cursor age validation with api_retention_period.
        """
        raise NotImplementedError(
            f"{self.__class__.__name__} does not implement get_cursor_datetime_from_state. "
            f"Cursor age validation with api_retention_period is not supported for this cursor type."
        )
Slices the stream into chunks that can be fetched independently. Slices enable state checkpointing and data retrieval parallelization.
@abstractmethod
def observe(self, record: Record) -> None:
    """
    Indicate to the cursor that the record has been emitted
    """
    raise NotImplementedError()
Indicate to the cursor that the record has been emitted
@abstractmethod
def close_partition(self, partition: Partition) -> None:
    """
    Indicate to the cursor that the partition has been successfully processed
    """
    raise NotImplementedError()
Indicate to the cursor that the partition has been successfully processed
@abstractmethod
def ensure_at_least_one_state_emitted(self) -> None:
    """
    State messages are emitted when a partition is closed. However, the platform expects at least one state to be emitted per sync per
    stream. Hence, if no partitions are generated, this method needs to be called.
    """
    raise NotImplementedError()
State messages are emitted when a partition is closed. However, the platform expects at least one state to be emitted per sync per stream. Hence, if no partitions are generated, this method needs to be called.
def stream_slices(self) -> Iterable[StreamSlice]:
    """
    Default placeholder implementation of stream_slices.
    Subclasses can override this method to provide actual behavior.
    """
    yield StreamSlice(partition={}, cursor_slice={})
Default placeholder implementation of generate_slices. Subclasses can override this method to provide actual behavior.
def get_cursor_datetime_from_state(
    self, stream_state: Mapping[str, Any]
) -> datetime.datetime | None:
    """Extract and parse the cursor datetime from the given stream state.

    This method is used by StateDelegatingStream to validate cursor age against
    an API's data retention period. Subclasses should implement this method to
    extract the cursor value from their specific state structure and parse it
    into a datetime object.

    Returns None if the cursor cannot be extracted or parsed, which will cause
    StateDelegatingStream to fall back to full refresh (safe default).

    Raises NotImplementedError by default - subclasses must implement this method
    if they want to support cursor age validation with api_retention_period.
    """
    raise NotImplementedError(
        f"{self.__class__.__name__} does not implement get_cursor_datetime_from_state. "
        f"Cursor age validation with api_retention_period is not supported for this cursor type."
    )
Extract and parse the cursor datetime from the given stream state.
This method is used by StateDelegatingStream to validate cursor age against an API's data retention period. Subclasses should implement this method to extract the cursor value from their specific state structure and parse it into a datetime object.
Returns None if the cursor cannot be extracted or parsed, which will cause StateDelegatingStream to fall back to full refresh (safe default).
Raises NotImplementedError by default - subclasses must implement this method if they want to support cursor age validation with api_retention_period.
class FinalStateCursor(Cursor):
    """Sentinel cursor guaranteeing that a concurrent stream emits at least one state message."""

    def __init__(
        self,
        stream_name: str,
        stream_namespace: Optional[str],
        message_repository: MessageRepository,
    ) -> None:
        self._stream_name = stream_name
        self._stream_namespace = stream_namespace
        self._message_repository = message_repository
        # A cursor-local state manager is sufficient here: this cursor only ever writes
        # the sentinel state message, never overall source-level state. It is also a
        # temporary construct while we migrate to resumable full refresh, where every
        # stream tracks incremental state through a FileBasedConcurrentCursor.
        self._connector_state_manager = ConnectorStateManager()
        self._has_closed_at_least_one_slice = False

    @property
    def state(self) -> MutableMapping[str, Any]:
        # Sentinel payload signalling "no real cursor" to the platform.
        return {NO_CURSOR_STATE_KEY: True}

    def observe(self, record: Record) -> None:
        # No cursor field to track.
        pass

    def close_partition(self, partition: Partition) -> None:
        # No per-partition state to persist.
        pass

    def ensure_at_least_one_state_emitted(self) -> None:
        """Emit the sentinel state message.

        Used primarily for full refresh syncs that do not have a valid cursor value to
        emit at the end of a sync.
        """
        self._connector_state_manager.update_state_for_stream(
            self._stream_name, self._stream_namespace, self.state
        )
        self._message_repository.emit_message(
            self._connector_state_manager.create_state_message(
                self._stream_name, self._stream_namespace
            )
        )

    def should_be_synced(self, record: Record) -> bool:
        # Full refresh: every record is in scope.
        return True

    def get_cursor_datetime_from_state(
        self, stream_state: Mapping[str, Any]
    ) -> datetime.datetime | None:
        """Return now() when the state marks a completed full refresh, otherwise None.

        A state containing NO_CURSOR_STATE_KEY: True means the previous sync was a
        completed full refresh; reporting now() marks the cursor as "current" and
        therefore inside any retention period, so incremental sync should be used.

        Any other state shape returns None, letting the incremental cursor interpret
        the state instead.
        """
        return (
            datetime.datetime.now(datetime.timezone.utc)
            if stream_state.get(NO_CURSOR_STATE_KEY)
            else None
        )
Cursor that is used to guarantee at least one state message is emitted for a concurrent stream.
119 def __init__( 120 self, 121 stream_name: str, 122 stream_namespace: Optional[str], 123 message_repository: MessageRepository, 124 ) -> None: 125 self._stream_name = stream_name 126 self._stream_namespace = stream_namespace 127 self._message_repository = message_repository 128 # Normally the connector state manager operates at the source-level. However, we only need it to write the sentinel 129 # state message rather than manage overall source state. This is also only temporary as we move to the resumable 130 # full refresh world where every stream uses a FileBasedConcurrentCursor with incremental state. 131 self._connector_state_manager = ConnectorStateManager() 132 self._has_closed_at_least_one_slice = False
Indicate to the cursor that the partition has been successfully processed
144 def ensure_at_least_one_state_emitted(self) -> None: 145 """ 146 Used primarily for full refresh syncs that do not have a valid cursor value to emit at the end of a sync 147 """ 148 149 self._connector_state_manager.update_state_for_stream( 150 self._stream_name, self._stream_namespace, self.state 151 ) 152 state_message = self._connector_state_manager.create_state_message( 153 self._stream_name, self._stream_namespace 154 ) 155 self._message_repository.emit_message(state_message)
Used primarily for full refresh syncs that do not have a valid cursor value to emit at the end of a sync
160 def get_cursor_datetime_from_state( 161 self, stream_state: Mapping[str, Any] 162 ) -> datetime.datetime | None: 163 """Return now() if state indicates a completed full refresh, else None. 164 165 When the state has NO_CURSOR_STATE_KEY: True, it means the previous sync was a 166 completed full refresh. Returning now() indicates the cursor is "current" and 167 within any retention period, so we should use incremental sync. 168 169 For any other state format, return None to indicate this cursor cannot parse it, 170 allowing the incremental cursor to handle the state instead. 171 """ 172 if stream_state.get(NO_CURSOR_STATE_KEY): 173 return datetime.datetime.now(datetime.timezone.utc) 174 return None
Return now() if state indicates a completed full refresh, else None.
When the state has NO_CURSOR_STATE_KEY: True, it means the previous sync was a completed full refresh. Returning now() indicates the cursor is "current" and within any retention period, so we should use incremental sync.
For any other state format, return None to indicate this cursor cannot parse it, allowing the incremental cursor to handle the state instead.
Inherited Members
class ConcurrentCursor(Cursor):
    """Incremental cursor for concurrent streams.

    Tracks synced time ranges as a list of merged "slices" in concurrent state,
    observes the highest cursor value per partition, and emits a state message
    each time a partition closes.
    """

    # Indexes into the (start, end) slice boundary fields tuple.
    _START_BOUNDARY = 0
    _END_BOUNDARY = 1

    def copy_without_state(self) -> "ConcurrentCursor":
        """Return a copy of this cursor configured identically but with empty state.

        The copy uses a NoopMessageRepository and a fresh ConnectorStateManager so it
        cannot emit state on behalf of the original.
        """
        return self.__class__(
            stream_name=self._stream_name,
            stream_namespace=self._stream_namespace,
            stream_state={},
            message_repository=NoopMessageRepository(),
            connector_state_manager=ConnectorStateManager(),
            connector_state_converter=self._connector_state_converter,
            cursor_field=self._cursor_field,
            slice_boundary_fields=self._slice_boundary_fields,
            start=self._start,
            end_provider=self._end_provider,
            lookback_window=self._lookback_window,
            slice_range=self._slice_range,
            cursor_granularity=self._cursor_granularity,
            clamping_strategy=self._clamping_strategy,
        )

    def __init__(
        self,
        stream_name: str,
        stream_namespace: Optional[str],
        stream_state: Any,
        message_repository: MessageRepository,
        connector_state_manager: ConnectorStateManager,
        connector_state_converter: AbstractStreamStateConverter,
        cursor_field: CursorField,
        slice_boundary_fields: Optional[Tuple[str, str]],
        start: Optional[CursorValueType],
        end_provider: Callable[[], CursorValueType],
        lookback_window: Optional[GapType] = None,
        slice_range: Optional[GapType] = None,
        cursor_granularity: Optional[GapType] = None,
        clamping_strategy: ClampingStrategy = NoClamping(),
    ) -> None:
        self._stream_name = stream_name
        self._stream_namespace = stream_namespace
        self._message_repository = message_repository
        self._connector_state_converter = connector_state_converter
        self._connector_state_manager = connector_state_manager
        self._cursor_field = cursor_field
        # To see some example where the slice boundaries might not be defined, check https://github.com/airbytehq/airbyte/blob/1ce84d6396e446e1ac2377362446e3fb94509461/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py#L363-L379
        self._slice_boundary_fields = slice_boundary_fields
        self._start = start
        self._end_provider = end_provider
        # NOTE: _get_concurrent_state reads the converter/cursor_field/_start set above.
        self.start, self._concurrent_state = self._get_concurrent_state(stream_state)
        self._lookback_window = lookback_window
        self._slice_range = slice_range
        self._most_recent_cursor_value_per_partition: MutableMapping[
            Union[StreamSlice, Mapping[str, Any], None], Any
        ] = {}
        self._has_closed_at_least_one_slice = False
        self._cursor_granularity = cursor_granularity
        # Flag to track if the logger has been triggered (per stream)
        self._should_be_synced_logger_triggered = False
        self._clamping_strategy = clamping_strategy
        self._is_ascending_order = True

        # A lock is required when closing a partition because updating the cursor's concurrent_state is
        # not thread safe. When multiple partitions are being closed by the cursor at the same time, it is
        # possible for one partition to update concurrent_state after a second partition has already read
        # the previous state. This can lead to the second partition overwriting the previous one's state.
        self._lock = threading.Lock()

    @property
    def state(self) -> MutableMapping[str, Any]:
        # Serialized (platform-facing) view of the internal concurrent state.
        return self._connector_state_converter.convert_to_state_message(
            self.cursor_field, self._concurrent_state
        )

    @property
    def cursor_field(self) -> CursorField:
        return self._cursor_field

    @property
    def _slice_boundary_fields_wrapper(self) -> Tuple[str, str]:
        # Fall back to the converter's canonical start/end keys when the stream does
        # not define its own slice boundary field names.
        return (
            self._slice_boundary_fields
            if self._slice_boundary_fields
            else (
                self._connector_state_converter.START_KEY,
                self._connector_state_converter.END_KEY,
            )
        )

    def _get_concurrent_state(
        self, state: MutableMapping[str, Any]
    ) -> Tuple[CursorValueType, MutableMapping[str, Any]]:
        """Return (cursor start value, concurrent-format state) derived from `state`.

        Accepts either the concurrent ("date-range") format or the sequential/legacy
        format, converting the latter via the state converter.
        """
        if self._connector_state_converter.is_state_message_compatible(state):
            partitioned_state = self._connector_state_converter.deserialize(state)
            slices_from_partitioned_state = partitioned_state.get("slices", [])

            value_from_partitioned_state = None
            if slices_from_partitioned_state:
                # We assume here that the slices have been already merged
                first_slice = slices_from_partitioned_state[0]
                value_from_partitioned_state = (
                    first_slice[self._connector_state_converter.MOST_RECENT_RECORD_KEY]
                    if self._connector_state_converter.MOST_RECENT_RECORD_KEY in first_slice
                    else first_slice[self._connector_state_converter.END_KEY]
                )
            return (
                value_from_partitioned_state
                or self._start
                or self._connector_state_converter.zero_value,
                partitioned_state,
            )
        return self._connector_state_converter.convert_from_sequential_state(
            self._cursor_field, state, self._start
        )

    def observe(self, record: Record) -> None:
        """Track the highest cursor value seen for the record's partition."""
        # Because observe writes to the most_recent_cursor_value_per_partition mapping,
        # it is not thread-safe. However, this shouldn't lead to concurrency issues because
        # observe() is only invoked by PartitionReader.process_partition(). Since the map is
        # broken down according to partition, concurrent threads processing only read/write
        # from different keys which avoids any conflicts.
        #
        # If we were to add thread safety, we should implement a lock per-partition
        # which is instantiated during stream_slices()
        most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
            record.associated_slice
        )
        try:
            cursor_value = self._extract_cursor_value(record)

            if most_recent_cursor_value is None or most_recent_cursor_value < cursor_value:
                self._most_recent_cursor_value_per_partition[record.associated_slice] = cursor_value
            elif most_recent_cursor_value > cursor_value:
                # A decreasing cursor value means records are not emitted in ascending
                # order; reduce_slice_range relies on this flag to warn.
                self._is_ascending_order = False
        except ValueError:
            self._log_for_record_without_cursor_value()

    def _extract_cursor_value(self, record: Record) -> Any:
        # Raises ValueError when the record has no cursor field value.
        return self._connector_state_converter.parse_value(self._cursor_field.extract_value(record))

    def close_partition(self, partition: Partition) -> None:
        """Record the partition's slice in state and emit a state message if it was added."""
        with self._lock:
            slice_count_before = len(self._concurrent_state.get("slices", []))
            self._add_slice_to_state(partition)
            if slice_count_before < len(
                self._concurrent_state["slices"]
            ):  # only emit if at least one slice has been processed
                self._merge_partitions()
                self._emit_state_message()
            self._has_closed_at_least_one_slice = True

    def _add_slice_to_state(self, partition: Partition) -> None:
        """Append the partition's synced interval to the concurrent state's slices."""
        most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
            partition.to_slice()
        )

        if self._slice_boundary_fields:
            if "slices" not in self._concurrent_state:
                raise RuntimeError(
                    f"The state for stream {self._stream_name} should have at least one slice to delineate the sync start time, but no slices are present. This is unexpected. Please contact Support."
                )
            self._concurrent_state["slices"].append(
                {
                    self._connector_state_converter.START_KEY: self._extract_from_slice(
                        partition, self._slice_boundary_fields[self._START_BOUNDARY]
                    ),
                    self._connector_state_converter.END_KEY: self._extract_from_slice(
                        partition, self._slice_boundary_fields[self._END_BOUNDARY]
                    ),
                    self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value,
                }
            )
        elif most_recent_cursor_value:
            if self._has_closed_at_least_one_slice:
                # If we track state value using records cursor field, we can only do that if there is one partition. This is because we save
                # the state every time we close a partition. We assume that if there are multiple slices, they need to be providing
                # boundaries. There are cases where partitions could not have boundaries:
                # * The cursor should be per-partition
                # * The stream state is actually the parent stream state
                # There might be other cases not listed above. Those are not supported today hence the stream should not use this cursor for
                # state management. For the specific user that was affected with this issue, we need to:
                # * Fix state tracking (which is currently broken)
                # * Make the new version available
                # * (Probably) ask the user to reset the stream to avoid data loss
                raise ValueError(
                    "Given that slice_boundary_fields is not defined and that per-partition state is not supported, only one slice is "
                    "expected. Please contact the Airbyte team."
                )

            self._concurrent_state["slices"].append(
                {
                    self._connector_state_converter.START_KEY: self.start,
                    self._connector_state_converter.END_KEY: most_recent_cursor_value,
                    self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value,
                }
            )

    def _emit_state_message(self) -> None:
        # Push the current serialized state through the message repository.
        self._connector_state_manager.update_state_for_stream(
            self._stream_name,
            self._stream_namespace,
            self.state,
        )
        state_message = self._connector_state_manager.create_state_message(
            self._stream_name, self._stream_namespace
        )
        self._message_repository.emit_message(state_message)

    def _merge_partitions(self) -> None:
        # Collapse overlapping/adjacent intervals so state stays compact.
        self._concurrent_state["slices"] = self._connector_state_converter.merge_intervals(
            self._concurrent_state["slices"]
        )

    def _extract_from_slice(self, partition: Partition, key: str) -> CursorValueType:
        """Parse the boundary value stored under `key` in the partition's slice.

        Raises KeyError when the slice is empty or the key is missing.
        """
        try:
            _slice = partition.to_slice()
            if not _slice:
                raise KeyError(f"Could not find key `{key}` in empty slice")
            return self._connector_state_converter.parse_value(_slice[key])  # type: ignore # we expect the devs to specify a key that would return a CursorValueType
        except KeyError as exception:
            raise KeyError(
                f"Partition is expected to have key `{key}` but could not be found"
            ) from exception

    def ensure_at_least_one_state_emitted(self) -> None:
        """
        The platform expects to have at least one state message on successful syncs. Hence, whatever happens, we expect this method to be
        called.
        """
        self._emit_state_message()

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Generating slices based on a few parameters:
        * lookback_window: Buffer to remove from END_KEY of the highest slice
        * slice_range: Max difference between two slices. If the difference between two slices is greater, multiple slices will be created
        * start: `_split_per_slice_range` will clip any value to `self._start` which means that:
          * if upper is less than self._start, no slices will be generated
          * if lower is less than self._start, self._start will be used as the lower boundary (lookback_window will not be considered in that case)

        Note that the slices will overlap at their boundaries. We therefore expect to have at least the lower or the upper boundary to be
        inclusive in the API that is queried.
        """
        self._merge_partitions()

        if self._start is not None and self._is_start_before_first_slice():
            yield from self._split_per_slice_range(
                self._start,
                self._concurrent_state["slices"][0][self._connector_state_converter.START_KEY],
                False,
            )

        if len(self._concurrent_state["slices"]) == 1:
            yield from self._split_per_slice_range(
                self._calculate_lower_boundary_of_last_slice(
                    self._concurrent_state["slices"][0][self._connector_state_converter.END_KEY]
                ),
                self._end_provider(),
                True,
            )
        elif len(self._concurrent_state["slices"]) > 1:
            # Fill the gaps between consecutive already-synced intervals...
            for i in range(len(self._concurrent_state["slices"]) - 1):
                if self._cursor_granularity:
                    yield from self._split_per_slice_range(
                        self._concurrent_state["slices"][i][self._connector_state_converter.END_KEY]
                        + self._cursor_granularity,
                        self._concurrent_state["slices"][i + 1][
                            self._connector_state_converter.START_KEY
                        ],
                        False,
                    )
                else:
                    yield from self._split_per_slice_range(
                        self._concurrent_state["slices"][i][
                            self._connector_state_converter.END_KEY
                        ],
                        self._concurrent_state["slices"][i + 1][
                            self._connector_state_converter.START_KEY
                        ],
                        False,
                    )
            # ...then from the last synced interval up to "now" (end_provider).
            yield from self._split_per_slice_range(
                self._calculate_lower_boundary_of_last_slice(
                    self._concurrent_state["slices"][-1][self._connector_state_converter.END_KEY]
                ),
                self._end_provider(),
                True,
            )
        else:
            raise ValueError("Expected at least one slice")

    def _is_start_before_first_slice(self) -> bool:
        return (
            self._start is not None
            and self._start
            < self._concurrent_state["slices"][0][self._connector_state_converter.START_KEY]
        )

    def _calculate_lower_boundary_of_last_slice(
        self, lower_boundary: CursorValueType
    ) -> CursorValueType:
        # Re-sync a lookback buffer before the last synced end to catch late data.
        if self._lookback_window:
            return lower_boundary - self._lookback_window
        return lower_boundary

    def _split_per_slice_range(
        self, lower: CursorValueType, upper: CursorValueType, upper_is_end: bool
    ) -> Iterable[StreamSlice]:
        """Yield slices covering [lower, upper], split by slice_range and clamped.

        `upper_is_end` indicates that `upper` is the sync end boundary (from
        end_provider), in which case the final slice's end is not reduced by
        cursor_granularity.
        """
        if lower >= upper:
            return

        if self._start and upper < self._start:
            return

        lower = max(lower, self._start) if self._start else lower
        if not self._slice_range or self._evaluate_upper_safely(lower, self._slice_range) >= upper:
            # The whole range fits in a single slice.
            clamped_lower = self._clamping_strategy.clamp(lower)
            clamped_upper = self._clamping_strategy.clamp(upper)
            start_value, end_value = (
                (clamped_lower, clamped_upper - self._cursor_granularity)
                if self._cursor_granularity and not upper_is_end
                else (clamped_lower, clamped_upper)
            )
            yield StreamSlice(
                partition={},
                cursor_slice={
                    self._slice_boundary_fields_wrapper[
                        self._START_BOUNDARY
                    ]: self._connector_state_converter.output_format(start_value),
                    self._slice_boundary_fields_wrapper[
                        self._END_BOUNDARY
                    ]: self._connector_state_converter.output_format(end_value),
                },
            )
        else:
            # Walk the range in slice_range steps until upper is reached.
            stop_processing = False
            current_lower_boundary = lower
            while not stop_processing:
                current_upper_boundary = min(
                    self._evaluate_upper_safely(current_lower_boundary, self._slice_range), upper
                )
                has_reached_upper_boundary = current_upper_boundary >= upper

                clamped_upper = (
                    self._clamping_strategy.clamp(current_upper_boundary)
                    if current_upper_boundary != upper
                    else current_upper_boundary
                )
                clamped_lower = self._clamping_strategy.clamp(current_lower_boundary)
                if clamped_lower >= clamped_upper:
                    # clamping collapsed both values which means that it is time to stop processing
                    # FIXME should this be replace by proper end_provider
                    break
                start_value, end_value = (
                    (clamped_lower, clamped_upper - self._cursor_granularity)
                    if self._cursor_granularity
                    and (not upper_is_end or not has_reached_upper_boundary)
                    else (clamped_lower, clamped_upper)
                )
                yield StreamSlice(
                    partition={},
                    cursor_slice={
                        self._slice_boundary_fields_wrapper[
                            self._START_BOUNDARY
                        ]: self._connector_state_converter.output_format(start_value),
                        self._slice_boundary_fields_wrapper[
                            self._END_BOUNDARY
                        ]: self._connector_state_converter.output_format(end_value),
                    },
                )
                current_lower_boundary = clamped_upper
                if current_upper_boundary >= upper:
                    stop_processing = True

    def _evaluate_upper_safely(self, lower: CursorValueType, step: GapType) -> CursorValueType:
        """
        Given that we set the default step at datetime.timedelta.max, we will generate an OverflowError when evaluating the next start_date
        This method assumes that users would never enter a step that would generate an overflow. Given that would be the case, the code
        would have broken anyway.
        """
        try:
            return lower + step
        except OverflowError:
            return self._end_provider()

    def should_be_synced(self, record: Record) -> bool:
        """
        Determines if a record should be synced based on its cursor value.
        :param record: The record to evaluate

        :return: True if the record's cursor value falls within the sync boundaries
        """
        try:
            record_cursor_value: CursorValueType = self._extract_cursor_value(record)
        except ValueError:
            # No cursor value on the record: sync it to be safe.
            self._log_for_record_without_cursor_value()
            return True
        return self.start <= record_cursor_value <= self._end_provider()

    def _log_for_record_without_cursor_value(self) -> None:
        # Warn only once per stream to avoid log spam.
        if not self._should_be_synced_logger_triggered:
            LOGGER.warning(
                f"Could not find cursor field `{self.cursor_field.cursor_field_key}` in record for stream {self._stream_name}. The incremental sync will assume it needs to be synced"
            )
            self._should_be_synced_logger_triggered = True

    def reduce_slice_range(self, stream_slice: StreamSlice) -> StreamSlice:
        """Shrink a slice's lower boundary to the most recent cursor value observed for it."""
        # In theory, we might be more flexible here meaning that it doesn't need to be in ascending order but it just
        # needs to be ordered. For now though, we will only support ascending order.
        if not self._is_ascending_order:
            LOGGER.warning(
                "Attempting to reduce slice while records are not returned in incremental order might lead to missing records"
            )

        if stream_slice in self._most_recent_cursor_value_per_partition:
            return StreamSlice(
                partition=stream_slice.partition,
                cursor_slice={
                    self._slice_boundary_fields_wrapper[
                        self._START_BOUNDARY
                    ]: self._connector_state_converter.output_format(
                        self._most_recent_cursor_value_per_partition[stream_slice]
                    ),
                    self._slice_boundary_fields_wrapper[
                        self._END_BOUNDARY
                    ]: stream_slice.cursor_slice[
                        self._slice_boundary_fields_wrapper[self._END_BOUNDARY]
                    ],
                },
                extra_fields=stream_slice.extra_fields,
            )
        else:
            return stream_slice

    def get_cursor_datetime_from_state(
        self, stream_state: Mapping[str, Any]
    ) -> datetime.datetime | None:
        """Extract and parse the cursor datetime from the given stream state.

        For concurrent cursors, the state can be in two formats:
        1. Sequential/legacy format: {cursor_field: cursor_value}
        2. Concurrent format: {state_type: "date-range", slices: [...]}

        Returns the cursor datetime if present and parseable, otherwise returns None.
        """
        # Check if state is in concurrent format (need to convert to dict for type compatibility)
        mutable_state: MutableMapping[str, Any] = dict(stream_state)
        if self._connector_state_converter.is_state_message_compatible(mutable_state):
            slices = stream_state.get("slices", [])
            if not slices:
                return None
            # Get the most recent cursor value from the first slice (after merging)
            first_slice = slices[0]
            cursor_value = first_slice.get(
                self._connector_state_converter.MOST_RECENT_RECORD_KEY
            ) or first_slice.get(self._connector_state_converter.END_KEY)
            if not cursor_value:
                return None
            try:
                parsed_value = self._connector_state_converter.parse_value(cursor_value)
                if isinstance(parsed_value, datetime.datetime):
                    return parsed_value
                return None
            except (ValueError, TypeError):
                return None

        # Sequential/legacy format: {cursor_field: cursor_value}
        cursor_value = stream_state.get(self._cursor_field.cursor_field_key)
        if not cursor_value:
            return None
        try:
            parsed_value = self._connector_state_converter.parse_value(cursor_value)
            if isinstance(parsed_value, datetime.datetime):
                return parsed_value
            return None
        except (ValueError, TypeError):
            return None
Slices the stream into chunks that can be fetched independently. Slices enable state checkpointing and data retrieval parallelization.
    def __init__(
        self,
        stream_name: str,
        stream_namespace: Optional[str],
        stream_state: Any,
        message_repository: MessageRepository,
        connector_state_manager: ConnectorStateManager,
        connector_state_converter: AbstractStreamStateConverter,
        cursor_field: CursorField,
        slice_boundary_fields: Optional[Tuple[str, str]],
        start: Optional[CursorValueType],
        end_provider: Callable[[], CursorValueType],
        lookback_window: Optional[GapType] = None,
        slice_range: Optional[GapType] = None,
        cursor_granularity: Optional[GapType] = None,
        clamping_strategy: ClampingStrategy = NoClamping(),
    ) -> None:
        """Initialize the cursor from configuration plus the stream's incoming state."""
        self._stream_name = stream_name
        self._stream_namespace = stream_namespace
        self._message_repository = message_repository
        self._connector_state_converter = connector_state_converter
        self._connector_state_manager = connector_state_manager
        self._cursor_field = cursor_field
        # To see some example where the slice boundaries might not be defined, check https://github.com/airbytehq/airbyte/blob/1ce84d6396e446e1ac2377362446e3fb94509461/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py#L363-L379
        self._slice_boundary_fields = slice_boundary_fields
        self._start = start
        self._end_provider = end_provider
        # NOTE: _get_concurrent_state reads the converter/cursor_field/_start assigned above.
        self.start, self._concurrent_state = self._get_concurrent_state(stream_state)
        self._lookback_window = lookback_window
        self._slice_range = slice_range
        self._most_recent_cursor_value_per_partition: MutableMapping[
            Union[StreamSlice, Mapping[str, Any], None], Any
        ] = {}
        self._has_closed_at_least_one_slice = False
        self._cursor_granularity = cursor_granularity
        # Flag to track if the logger has been triggered (per stream)
        self._should_be_synced_logger_triggered = False
        self._clamping_strategy = clamping_strategy
        self._is_ascending_order = True

        # A lock is required when closing a partition because updating the cursor's concurrent_state is
        # not thread safe. When multiple partitions are being closed by the cursor at the same time, it is
        # possible for one partition to update concurrent_state after a second partition has already read
        # the previous state. This can lead to the second partition overwriting the previous one's state.
        self._lock = threading.Lock()
181 def copy_without_state(self) -> "ConcurrentCursor": 182 return self.__class__( 183 stream_name=self._stream_name, 184 stream_namespace=self._stream_namespace, 185 stream_state={}, 186 message_repository=NoopMessageRepository(), 187 connector_state_manager=ConnectorStateManager(), 188 connector_state_converter=self._connector_state_converter, 189 cursor_field=self._cursor_field, 190 slice_boundary_fields=self._slice_boundary_fields, 191 start=self._start, 192 end_provider=self._end_provider, 193 lookback_window=self._lookback_window, 194 slice_range=self._slice_range, 195 cursor_granularity=self._cursor_granularity, 196 clamping_strategy=self._clamping_strategy, 197 )
292 def observe(self, record: Record) -> None: 293 # Because observe writes to the most_recent_cursor_value_per_partition mapping, 294 # it is not thread-safe. However, this shouldn't lead to concurrency issues because 295 # observe() is only invoked by PartitionReader.process_partition(). Since the map is 296 # broken down according to partition, concurrent threads processing only read/write 297 # from different keys which avoids any conflicts. 298 # 299 # If we were to add thread safety, we should implement a lock per-partition 300 # which is instantiated during stream_slices() 301 most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get( 302 record.associated_slice 303 ) 304 try: 305 cursor_value = self._extract_cursor_value(record) 306 307 if most_recent_cursor_value is None or most_recent_cursor_value < cursor_value: 308 self._most_recent_cursor_value_per_partition[record.associated_slice] = cursor_value 309 elif most_recent_cursor_value > cursor_value: 310 self._is_ascending_order = False 311 except ValueError: 312 self._log_for_record_without_cursor_value()
Indicate to the cursor that the record has been emitted
317 def close_partition(self, partition: Partition) -> None: 318 with self._lock: 319 slice_count_before = len(self._concurrent_state.get("slices", [])) 320 self._add_slice_to_state(partition) 321 if slice_count_before < len( 322 self._concurrent_state["slices"] 323 ): # only emit if at least one slice has been processed 324 self._merge_partitions() 325 self._emit_state_message() 326 self._has_closed_at_least_one_slice = True
Indicate to the cursor that the partition has been successfully processed
401 def ensure_at_least_one_state_emitted(self) -> None: 402 """ 403 The platform expect to have at least one state message on successful syncs. Hence, whatever happens, we expect this method to be 404 called. 405 """ 406 self._emit_state_message()
The platform expects to have at least one state message on successful syncs. Hence, whatever happens, we expect this method to be called.
    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Generating slices based on a few parameters:
        * lookback_window: Buffer to remove from END_KEY of the highest slice
        * slice_range: Max difference between two slices. If the difference between two slices is greater, multiple slices will be created
        * start: `_split_per_slice_range` will clip any value to `self._start` which means that:
          * if upper is less than self._start, no slices will be generated
          * if lower is less than self._start, self._start will be used as the lower boundary (lookback_window will not be considered in that case)

        Note that the slices will overlap at their boundaries. We therefore expect to have at least the lower or the upper boundary to be
        inclusive in the API that is queried.
        """
        self._merge_partitions()

        # Cover the gap between the configured start and the earliest synced interval.
        if self._start is not None and self._is_start_before_first_slice():
            yield from self._split_per_slice_range(
                self._start,
                self._concurrent_state["slices"][0][self._connector_state_converter.START_KEY],
                False,
            )

        if len(self._concurrent_state["slices"]) == 1:
            yield from self._split_per_slice_range(
                self._calculate_lower_boundary_of_last_slice(
                    self._concurrent_state["slices"][0][self._connector_state_converter.END_KEY]
                ),
                self._end_provider(),
                True,
            )
        elif len(self._concurrent_state["slices"]) > 1:
            # Fill the gaps between consecutive already-synced intervals...
            for i in range(len(self._concurrent_state["slices"]) - 1):
                if self._cursor_granularity:
                    yield from self._split_per_slice_range(
                        self._concurrent_state["slices"][i][self._connector_state_converter.END_KEY]
                        + self._cursor_granularity,
                        self._concurrent_state["slices"][i + 1][
                            self._connector_state_converter.START_KEY
                        ],
                        False,
                    )
                else:
                    yield from self._split_per_slice_range(
                        self._concurrent_state["slices"][i][
                            self._connector_state_converter.END_KEY
                        ],
                        self._concurrent_state["slices"][i + 1][
                            self._connector_state_converter.START_KEY
                        ],
                        False,
                    )
            # ...then from the last synced interval up to "now" (end_provider).
            yield from self._split_per_slice_range(
                self._calculate_lower_boundary_of_last_slice(
                    self._concurrent_state["slices"][-1][self._connector_state_converter.END_KEY]
                ),
                self._end_provider(),
                True,
            )
        else:
            raise ValueError("Expected at least one slice")
Generating slices based on a few parameters:
- lookback_window: Buffer to remove from END_KEY of the highest slice
- slice_range: Max difference between two slices. If the difference between two slices is greater, multiple slices will be created
- start:
`_split_per_slice_range` will clip any value to `self._start`, which means that:
- if upper is less than `self._start`, no slices will be generated
- if lower is less than self._start, self._start will be used as the lower boundary (lookback_window will not be considered in that case)
Note that the slices will overlap at their boundaries. We therefore expect to have at least the lower or the upper boundary to be inclusive in the API that is queried.
562 def should_be_synced(self, record: Record) -> bool: 563 """ 564 Determines if a record should be synced based on its cursor value. 565 :param record: The record to evaluate 566 567 :return: True if the record's cursor value falls within the sync boundaries 568 """ 569 try: 570 record_cursor_value: CursorValueType = self._extract_cursor_value(record) 571 except ValueError: 572 self._log_for_record_without_cursor_value() 573 return True 574 return self.start <= record_cursor_value <= self._end_provider()
Determines if a record should be synced based on its cursor value.
Parameters
- record: The record to evaluate
Returns
True if the record's cursor value falls within the sync boundaries
583 def reduce_slice_range(self, stream_slice: StreamSlice) -> StreamSlice: 584 # In theory, we might be more flexible here meaning that it doesn't need to be in ascending order but it just 585 # needs to be ordered. For now though, we will only support ascending order. 586 if not self._is_ascending_order: 587 LOGGER.warning( 588 "Attempting to reduce slice while records are not returned in incremental order might lead to missing records" 589 ) 590 591 if stream_slice in self._most_recent_cursor_value_per_partition: 592 return StreamSlice( 593 partition=stream_slice.partition, 594 cursor_slice={ 595 self._slice_boundary_fields_wrapper[ 596 self._START_BOUNDARY 597 ]: self._connector_state_converter.output_format( 598 self._most_recent_cursor_value_per_partition[stream_slice] 599 ), 600 self._slice_boundary_fields_wrapper[ 601 self._END_BOUNDARY 602 ]: stream_slice.cursor_slice[ 603 self._slice_boundary_fields_wrapper[self._END_BOUNDARY] 604 ], 605 }, 606 extra_fields=stream_slice.extra_fields, 607 ) 608 else: 609 return stream_slice
611 def get_cursor_datetime_from_state( 612 self, stream_state: Mapping[str, Any] 613 ) -> datetime.datetime | None: 614 """Extract and parse the cursor datetime from the given stream state. 615 616 For concurrent cursors, the state can be in two formats: 617 1. Sequential/legacy format: {cursor_field: cursor_value} 618 2. Concurrent format: {state_type: "date-range", slices: [...]} 619 620 Returns the cursor datetime if present and parseable, otherwise returns None. 621 """ 622 # Check if state is in concurrent format (need to convert to dict for type compatibility) 623 mutable_state: MutableMapping[str, Any] = dict(stream_state) 624 if self._connector_state_converter.is_state_message_compatible(mutable_state): 625 slices = stream_state.get("slices", []) 626 if not slices: 627 return None 628 # Get the most recent cursor value from the first slice (after merging) 629 first_slice = slices[0] 630 cursor_value = first_slice.get( 631 self._connector_state_converter.MOST_RECENT_RECORD_KEY 632 ) or first_slice.get(self._connector_state_converter.END_KEY) 633 if not cursor_value: 634 return None 635 try: 636 parsed_value = self._connector_state_converter.parse_value(cursor_value) 637 if isinstance(parsed_value, datetime.datetime): 638 return parsed_value 639 return None 640 except (ValueError, TypeError): 641 return None 642 643 # Sequential/legacy format: {cursor_field: cursor_value} 644 cursor_value = stream_state.get(self._cursor_field.cursor_field_key) 645 if not cursor_value: 646 return None 647 try: 648 parsed_value = self._connector_state_converter.parse_value(cursor_value) 649 if isinstance(parsed_value, datetime.datetime): 650 return parsed_value 651 return None 652 except (ValueError, TypeError): 653 return None
Extract and parse the cursor datetime from the given stream state.
For concurrent cursors, the state can be in two formats:
- Sequential/legacy format: {cursor_field: cursor_value}
- Concurrent format: {state_type: "date-range", slices: [...]}
Returns the cursor datetime if present and parseable, otherwise returns None.