airbyte_cdk.sources.declarative.incremental
```python
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.incremental.concurrent_partition_cursor import (
    ConcurrentCursorFactory,
    ConcurrentPerPartitionCursor,
)
from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor
from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor
from airbyte_cdk.sources.declarative.incremental.global_substream_cursor import (
    GlobalSubstreamCursor,
)
from airbyte_cdk.sources.declarative.incremental.per_partition_cursor import (
    CursorFactory,
    PerPartitionCursor,
)
from airbyte_cdk.sources.declarative.incremental.per_partition_with_global import (
    PerPartitionWithGlobalCursor,
)
from airbyte_cdk.sources.declarative.incremental.resumable_full_refresh_cursor import (
    ChildPartitionResumableFullRefreshCursor,
    ResumableFullRefreshCursor,
)

__all__ = [
    "CursorFactory",
    "ConcurrentCursorFactory",
    "ConcurrentPerPartitionCursor",
    "DatetimeBasedCursor",
    "DeclarativeCursor",
    "GlobalSubstreamCursor",
    "PerPartitionCursor",
    "PerPartitionWithGlobalCursor",
    "ResumableFullRefreshCursor",
    "ChildPartitionResumableFullRefreshCursor",
]
```
```python
class CursorFactory:
    def __init__(self, create_function: Callable[[], DeclarativeCursor]):
        self._create_function = create_function

    def create(self) -> DeclarativeCursor:
        return self._create_function()
```
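A minimal sketch of how the factory defers construction. The builder function below is a hypothetical stand-in; a real connector's `create_function` returns a configured `DeclarativeCursor` (e.g. a `DatetimeBasedCursor`):

```python
from airbyte_cdk.sources.declarative.incremental import CursorFactory

def _build_cursor():
    # Hypothetical stand-in: a real create_function returns a DeclarativeCursor.
    return object()

factory = CursorFactory(create_function=_build_cursor)

# Each call constructs a fresh, independent cursor, which is what
# per-partition cursors rely on to track state separately per partition.
cursor_a = factory.create()
cursor_b = factory.create()
assert cursor_a is not cursor_b
```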
```python
class ConcurrentCursorFactory:
    def __init__(self, create_function: Callable[..., ConcurrentCursor]):
        self._create_function = create_function

    def create(
        self, stream_state: Mapping[str, Any], runtime_lookback_window: Optional[timedelta]
    ) -> ConcurrentCursor:
        return self._create_function(
            stream_state=stream_state, runtime_lookback_window=runtime_lookback_window
        )
```
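Usage follows the same pattern, except `create` forwards the per-partition state and an optional runtime lookback window to the underlying builder. A hedged sketch; the builder below is a hypothetical stand-in and the cursor field name is illustrative:

```python
from datetime import timedelta
from typing import Any, Mapping, Optional

from airbyte_cdk.sources.declarative.incremental import ConcurrentCursorFactory

def _build_concurrent_cursor(
    stream_state: Mapping[str, Any],
    runtime_lookback_window: Optional[timedelta],
) -> Any:
    # Hypothetical stand-in: a real create_function returns a ConcurrentCursor
    # seeded with `stream_state` and widened by `runtime_lookback_window`.
    return (stream_state, runtime_lookback_window)

concurrent_factory = ConcurrentCursorFactory(create_function=_build_concurrent_cursor)
per_partition_cursor = concurrent_factory.create(
    stream_state={"updated_at": "2024-01-01T00:00:00Z"},  # illustrative cursor field
    runtime_lookback_window=timedelta(hours=1),
)
```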
```python
class ConcurrentPerPartitionCursor(Cursor):
    """
    Manages state per partition when a stream has many partitions, preventing data loss or duplication.

    Attributes:
        DEFAULT_MAX_PARTITIONS_NUMBER (int): Maximum number of partitions to retain in memory (default is 25,000).

    - **Partition Limitation Logic**
      Ensures the number of tracked partitions does not exceed the specified limit to prevent memory overuse.
      The oldest partitions are removed when the limit is reached.

    - **Global Cursor Fallback**
      New partitions use the global state as their initial state so that state progresses even for deleted or new
      partitions. History data added after the initial sync will be missing.

    ConcurrentPerPartitionCursor expects the state of the ConcurrentCursor to follow the format
    {cursor_field: cursor_value}.
    """

    DEFAULT_MAX_PARTITIONS_NUMBER = 25_000
    SWITCH_TO_GLOBAL_LIMIT = 10_000
    _NO_STATE: Mapping[str, Any] = {}
    _NO_CURSOR_STATE: Mapping[str, Any] = {}
    _GLOBAL_STATE_KEY = "state"
    _PERPARTITION_STATE_KEY = "states"
    _IS_PARTITION_DUPLICATION_LOGGED = False
    _PARENT_STATE = 0
    _GENERATION_SEQUENCE = 1

    def __init__(
        self,
        cursor_factory: ConcurrentCursorFactory,
        partition_router: PartitionRouter,
        stream_name: str,
        stream_namespace: Optional[str],
        stream_state: Any,
        message_repository: MessageRepository,
        connector_state_manager: ConnectorStateManager,
        connector_state_converter: AbstractStreamStateConverter,
        cursor_field: CursorField,
        use_global_cursor: bool = False,
        attempt_to_create_cursor_if_not_provided: bool = False,
    ) -> None:
        self._global_cursor: Optional[StreamState] = {}
        self._stream_name = stream_name
        self._stream_namespace = stream_namespace
        self._message_repository = message_repository
        self._connector_state_manager = connector_state_manager
        self._connector_state_converter = connector_state_converter
        self._cursor_field = cursor_field

        self._cursor_factory = cursor_factory
        self._partition_router = partition_router

        # The dict is ordered to ensure that once the maximum number of partitions is reached,
        # the oldest partitions can be efficiently removed, maintaining the most recent partitions.
        self._cursor_per_partition: OrderedDict[str, ConcurrentCursor] = OrderedDict()
        self._semaphore_per_partition: OrderedDict[str, threading.Semaphore] = OrderedDict()

        # Parent-state tracking: store each partition's parent state in creation order
        self._partition_parent_state_map: OrderedDict[str, tuple[Mapping[str, Any], int]] = (
            OrderedDict()
        )
        self._parent_state: Optional[StreamState] = None

        # Tracks when the last slice for a partition is emitted
        self._partitions_done_generating_stream_slices: set[str] = set()
        # Used to track the index of partitions that are not closed yet
        self._processing_partitions_indexes: List[int] = list()
        self._generated_partitions_count: int = 0
        # Dictionary to map partition keys to their index
        self._partition_key_to_index: dict[str, int] = {}

        self._lock = threading.Lock()
        self._lookback_window: int = 0
        self._new_global_cursor: Optional[StreamState] = None
        self._number_of_partitions: int = 0
        self._use_global_cursor: bool = use_global_cursor
        self._partition_serializer = PerPartitionKeySerializer()

        # Track the last time a state message was emitted
        self._last_emission_time: float = 0.0
        self._timer = Timer()

        self._set_initial_state(stream_state)

        # FIXME this is a temporary field for the time of the migration from declarative cursors to concurrent ones
        self._attempt_to_create_cursor_if_not_provided = attempt_to_create_cursor_if_not_provided

    @property
    def cursor_field(self) -> CursorField:
        return self._cursor_field

    @property
    def state(self) -> MutableMapping[str, Any]:
        state: dict[str, Any] = {"use_global_cursor": self._use_global_cursor}
        if not self._use_global_cursor:
            states = []
            for partition_tuple, cursor in self._cursor_per_partition.items():
                if cursor.state:
                    states.append(
                        {
                            "partition": self._to_dict(partition_tuple),
                            "cursor": copy.deepcopy(cursor.state),
                        }
                    )
            state[self._PERPARTITION_STATE_KEY] = states

        if self._global_cursor:
            state[self._GLOBAL_STATE_KEY] = self._global_cursor
        if self._lookback_window is not None:
            state["lookback_window"] = self._lookback_window
        if self._parent_state is not None:
            state["parent_state"] = self._parent_state
        return state

    def close_partition(self, partition: Partition) -> None:
        # Attempt to retrieve the stream slice
        stream_slice: Optional[StreamSlice] = partition.to_slice()  # type: ignore[assignment]

        # Ensure stream_slice is not None
        if stream_slice is None:
            raise ValueError("stream_slice cannot be None")

        partition_key = self._to_partition_key(stream_slice.partition)
        with self._lock:
            self._semaphore_per_partition[partition_key].acquire()
            if not self._use_global_cursor:
                self._cursor_per_partition[partition_key].close_partition(partition=partition)
                cursor = self._cursor_per_partition[partition_key]
                if (
                    partition_key in self._partitions_done_generating_stream_slices
                    and self._semaphore_per_partition[partition_key]._value == 0
                ):
                    self._update_global_cursor(cursor.state[self.cursor_field.cursor_field_key])

            # Clean up the partition if it is fully processed
            self._cleanup_if_done(partition_key)

            self._check_and_update_parent_state()

        self._emit_state_message()

    def _check_and_update_parent_state(self) -> None:
        last_closed_state = None

        while self._partition_parent_state_map:
            earliest_key, (candidate_state, candidate_seq) = next(
                iter(self._partition_parent_state_map.items())
            )

            # if any partition that started <= candidate_seq is still open, we must wait
            if (
                self._processing_partitions_indexes
                and self._processing_partitions_indexes[0] <= candidate_seq
            ):
                break

            # safe to pop
            self._partition_parent_state_map.popitem(last=False)
            last_closed_state = candidate_state

        if last_closed_state is not None:
            self._parent_state = last_closed_state

    def ensure_at_least_one_state_emitted(self) -> None:
        """
        The platform expects at least one state message on successful syncs. Hence, whatever happens, we expect this method to be
        called.
        """
        if not any(
            semaphore_item[1]._value for semaphore_item in self._semaphore_per_partition.items()
        ):
            self._global_cursor = self._new_global_cursor
            self._lookback_window = self._timer.finish()
            self._parent_state = self._partition_router.get_stream_state()
        self._emit_state_message(throttle=False)

    def _throttle_state_message(self) -> Optional[float]:
        """
        Throttles the state message emission to once every 600 seconds.
        """
        current_time = time.time()
        if current_time - self._last_emission_time <= 600:
            return None
        return current_time

    def _emit_state_message(self, throttle: bool = True) -> None:
        if throttle:
            current_time = self._throttle_state_message()
            if current_time is None:
                return
            self._last_emission_time = current_time

            # Skip state emit for global cursor if parent state is empty
            if self._use_global_cursor and not self._parent_state:
                return

        self._connector_state_manager.update_state_for_stream(
            self._stream_name,
            self._stream_namespace,
            self.state,
        )
        state_message = self._connector_state_manager.create_state_message(
            self._stream_name, self._stream_namespace
        )
        self._message_repository.emit_message(state_message)

    def stream_slices(self) -> Iterable[StreamSlice]:
        if self._timer.is_running():
            raise RuntimeError("stream_slices has been executed more than once.")

        slices = self._partition_router.stream_slices()
        self._timer.start()
        for partition, last, parent_state in iterate_with_last_flag_and_state(
            slices, self._partition_router.get_stream_state
        ):
            yield from self._generate_slices_from_partition(partition, parent_state)

    def _generate_slices_from_partition(
        self, partition: StreamSlice, parent_state: Mapping[str, Any]
    ) -> Iterable[StreamSlice]:
        # Ensure the maximum number of partitions is not exceeded
        self._ensure_partition_limit()

        partition_key = self._to_partition_key(partition.partition)

        cursor = self._cursor_per_partition.get(self._to_partition_key(partition.partition))
        if not cursor:
            cursor = self._create_cursor(
                self._global_cursor,
                self._lookback_window if self._global_cursor else 0,
            )
            with self._lock:
                self._number_of_partitions += 1
                self._cursor_per_partition[partition_key] = cursor

        if partition_key in self._semaphore_per_partition:
            if not self._IS_PARTITION_DUPLICATION_LOGGED:
                logger.warning(f"Partition duplication detected for stream {self._stream_name}")
                self._IS_PARTITION_DUPLICATION_LOGGED = True
            return
        else:
            self._semaphore_per_partition[partition_key] = threading.Semaphore(0)

        with self._lock:
            seq = self._generated_partitions_count
            self._generated_partitions_count += 1
            self._processing_partitions_indexes.append(seq)
            self._partition_key_to_index[partition_key] = seq

            if (
                len(self._partition_parent_state_map) == 0
                or self._partition_parent_state_map[
                    next(reversed(self._partition_parent_state_map))
                ][self._PARENT_STATE]
                != parent_state
            ):
                self._partition_parent_state_map[partition_key] = (deepcopy(parent_state), seq)

        for cursor_slice, is_last_slice, _ in iterate_with_last_flag_and_state(
            cursor.stream_slices(),
            lambda: None,
        ):
            self._semaphore_per_partition[partition_key].release()
            if is_last_slice:
                self._partitions_done_generating_stream_slices.add(partition_key)
            yield StreamSlice(
                partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields
            )

    def _ensure_partition_limit(self) -> None:
        """
        Ensure the maximum number of partitions does not exceed the predefined limit.

        Steps:
        1. Attempt to remove partitions that are already fully processed (i.e. no longer tracked in
           `_partition_key_to_index`). These partitions are considered processed and safe to delete.
        2. If the limit is still exceeded and no finished partitions are available for removal,
           remove the oldest partition unconditionally. We expect failed partitions to be removed.

        Logging:
        - Logs a warning each time a partition is removed, indicating whether it was finished
          or removed due to being the oldest.
        """
        if not self._use_global_cursor and self.limit_reached():
            logger.info(
                f"Exceeded the 'SWITCH_TO_GLOBAL_LIMIT' of {self.SWITCH_TO_GLOBAL_LIMIT}. "
                f"Switching to global cursor for {self._stream_name}."
            )
            self._use_global_cursor = True

        with self._lock:
            while len(self._cursor_per_partition) > self.DEFAULT_MAX_PARTITIONS_NUMBER - 1:
                # Try removing finished partitions first
                for partition_key in list(self._cursor_per_partition.keys()):
                    if partition_key not in self._partition_key_to_index:
                        oldest_partition = self._cursor_per_partition.pop(
                            partition_key
                        )  # Remove the oldest partition
                        logger.debug(
                            f"The maximum number of partitions has been reached. Dropping the oldest finished partition: {oldest_partition}. Over limit: {self._number_of_partitions - self.DEFAULT_MAX_PARTITIONS_NUMBER}."
                        )
                        break
                else:
                    # If no finished partitions can be removed, fall back to removing the oldest partition
                    oldest_partition = self._cursor_per_partition.popitem(last=False)[
                        1
                    ]  # Remove the oldest partition
                    logger.warning(
                        f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}. Over limit: {self._number_of_partitions - self.DEFAULT_MAX_PARTITIONS_NUMBER}."
                    )

    def _set_initial_state(self, stream_state: StreamState) -> None:
        """
        Initialize the cursor's state using the provided `stream_state`.

        This method supports global and per-partition state initialization.

        - **Global State**: If `states` is missing, the `state` is treated as global and applied to all partitions.
          The global state holds a single cursor position representing the latest processed record across all partitions.

        - **Lookback Window**: Configured via `lookback_window`, it defines the period (in seconds) for reprocessing records.
          This ensures robustness in case of upstream data delays or reordering. If not specified, it defaults to 0.

        - **Per-Partition State**: If `states` is present, each partition's cursor state is initialized separately.

        - **Parent State**: (if available) Used to initialize partition routers based on parent streams.

        Args:
            stream_state (StreamState): The state of the streams to be set. The format of the stream state should be:
                {
                    "states": [
                        {
                            "partition": {
                                "partition_key": "value"
                            },
                            "cursor": {
                                "last_updated": "2023-05-27T00:00:00Z"
                            }
                        }
                    ],
                    "state": {
                        "last_updated": "2023-05-27T00:00:00Z"
                    },
                    "lookback_window": 10,
                    "parent_state": {
                        "parent_stream_name": {
                            "last_updated": "2023-05-27T00:00:00Z"
                        }
                    }
                }
        """
        if not stream_state:
            return

        if (
            self._PERPARTITION_STATE_KEY not in stream_state
            and self._GLOBAL_STATE_KEY not in stream_state
        ):
            # We assume that `stream_state` is in a global format that can be applied to all partitions.
            # Example: {"global_state_format_key": "global_state_format_value"}
            self._set_global_state(stream_state)

        else:
            self._use_global_cursor = stream_state.get("use_global_cursor", False)

            self._lookback_window = int(stream_state.get("lookback_window", 0))

            for state in stream_state.get(self._PERPARTITION_STATE_KEY, []):
                self._number_of_partitions += 1
                self._cursor_per_partition[self._to_partition_key(state["partition"])] = (
                    self._create_cursor(state["cursor"])
                )

            # set default state for missing partitions if it is per partition with fallback to global
            if self._GLOBAL_STATE_KEY in stream_state:
                self._set_global_state(stream_state[self._GLOBAL_STATE_KEY])

        # Set initial parent state
        if stream_state.get("parent_state"):
            self._parent_state = stream_state["parent_state"]

        # Set parent state for partition routers based on parent streams
        self._partition_router.set_initial_state(stream_state)

    def _set_global_state(self, stream_state: Mapping[str, Any]) -> None:
        """
        Initializes the global cursor state from the provided stream state.

        If the cursor field key is present in the stream state, its value is parsed,
        formatted, and stored as the global cursor. This ensures consistency in state
        representation across partitions.
        """
        if self.cursor_field.cursor_field_key in stream_state:
            global_state_value = stream_state[self.cursor_field.cursor_field_key]
            final_format_global_state_value = self._connector_state_converter.output_format(
                self._connector_state_converter.parse_value(global_state_value)
            )

            fixed_global_state = {
                self.cursor_field.cursor_field_key: final_format_global_state_value
            }

            self._global_cursor = deepcopy(fixed_global_state)
            self._new_global_cursor = deepcopy(fixed_global_state)

    def observe(self, record: Record) -> None:
        if not record.associated_slice:
            raise ValueError(
                "Invalid state as stream slices that are emitted should refer to an existing cursor"
            )

        # if the current record has no cursor value, we cannot meaningfully update the state based on it, so there is nothing more to do
        try:
            record_cursor_value = self._cursor_field.extract_value(record)
        except ValueError:
            return

        record_cursor = self._connector_state_converter.output_format(
            self._connector_state_converter.parse_value(record_cursor_value)
        )
        self._update_global_cursor(record_cursor)
        if not self._use_global_cursor:
            self._cursor_per_partition[
                self._to_partition_key(record.associated_slice.partition)
            ].observe(record)

    def _update_global_cursor(self, value: Any) -> None:
        if (
            self._new_global_cursor is None
            or self._new_global_cursor[self.cursor_field.cursor_field_key] < value
        ):
            self._new_global_cursor = {self.cursor_field.cursor_field_key: copy.deepcopy(value)}

    def _cleanup_if_done(self, partition_key: str) -> None:
        """
        Free every in-memory structure that belonged to a completed partition:
        cursor, semaphore, and the flag inside `_partitions_done_generating_stream_slices`.
        """
        if not (
            partition_key in self._partitions_done_generating_stream_slices
            and self._semaphore_per_partition[partition_key]._value == 0
        ):
            return

        self._semaphore_per_partition.pop(partition_key, None)
        self._partitions_done_generating_stream_slices.discard(partition_key)

        seq = self._partition_key_to_index.pop(partition_key)
        self._processing_partitions_indexes.remove(seq)

        logger.debug(f"Partition {partition_key} fully processed and cleaned up.")

    def _to_partition_key(self, partition: Mapping[str, Any]) -> str:
        return self._partition_serializer.to_partition_key(partition)

    def _to_dict(self, partition_key: str) -> Mapping[str, Any]:
        return self._partition_serializer.to_partition(partition_key)

    def _create_cursor(
        self, cursor_state: Any, runtime_lookback_window: int = 0
    ) -> ConcurrentCursor:
        cursor = self._cursor_factory.create(
            stream_state=deepcopy(cursor_state),
            runtime_lookback_window=timedelta(seconds=runtime_lookback_window),
        )
        return cursor

    def should_be_synced(self, record: Record) -> bool:
        return self._get_cursor(record).should_be_synced(record)

    def _get_cursor(self, record: Record) -> ConcurrentCursor:
        if not record.associated_slice:
            raise ValueError(
                "Invalid state as stream slices that are emitted should refer to an existing cursor"
            )

        if self._use_global_cursor:
            return self._create_cursor(
                self._global_cursor,
                self._lookback_window if self._global_cursor else 0,
            )

        partition_key = self._to_partition_key(record.associated_slice.partition)
        if (
            partition_key not in self._cursor_per_partition
            and not self._attempt_to_create_cursor_if_not_provided
        ):
            raise ValueError(
                "Invalid state as stream slices that are emitted should refer to an existing cursor"
            )
        elif partition_key not in self._cursor_per_partition:
            return self._create_cursor(
                self._global_cursor,
                self._lookback_window if self._global_cursor else 0,
            )
        else:
            return self._cursor_per_partition[partition_key]

    def limit_reached(self) -> bool:
        return self._number_of_partitions > self.SWITCH_TO_GLOBAL_LIMIT
```
Manages state per partition when a stream has many partitions, preventing data loss or duplication.
Attributes:
DEFAULT_MAX_PARTITIONS_NUMBER (int): Maximum number of partitions to retain in memory (default is 25,000).
Partition Limitation Logic: Ensures the number of tracked partitions does not exceed the specified limit to prevent memory overuse. The oldest partitions are removed when the limit is reached.
Global Cursor Fallback: New partitions use the global state as their initial state so that state progresses even for deleted or new partitions. History data added after the initial sync will be missing.
ConcurrentPerPartitionCursor expects the state of the ConcurrentCursor to follow the format {cursor_field: cursor_value}.
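For reference, the state object this cursor emits (and accepts in `_set_initial_state`) has the following shape; the field values are illustrative:

```python
# Illustrative state payload for ConcurrentPerPartitionCursor.
state = {
    "use_global_cursor": False,
    # Per-partition cursors ("states" / _PERPARTITION_STATE_KEY)
    "states": [
        {
            "partition": {"partition_key": "value"},
            "cursor": {"last_updated": "2023-05-27T00:00:00Z"},
        }
    ],
    # Global fallback cursor ("state" / _GLOBAL_STATE_KEY), used to seed new partitions
    "state": {"last_updated": "2023-05-27T00:00:00Z"},
    # Reprocessing window in seconds
    "lookback_window": 10,
    # State of the parent streams feeding the partition router
    "parent_state": {"parent_stream_name": {"last_updated": "2023-05-27T00:00:00Z"}},
}
```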
```python
def __init__(
    self,
    cursor_factory: ConcurrentCursorFactory,
    partition_router: PartitionRouter,
    stream_name: str,
    stream_namespace: Optional[str],
    stream_state: Any,
    message_repository: MessageRepository,
    connector_state_manager: ConnectorStateManager,
    connector_state_converter: AbstractStreamStateConverter,
    cursor_field: CursorField,
    use_global_cursor: bool = False,
    attempt_to_create_cursor_if_not_provided: bool = False,
) -> None:
    self._global_cursor: Optional[StreamState] = {}
    self._stream_name = stream_name
    self._stream_namespace = stream_namespace
    self._message_repository = message_repository
    self._connector_state_manager = connector_state_manager
    self._connector_state_converter = connector_state_converter
    self._cursor_field = cursor_field

    self._cursor_factory = cursor_factory
    self._partition_router = partition_router

    # The dict is ordered to ensure that once the maximum number of partitions is reached,
    # the oldest partitions can be efficiently removed, maintaining the most recent partitions.
    self._cursor_per_partition: OrderedDict[str, ConcurrentCursor] = OrderedDict()
    self._semaphore_per_partition: OrderedDict[str, threading.Semaphore] = OrderedDict()

    # Parent-state tracking: store each partition's parent state in creation order
    self._partition_parent_state_map: OrderedDict[str, tuple[Mapping[str, Any], int]] = (
        OrderedDict()
    )
    self._parent_state: Optional[StreamState] = None

    # Tracks when the last slice for a partition is emitted
    self._partitions_done_generating_stream_slices: set[str] = set()
    # Used to track the index of partitions that are not closed yet
    self._processing_partitions_indexes: List[int] = list()
    self._generated_partitions_count: int = 0
    # Dictionary to map partition keys to their index
    self._partition_key_to_index: dict[str, int] = {}

    self._lock = threading.Lock()
    self._lookback_window: int = 0
    self._new_global_cursor: Optional[StreamState] = None
    self._number_of_partitions: int = 0
    self._use_global_cursor: bool = use_global_cursor
    self._partition_serializer = PerPartitionKeySerializer()

    # Track the last time a state message was emitted
    self._last_emission_time: float = 0.0
    self._timer = Timer()

    self._set_initial_state(stream_state)

    # FIXME this is a temporary field for the time of the migration from declarative cursors to concurrent ones
    self._attempt_to_create_cursor_if_not_provided = attempt_to_create_cursor_if_not_provided
```
```python
@property
def state(self) -> MutableMapping[str, Any]:
    state: dict[str, Any] = {"use_global_cursor": self._use_global_cursor}
    if not self._use_global_cursor:
        states = []
        for partition_tuple, cursor in self._cursor_per_partition.items():
            if cursor.state:
                states.append(
                    {
                        "partition": self._to_dict(partition_tuple),
                        "cursor": copy.deepcopy(cursor.state),
                    }
                )
        state[self._PERPARTITION_STATE_KEY] = states

    if self._global_cursor:
        state[self._GLOBAL_STATE_KEY] = self._global_cursor
    if self._lookback_window is not None:
        state["lookback_window"] = self._lookback_window
    if self._parent_state is not None:
        state["parent_state"] = self._parent_state
    return state
```
```python
def close_partition(self, partition: Partition) -> None:
    # Attempt to retrieve the stream slice
    stream_slice: Optional[StreamSlice] = partition.to_slice()  # type: ignore[assignment]

    # Ensure stream_slice is not None
    if stream_slice is None:
        raise ValueError("stream_slice cannot be None")

    partition_key = self._to_partition_key(stream_slice.partition)
    with self._lock:
        self._semaphore_per_partition[partition_key].acquire()
        if not self._use_global_cursor:
            self._cursor_per_partition[partition_key].close_partition(partition=partition)
            cursor = self._cursor_per_partition[partition_key]
            if (
                partition_key in self._partitions_done_generating_stream_slices
                and self._semaphore_per_partition[partition_key]._value == 0
            ):
                self._update_global_cursor(cursor.state[self.cursor_field.cursor_field_key])

        # Clean up the partition if it is fully processed
        self._cleanup_if_done(partition_key)

        self._check_and_update_parent_state()

    self._emit_state_message()
```
Indicate to the cursor that the partition has been successfully processed
```python
def ensure_at_least_one_state_emitted(self) -> None:
    """
    The platform expects at least one state message on successful syncs. Hence, whatever happens, we expect this method to be
    called.
    """
    if not any(
        semaphore_item[1]._value for semaphore_item in self._semaphore_per_partition.items()
    ):
        self._global_cursor = self._new_global_cursor
        self._lookback_window = self._timer.finish()
        self._parent_state = self._partition_router.get_stream_state()
    self._emit_state_message(throttle=False)
```
The platform expects at least one state message on successful syncs. Hence, whatever happens, we expect this method to be called.
```python
def stream_slices(self) -> Iterable[StreamSlice]:
    if self._timer.is_running():
        raise RuntimeError("stream_slices has been executed more than once.")

    slices = self._partition_router.stream_slices()
    self._timer.start()
    for partition, last, parent_state in iterate_with_last_flag_and_state(
        slices, self._partition_router.get_stream_state
    ):
        yield from self._generate_slices_from_partition(partition, parent_state)
```
Default placeholder implementation of generate_slices. Subclasses can override this method to provide actual behavior.
```python
def observe(self, record: Record) -> None:
    if not record.associated_slice:
        raise ValueError(
            "Invalid state as stream slices that are emitted should refer to an existing cursor"
        )

    # if the current record has no cursor value, we cannot meaningfully update the state based on it, so there is nothing more to do
    try:
        record_cursor_value = self._cursor_field.extract_value(record)
    except ValueError:
        return

    record_cursor = self._connector_state_converter.output_format(
        self._connector_state_converter.parse_value(record_cursor_value)
    )
    self._update_global_cursor(record_cursor)
    if not self._use_global_cursor:
        self._cursor_per_partition[
            self._to_partition_key(record.associated_slice.partition)
        ].observe(record)
```
Indicate to the cursor that the record has been emitted
```python
@dataclass
class DatetimeBasedCursor(DeclarativeCursor):
    """
    Slices the stream over a datetime range and creates a state with format {<cursor_field>: <datetime>}.

    Given a start time, end time, a step function, and an optional lookback window,
    the stream slicer will partition the date range from start time - lookback window to end time.

    The step is defined as an ISO 8601 duration string.

    The timestamp format accepts the same format codes as datetime.strftime, which are
    all the format codes required by the 1989 C standard.
    Full list of accepted format codes: https://man7.org/linux/man-pages/man3/strftime.3.html

    Attributes:
        start_datetime (Union[MinMaxDatetime, str]): the datetime that determines the earliest record that should be synced
        end_datetime (Optional[Union[MinMaxDatetime, str]]): the datetime that determines the last record that should be synced
        cursor_field (Union[InterpolatedString, str]): record's cursor field
        datetime_format (str): format of the datetime
        step (Optional[str]): size of the time window (ISO 8601 duration)
        cursor_granularity (Optional[str]): smallest increment the datetime_format has (ISO 8601 duration) that will be used to
            ensure that the start of a slice does not overlap with the end of the previous one
        config (Config): connection config
        start_time_option (Optional[RequestOption]): request option for start time
        end_time_option (Optional[RequestOption]): request option for end time
        partition_field_start (Optional[str]): partition start time field
        partition_field_end (Optional[str]): stream slice end time field
        lookback_window (Optional[InterpolatedString]): how many days before start_datetime to read data for (ISO 8601 duration)
    """

    start_datetime: Union[MinMaxDatetime, str]
    cursor_field: Union[InterpolatedString, str]
    datetime_format: str
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    _highest_observed_cursor_field_value: Optional[str] = field(
        repr=False, default=None
    )  # tracks the latest observed datetime, which may not be safe to emit in the case of out-of-order records
    _cursor: Optional[str] = field(
        repr=False, default=None
    )  # tracks the latest observed datetime that is appropriate to emit as stream state
    end_datetime: Optional[Union[MinMaxDatetime, str]] = None
    step: Optional[Union[InterpolatedString, str]] = None
    cursor_granularity: Optional[str] = None
    start_time_option: Optional[RequestOption] = None
    end_time_option: Optional[RequestOption] = None
    partition_field_start: Optional[str] = None
    partition_field_end: Optional[str] = None
    lookback_window: Optional[Union[InterpolatedString, str]] = None
    message_repository: Optional[MessageRepository] = None
    is_compare_strictly: Optional[bool] = False
    cursor_datetime_formats: List[str] = field(default_factory=lambda: [])

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        if (self.step and not self.cursor_granularity) or (
            not self.step and self.cursor_granularity
        ):
            raise ValueError(
                f"If step is defined, cursor_granularity should be as well and vice-versa. "
                f"Right now, step is `{self.step}` and cursor_granularity is `{self.cursor_granularity}`"
            )
        self._start_datetime = MinMaxDatetime.create(self.start_datetime, parameters)
        self._end_datetime = (
            None if not self.end_datetime else MinMaxDatetime.create(self.end_datetime, parameters)
        )

        self._timezone = datetime.timezone.utc
        self._interpolation = JinjaInterpolation()

        self._step = (
            self._parse_timedelta(
                InterpolatedString.create(self.step, parameters=parameters).eval(self.config)
            )
            if self.step
            else datetime.timedelta.max
        )
        self._cursor_granularity = self._parse_timedelta(self.cursor_granularity)
        self.cursor_field = InterpolatedString.create(self.cursor_field, parameters=parameters)
        self._lookback_window = (
            InterpolatedString.create(self.lookback_window, parameters=parameters)
            if self.lookback_window
            else None
        )
        self._partition_field_start = InterpolatedString.create(
            self.partition_field_start or "start_time", parameters=parameters
        )
        self._partition_field_end = InterpolatedString.create(
            self.partition_field_end or "end_time", parameters=parameters
        )
        self._parser = DatetimeParser()

        # If datetime format is not specified then start/end datetime should inherit it from the stream slicer
        if not self._start_datetime.datetime_format:
            self._start_datetime.datetime_format = self.datetime_format
        if self._end_datetime and not self._end_datetime.datetime_format:
            self._end_datetime.datetime_format = self.datetime_format

        if not self.cursor_datetime_formats:
            self.cursor_datetime_formats = [self.datetime_format]

        _validate_component_request_option_paths(
            self.config, self.start_time_option, self.end_time_option
        )

    def get_stream_state(self) -> StreamState:
        return {self.cursor_field.eval(self.config): self._cursor} if self._cursor else {}  # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__

    def set_initial_state(self, stream_state: StreamState) -> None:
        """
        Cursors are not initialized with their state. As state is needed in order to function properly, this method should be called
        before calling anything else

        :param stream_state: The state of the stream as returned by get_stream_state
        """
        self._cursor = (
            stream_state.get(self.cursor_field.eval(self.config)) if stream_state else None  # type: ignore [union-attr]
        )

    def observe(self, stream_slice: StreamSlice, record: Record) -> None:
        """
        Register a record with the cursor; the cursor instance can then use it to manage the state of the in-progress stream read.

        :param stream_slice: The current slice, which may or may not contain the most recently observed record
        :param record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the
          stream state may need to be deferred depending on whether the source reliably orders records by the cursor field.
        """
        record_cursor_value = record.get(self.cursor_field.eval(self.config))  # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__
        # if the current record has no cursor value, we cannot meaningfully update the state based on it, so there is nothing more to do
        if not record_cursor_value:
            return

        start_field = self._partition_field_start.eval(self.config)
        end_field = self._partition_field_end.eval(self.config)
        is_highest_observed_cursor_value = (
            not self._highest_observed_cursor_field_value
            or self.parse_date(record_cursor_value)
            > self.parse_date(self._highest_observed_cursor_field_value)
        )
        if (
            self._is_within_daterange_boundaries(
                record,
                stream_slice.get(start_field),  # type: ignore [arg-type]
                stream_slice.get(end_field),  # type: ignore [arg-type]
            )
            and is_highest_observed_cursor_value
        ):
            self._highest_observed_cursor_field_value = record_cursor_value

    def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
        if stream_slice.partition:
            raise ValueError(
                f"Stream slice {stream_slice} should not have a partition. Got {stream_slice.partition}."
            )
        cursor_value_str_by_cursor_value_datetime = dict(
            map(
                # we need to ensure the cursor value is preserved as is in the state else the CATs might complain of something like
                # '2023-01-04T17:30:19.000Z' <= '2023-01-04T17:30:19.000000Z'
                lambda datetime_str: (self.parse_date(datetime_str), datetime_str),  # type: ignore # because of the filter on the next line, this will only be called with a str
                filter(
                    lambda item: item, [self._cursor, self._highest_observed_cursor_field_value]
                ),
            )
        )
        self._cursor = (
            cursor_value_str_by_cursor_value_datetime[
                max(cursor_value_str_by_cursor_value_datetime.keys())
            ]
            if cursor_value_str_by_cursor_value_datetime
            else None
        )

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Partition the date range into slices of size = step.

        The start of the window is the later of start_datetime and the stream state's datetime minus lookback_window.
        The end of the window is the earlier of the configured end_datetime and the current time.
        """
        end_datetime = self.select_best_end_datetime()
        start_datetime = self._calculate_earliest_possible_value(self.select_best_end_datetime())
        return self._partition_daterange(start_datetime, end_datetime, self._step)

    def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
        # Datetime based cursors operate over slices made up of datetime ranges. Stream state is based on the progress
        # through each slice and does not belong to a specific slice. We just return stream state as it is.
        return self.get_stream_state()

    def _calculate_earliest_possible_value(
        self, end_datetime: datetime.datetime
    ) -> datetime.datetime:
        lookback_delta = self._parse_timedelta(
            self._lookback_window.eval(self.config) if self._lookback_window else "P0D"
        )
        earliest_possible_start_datetime = min(
            self._start_datetime.get_datetime(self.config), end_datetime
        )
        try:
            cursor_datetime = (
                self._calculate_cursor_datetime_from_state(self.get_stream_state()) - lookback_delta
            )
        except OverflowError:
            # cursor_datetime defers to the minimum date if it does not exist in the state. Trying to subtract
            # a timedelta from the minimum datetime results in an OverflowError
            cursor_datetime = self._calculate_cursor_datetime_from_state(self.get_stream_state())
        return max(earliest_possible_start_datetime, cursor_datetime)

    def select_best_end_datetime(self) -> datetime.datetime:
        """
        Returns the optimal end datetime.
        This method compares the current datetime with a pre-configured end datetime
        and returns the earlier of the two. If no pre-configured end datetime is set,
        the current datetime is returned.

        :return datetime.datetime: The best end datetime, which is either the current datetime or the pre-configured end datetime, whichever is earlier.
        """
        now = datetime.datetime.now(tz=self._timezone)
        if not self._end_datetime:
            return now
        return min(self._end_datetime.get_datetime(self.config), now)

    def _calculate_cursor_datetime_from_state(
        self, stream_state: Mapping[str, Any]
    ) -> datetime.datetime:
        if self.cursor_field.eval(self.config, stream_state=stream_state) in stream_state:  # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__
            return self.parse_date(stream_state[self.cursor_field.eval(self.config)])  # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__
        return datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)

    def _format_datetime(self, dt: datetime.datetime) -> str:
        return self._parser.format(dt, self.datetime_format)

    def _partition_daterange(
        self,
        start: datetime.datetime,
        end: datetime.datetime,
        step: Union[datetime.timedelta, Duration],
    ) -> List[StreamSlice]:
        start_field = self._partition_field_start.eval(self.config)
        end_field = self._partition_field_end.eval(self.config)
        dates = []

        while self._is_within_date_range(start, end):
            next_start = self._evaluate_next_start_date_safely(start, step)
            end_date = self._get_date(next_start - self._cursor_granularity, end, min)
            dates.append(
                StreamSlice(
                    partition={},
                    cursor_slice={
                        start_field: self._format_datetime(start),
                        end_field: self._format_datetime(end_date),
                    },
                )
            )
            start = next_start
        return dates

    def _is_within_date_range(self, start: datetime.datetime, end: datetime.datetime) -> bool:
        if self.is_compare_strictly:
            return start < end
        return start <= end

    def _evaluate_next_start_date_safely(
        self, start: datetime.datetime, step: datetime.timedelta
    ) -> datetime.datetime:
        """
        Given that we set the default step at datetime.timedelta.max, we will generate an OverflowError when evaluating the next start_date.
        This method assumes that users would never enter a step that would generate an overflow. Given that would be the case, the code
        would have broken anyway.
        """
        try:
            return start + step
        except OverflowError:
            return datetime.datetime.max.replace(tzinfo=datetime.timezone.utc)

    def _get_date(
        self,
        cursor_value: datetime.datetime,
        default_date: datetime.datetime,
        comparator: Callable[[datetime.datetime, datetime.datetime], datetime.datetime],
    ) -> datetime.datetime:
        cursor_date = cursor_value or default_date
        return comparator(cursor_date, default_date)

    def parse_date(self, date: str) -> datetime.datetime:
        for datetime_format in self.cursor_datetime_formats + [self.datetime_format]:
            try:
                return self._parser.parse(date, datetime_format)
            except ValueError:
                pass
        raise ValueError(f"No format in {self.cursor_datetime_formats} matching {date}")

    @classmethod
    def _parse_timedelta(cls, time_str: Optional[str]) -> Union[datetime.timedelta, Duration]:
        """
        :return: the ISO 8601 duration parsed into a datetime.timedelta or Duration object
        """
        if not time_str:
            return datetime.timedelta(0)
        return parse_duration(time_str)

    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return self._get_request_options(RequestOptionType.request_parameter, stream_slice)

    def get_request_headers(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return self._get_request_options(RequestOptionType.header, stream_slice)

    def get_request_body_data(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return self._get_request_options(RequestOptionType.body_data, stream_slice)

    def get_request_body_json(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return self._get_request_options(RequestOptionType.body_json, stream_slice)

    def request_kwargs(self) -> Mapping[str, Any]:
        # Never update kwargs
        return {}

    def _get_request_options(
        self, option_type: RequestOptionType, stream_slice: Optional[StreamSlice]
    ) -> Mapping[str, Any]:
        options: MutableMapping[str, Any] = {}
        if not stream_slice:
            return options

        if self.start_time_option and self.start_time_option.inject_into == option_type:
            start_time_value = stream_slice.get(self._partition_field_start.eval(self.config))
            self.start_time_option.inject_into_request(options, start_time_value, self.config)

        if self.end_time_option and self.end_time_option.inject_into == option_type:
            end_time_value = stream_slice.get(self._partition_field_end.eval(self.config))
            self.end_time_option.inject_into_request(options, end_time_value, self.config)

        return options

    def should_be_synced(self, record: Record) -> bool:
        cursor_field = self.cursor_field.eval(self.config)  # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__
        record_cursor_value = record.get(cursor_field)
        if not record_cursor_value:
            self._send_log(
                Level.WARN,
                f"Could not find cursor field `{cursor_field}` in record. The incremental sync will assume it needs to be synced",
            )
            return True
        latest_possible_cursor_value = self.select_best_end_datetime()
        earliest_possible_cursor_value = self._calculate_earliest_possible_value(
            latest_possible_cursor_value
        )
        return self._is_within_daterange_boundaries(
            record, earliest_possible_cursor_value, latest_possible_cursor_value
        )

    def _is_within_daterange_boundaries(
        self,
        record: Record,
        start_datetime_boundary: Union[datetime.datetime, str],
        end_datetime_boundary: Union[datetime.datetime, str],
    ) -> bool:
        cursor_field = self.cursor_field.eval(self.config)  # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__
        record_cursor_value = record.get(cursor_field)
        if not record_cursor_value:
            self._send_log(
                Level.WARN,
                f"Could not find cursor field `{cursor_field}` in record. The record will not be considered when emitting sync state",
            )
            return False
        if isinstance(start_datetime_boundary, str):
            start_datetime_boundary = self.parse_date(start_datetime_boundary)
        if isinstance(end_datetime_boundary, str):
            end_datetime_boundary = self.parse_date(end_datetime_boundary)
        return (
            start_datetime_boundary <= self.parse_date(record_cursor_value) <= end_datetime_boundary
        )

    def _send_log(self, level: Level, message: str) -> None:
        if self.message_repository:
            self.message_repository.emit_message(
                AirbyteMessage(
                    type=Type.LOG,
                    log=AirbyteLogMessage(level=level, message=message),
                )
            )

    def set_runtime_lookback_window(self, lookback_window_in_seconds: int) -> None:
        """
        Updates the lookback window based on a given number of seconds if the new duration
        is greater than the currently configured lookback window.

        :param lookback_window_in_seconds: The lookback duration in seconds to potentially update to.
        """
        runtime_lookback_window = duration_isoformat(timedelta(seconds=lookback_window_in_seconds))
        config_lookback = parse_duration(
            self._lookback_window.eval(self.config) if self._lookback_window else "P0D"
        )

        # Check if the new runtime lookback window is greater than the current config lookback
        if parse_duration(runtime_lookback_window) > config_lookback:
            self._lookback_window = InterpolatedString.create(
                runtime_lookback_window, parameters={}
            )
```
Slices the stream over a datetime range and creates a state with format {<cursor_field>: <datetime>}.
Given a start time, end time, a step function, and an optional lookback window, the stream slicer will partition the date range from start time - lookback window to end time.
The step is defined as an ISO 8601 duration string.
The timestamp format accepts the same format codes as datetime.strftime, which are all the format codes required by the 1989 C standard. Full list of accepted format codes: https://man7.org/linux/man-pages/man3/strftime.3.html
Attributes:
- start_datetime (Union[MinMaxDatetime, str]): the datetime that determines the earliest record that should be synced
- end_datetime (Optional[Union[MinMaxDatetime, str]]): the datetime that determines the last record that should be synced
- cursor_field (Union[InterpolatedString, str]): record's cursor field
- datetime_format (str): format of the datetime
- step (Optional[str]): size of the time window (ISO 8601 duration)
- cursor_granularity (Optional[str]): smallest increment the datetime_format has (ISO 8601 duration) that will be used to ensure that the start of a slice does not overlap with the end of the previous one
- config (Config): connection config
- start_time_option (Optional[RequestOption]): request option for start time
- end_time_option (Optional[RequestOption]): request option for end time
- partition_field_start (Optional[str]): partition start time field
- partition_field_end (Optional[str]): stream slice end time field
- lookback_window (Optional[InterpolatedString]): how many days before start_datetime to read data for (ISO8601 duration)
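A minimal sketch of constructing the cursor directly in Python. Connectors normally declare this component in a YAML manifest; the field values here are illustrative:

```python
from airbyte_cdk.sources.declarative.incremental import DatetimeBasedCursor

cursor = DatetimeBasedCursor(
    start_datetime="2024-01-01T00:00:00Z",
    end_datetime="2024-01-07T00:00:00Z",
    cursor_field="updated_at",
    datetime_format="%Y-%m-%dT%H:%M:%SZ",
    step="P1D",                 # one slice per day
    cursor_granularity="PT1S",  # required whenever step is set, and vice-versa
    config={},
    parameters={},
)

# Partitions the window into daily slices keyed by the default
# partition_field_start/end names ("start_time" / "end_time").
for stream_slice in cursor.stream_slices():
    print(stream_slice["start_time"], "->", stream_slice["end_time"])
```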
```python
def get_stream_state(self) -> StreamState:
    return {self.cursor_field.eval(self.config): self._cursor} if self._cursor else {}  # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__
```
Returns the current stream state. We would like to restrict its usage since it exposes state internals. As of 2023-06-14, it is used for the following:
- Interpolation of the requests
- Transformation of records
- Saving the state
For the first case, we are probably stuck with exposing the stream state. For the second, we can probably expose a method that allows for emitting the state to the platform.
```python
def set_initial_state(self, stream_state: StreamState) -> None:
    """
    Cursors are not initialized with their state. As state is needed in order to function properly, this method should be called
    before calling anything else

    :param stream_state: The state of the stream as returned by get_stream_state
    """
    self._cursor = (
        stream_state.get(self.cursor_field.eval(self.config)) if stream_state else None  # type: ignore [union-attr]
    )
```
Cursors are not initialized with their state. As state is needed in order to function properly, this method should be called before calling anything else
Parameters
- stream_state: The state of the stream as returned by get_stream_state
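Taken together with get_stream_state, state restoration is a simple round trip. Continuing the `cursor` constructed in the sketch above (cursor field "updated_at" is illustrative):

```python
# Seed the cursor from a previously emitted state...
cursor.set_initial_state({"updated_at": "2024-01-03T00:00:00Z"})

# ...and the same mapping is what get_stream_state reports back.
assert cursor.get_stream_state() == {"updated_at": "2024-01-03T00:00:00Z"}
```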
```python
def observe(self, stream_slice: StreamSlice, record: Record) -> None:
    """
    Register a record with the cursor; the cursor instance can then use it to manage the state of the in-progress stream read.

    :param stream_slice: The current slice, which may or may not contain the most recently observed record
    :param record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the
      stream state may need to be deferred depending on whether the source reliably orders records by the cursor field.
    """
    record_cursor_value = record.get(self.cursor_field.eval(self.config))  # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__
    # if the current record has no cursor value, we cannot meaningfully update the state based on it, so there is nothing more to do
    if not record_cursor_value:
        return

    start_field = self._partition_field_start.eval(self.config)
    end_field = self._partition_field_end.eval(self.config)
    is_highest_observed_cursor_value = (
        not self._highest_observed_cursor_field_value
        or self.parse_date(record_cursor_value)
        > self.parse_date(self._highest_observed_cursor_field_value)
    )
    if (
        self._is_within_daterange_boundaries(
            record,
            stream_slice.get(start_field),  # type: ignore [arg-type]
            stream_slice.get(end_field),  # type: ignore [arg-type]
        )
        and is_highest_observed_cursor_value
    ):
        self._highest_observed_cursor_field_value = record_cursor_value
```
Register a record with the cursor; the cursor instance can then use it to manage the state of the in-progress stream read.
Parameters
- stream_slice: The current slice, which may or may not contain the most recently observed record
- record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the stream state may need to be deferred depending on whether the source reliably orders records by the cursor field.
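A hedged sketch of the observe-then-close flow, continuing the hypothetical `cursor` from the earlier sketch. The `Record`/`StreamSlice` import path and the `Record` constructor signature are assumed from recent CDK versions:

```python
from airbyte_cdk.sources.types import Record, StreamSlice

daily_slice = StreamSlice(
    partition={},
    cursor_slice={
        "start_time": "2024-01-01T00:00:00Z",
        "end_time": "2024-01-02T00:00:00Z",
    },
)
record = Record(
    data={"id": 1, "updated_at": "2024-01-01T12:00:00Z"},
    stream_name="items",  # illustrative stream name
    associated_slice=daily_slice,
)

cursor.observe(daily_slice, record)  # tracks the highest in-range cursor value
cursor.close_slice(daily_slice)      # commits it to the emitted stream state
```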
```python
def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
    if stream_slice.partition:
        raise ValueError(
            f"Stream slice {stream_slice} should not have a partition. Got {stream_slice.partition}."
        )
    cursor_value_str_by_cursor_value_datetime = dict(
        map(
            # we need to ensure the cursor value is preserved as is in the state else the CATs might complain of something like
            # '2023-01-04T17:30:19.000Z' <= '2023-01-04T17:30:19.000000Z'
            lambda datetime_str: (self.parse_date(datetime_str), datetime_str),  # type: ignore # because of the filter on the next line, this will only be called with a str
            filter(
                lambda item: item, [self._cursor, self._highest_observed_cursor_field_value]
            ),
        )
    )
    self._cursor = (
        cursor_value_str_by_cursor_value_datetime[
            max(cursor_value_str_by_cursor_value_datetime.keys())
        ]
        if cursor_value_str_by_cursor_value_datetime
        else None
    )
```
Update state based on the stream slice. Note that `stream_slice.cursor_slice` and `most_recent_record.associated_slice` are expected to be the same, but we make it explicit here that `stream_slice` should be leveraged to update the state. We do not pass in the latest record, since cursor instances should maintain the relevant internal state on their own.
Parameters
- stream_slice: slice to close
```python
def stream_slices(self) -> Iterable[StreamSlice]:
    """
    Partition the date range into slices of size = step.

    The start of the window is the later of start_datetime and the stream state's datetime minus lookback_window.
    The end of the window is the earlier of the configured end_datetime and the current time.
    """
    end_datetime = self.select_best_end_datetime()
    start_datetime = self._calculate_earliest_possible_value(self.select_best_end_datetime())
    return self._partition_daterange(start_datetime, end_datetime, self._step)
```
Partition the date range into slices of size = step.

The start of the window is the later of start_datetime and the stream state's datetime minus lookback_window. The end of the window is the earlier of the configured end_datetime and the current time.
```python
def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
    # Datetime based cursors operate over slices made up of datetime ranges. Stream state is based on the progress
    # through each slice and does not belong to a specific slice. We just return stream state as it is.
    return self.get_stream_state()
```
Get the state value of a specific stream_slice. For incremental or resumable full refresh cursors which only manage state in a single dimension this is the entire state object. For per-partition cursors used by substreams, this returns the state of a specific parent delineated by the incoming slice's partition object.
```python
def select_best_end_datetime(self) -> datetime.datetime:
    """
    Returns the optimal end datetime.
    This method compares the current datetime with a pre-configured end datetime
    and returns the earlier of the two. If no pre-configured end datetime is set,
    the current datetime is returned.

    :return datetime.datetime: The best end datetime, which is either the current datetime or the pre-configured end datetime, whichever is earlier.
    """
    now = datetime.datetime.now(tz=self._timezone)
    if not self._end_datetime:
        return now
    return min(self._end_datetime.get_datetime(self.config), now)
```
Returns the optimal end datetime. This method compares the current datetime with a pre-configured end datetime and returns the earlier of the two. If no pre-configured end datetime is set, the current datetime is returned.
Returns
The best end datetime, which is either the current datetime or the pre-configured end datetime, whichever is earlier.
```python
def parse_date(self, date: str) -> datetime.datetime:
    for datetime_format in self.cursor_datetime_formats + [self.datetime_format]:
        try:
            return self._parser.parse(date, datetime_format)
        except ValueError:
            pass
    raise ValueError(f"No format in {self.cursor_datetime_formats} matching {date}")
```
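Because parse_date tries each entry of cursor_datetime_formats before falling back to datetime_format, a cursor configured with several formats accepts heterogeneous API values. Continuing the hypothetical `cursor`, with illustrative formats:

```python
# Tries "%Y-%m-%d" first, then "%Y-%m-%dT%H:%M:%SZ", then datetime_format.
cursor.cursor_datetime_formats = ["%Y-%m-%d", "%Y-%m-%dT%H:%M:%SZ"]

cursor.parse_date("2024-01-05")            # matched by the first format
cursor.parse_date("2024-01-05T10:00:00Z")  # falls through to the second
# A value matching no format raises ValueError.
```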
```python
def get_request_params(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    return self._get_request_options(RequestOptionType.request_parameter, stream_slice)
```
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g.: you might want to define query parameters for paging if next_page_token is not None.
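A hedged sketch of wiring a start_time_option so slice boundaries are injected as query parameters. The RequestOption import path and constructor arguments are assumed from the declarative requesters package; the parameter name "since" is illustrative:

```python
from airbyte_cdk.sources.declarative.incremental import DatetimeBasedCursor
from airbyte_cdk.sources.declarative.requesters.request_option import (
    RequestOption,
    RequestOptionType,
)

cursor = DatetimeBasedCursor(
    start_datetime="2024-01-01T00:00:00Z",
    cursor_field="updated_at",
    datetime_format="%Y-%m-%dT%H:%M:%SZ",
    config={},
    parameters={},
    start_time_option=RequestOption(
        field_name="since",  # query parameter name; illustrative
        inject_into=RequestOptionType.request_parameter,
        parameters={},
    ),
)

# For a given slice, the slice's start_time is injected as ?since=...
first_slice = next(iter(cursor.stream_slices()))
print(cursor.get_request_params(stream_slice=first_slice))
# -> {"since": "2024-01-01T00:00:00Z"}
```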
```python
def get_request_headers(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    return self._get_request_options(RequestOptionType.header, stream_slice)
```
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
346 def get_request_body_data( 347 self, 348 *, 349 stream_state: Optional[StreamState] = None, 350 stream_slice: Optional[StreamSlice] = None, 351 next_page_token: Optional[Mapping[str, Any]] = None, 352 ) -> Mapping[str, Any]: 353 return self._get_request_options(RequestOptionType.body_data, stream_slice)
Specifies how to populate the body of the request with a non-JSON payload.
If it returns plain text, the text will be sent as-is. If it returns a dict, it will be converted to a URL-encoded form, e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2".

Note that only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
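The dict-to-form conversion mentioned above behaves like the standard library's `urlencode`:

```python
from urllib.parse import urlencode

print(urlencode({"key1": "value1", "key2": "value2"}))  # key1=value1&key2=value2
```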
355 def get_request_body_json( 356 self, 357 *, 358 stream_state: Optional[StreamState] = None, 359 stream_slice: Optional[StreamSlice] = None, 360 next_page_token: Optional[Mapping[str, Any]] = None, 361 ) -> Mapping[str, Any]: 362 return self._get_request_options(RequestOptionType.body_json, stream_slice)
Specifies how to populate the body of the request with a JSON payload.
Note that only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
385 def should_be_synced(self, record: Record) -> bool: 386 cursor_field = self.cursor_field.eval(self.config) # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__ 387 record_cursor_value = record.get(cursor_field) 388 if not record_cursor_value: 389 self._send_log( 390 Level.WARN, 391 f"Could not find cursor field `{cursor_field}` in record. The incremental sync will assume it needs to be synced", 392 ) 393 return True 394 latest_possible_cursor_value = self.select_best_end_datetime() 395 earliest_possible_cursor_value = self._calculate_earliest_possible_value( 396 latest_possible_cursor_value 397 ) 398 return self._is_within_daterange_boundaries( 399 record, earliest_possible_cursor_value, latest_possible_cursor_value 400 )
Evaluating whether a record should be synced allows for filtering and for stop conditions on pagination.
433 def set_runtime_lookback_window(self, lookback_window_in_seconds: int) -> None: 434 """ 435 Updates the lookback window based on a given number of seconds if the new duration 436 is greater than the currently configured lookback window. 437 438 :param lookback_window_in_seconds: The lookback duration in seconds to potentially update to. 439 """ 440 runtime_lookback_window = duration_isoformat(timedelta(seconds=lookback_window_in_seconds)) 441 config_lookback = parse_duration( 442 self._lookback_window.eval(self.config) if self._lookback_window else "P0D" 443 ) 444 445 # Check if the new runtime lookback window is greater than the current config lookback 446 if parse_duration(runtime_lookback_window) > config_lookback: 447 self._lookback_window = InterpolatedString.create( 448 runtime_lookback_window, parameters={} 449 )
Updates the lookback window based on a given number of seconds if the new duration is greater than the currently configured lookback window.
Parameters
- lookback_window_in_seconds: The lookback duration in seconds to potentially update to.
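A small sketch of the comparison this method performs, using the same `isodate` helpers (`duration_isoformat`, `parse_duration`) that the snippet relies on:

```python
from datetime import timedelta

from isodate import duration_isoformat, parse_duration

config_lookback = parse_duration("P0D")  # default when no lookback is configured
runtime_lookback_window = duration_isoformat(timedelta(seconds=3600))  # e.g. "PT1H"

# Widen the lookback only if the runtime value exceeds the configured one.
print(parse_duration(runtime_lookback_window) > config_lookback)  # True
```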
10class DeclarativeCursor(Cursor, StreamSlicer, ABC): 11 """ 12 DeclarativeCursors are components that allow for checkpointing syncs. In addition to managing the fetching and updating of 13 state, declarative cursors also manage stream slicing and injecting slice values into outbound requests. 14 """
DeclarativeCursors are components that allow for checkpointing syncs. In addition to managing the fetching and updating of state, declarative cursors also manage stream slicing and injecting slice values into outbound requests.
72class GlobalSubstreamCursor(DeclarativeCursor): 73 """ 74 The GlobalSubstreamCursor is designed to track the state of substreams using a single global cursor. 75 This class is beneficial for streams with many partitions, as it allows the state to be managed globally 76 instead of per partition, simplifying state management and reducing the size of state messages. 77 78 This cursor is activated by setting the `global_substream_cursor` parameter for incremental sync. 79 80 Warnings: 81 - This class enforces a minimal lookback window for substream based on the duration of the previous sync to avoid losing records. This lookback ensures that any records added or updated during the sync are captured in subsequent syncs. 82 - The global cursor is updated only at the end of the sync. If the sync ends prematurely (e.g., due to an exception), the state will not be updated. 83 - When using the `incremental_dependency` option, the sync will progress through parent records, preventing the sync from getting infinitely stuck. However, it is crucial to understand the requirements for both the `global_substream_cursor` and `incremental_dependency` options to avoid data loss. 84 """ 85 86 def __init__(self, stream_cursor: DatetimeBasedCursor, partition_router: PartitionRouter): 87 self._stream_cursor = stream_cursor 88 self._partition_router = partition_router 89 self._timer = Timer() 90 self._lock = threading.Lock() 91 self._slice_semaphore = threading.Semaphore( 92 0 93 ) # Start with 0, indicating no slices being tracked 94 self._all_slices_yielded = False 95 self._lookback_window: Optional[int] = None 96 self._current_partition: Optional[Mapping[str, Any]] = None 97 self._last_slice: bool = False 98 self._parent_state: Optional[Mapping[str, Any]] = None 99 100 def start_slices_generation(self) -> None: 101 self._timer.start() 102 103 def stream_slices(self) -> Iterable[StreamSlice]: 104 """ 105 Generates stream slices, ensuring the last slice is properly flagged and processed. 106 107 This method creates a sequence of stream slices by iterating over partitions and cursor slices. 108 It holds onto one slice in memory to set `_all_slices_yielded` to `True` before yielding the 109 final slice. A semaphore is used to track the processing of slices, ensuring that `close_slice` 110 is called only after all slices have been processed. 111 112 We expect the following events: 113 * Yields all the slices except the last one. At this point, `close_slice` won't actually close the global slice as `self._all_slices_yielded == False` 114 * Release the semaphore one last time before setting `self._all_slices_yielded = True`. This will cause `close_slice` to know about all the slices before we indicate that all slices have been yielded so the left side of `if self._all_slices_yielded and self._slice_semaphore._value == 0` will be false if not everything is closed 115 * Setting `self._all_slices_yielded = True`. We do that before actually yielding the last slice as the caller of `stream_slices` might stop iterating at any point and hence the code after `yield` might not be executed 116 * Yield the last slice. At that point, once there are as many slices yielded as closes, the global slice will be closed too 117 """ 118 slice_generator = ( 119 StreamSlice( 120 partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields 121 ) 122 for partition in self._partition_router.stream_slices() 123 for cursor_slice in self._stream_cursor.stream_slices() 124 ) 125 126 self.start_slices_generation() 127 for slice, last, state in iterate_with_last_flag_and_state( 128 slice_generator, self._partition_router.get_stream_state 129 ): 130 self._parent_state = state 131 self.register_slice(last) 132 yield slice 133 self._parent_state = self._partition_router.get_stream_state() 134 135 def generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[StreamSlice]: 136 slice_generator = ( 137 StreamSlice( 138 partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields 139 ) 140 for cursor_slice in self._stream_cursor.stream_slices() 141 ) 142 143 yield from slice_generator 144 145 def register_slice(self, last: bool) -> None: 146 """ 147 Tracks the processing of a stream slice. 148 149 Releases the semaphore for each slice. If it's the last slice (`last=True`), 150 sets `_all_slices_yielded` to `True` to indicate no more slices will be processed. 151 152 Args: 153 last (bool): True if the current slice is the last in the sequence. 154 """ 155 self._slice_semaphore.release() 156 if last: 157 self._all_slices_yielded = True 158 159 def set_initial_state(self, stream_state: StreamState) -> None: 160 """ 161 Set the initial state for the cursors. 162 163 This method initializes the state for the global cursor using the provided stream state. 164 165 Additionally, it sets the parent state for partition routers that are based on parent streams. If a partition router 166 does not have parent streams, this step will be skipped due to the default PartitionRouter implementation. 167 168 Args: 169 stream_state (StreamState): The state of the streams to be set. The format of the stream state should be: 170 { 171 "state": { 172 "last_updated": "2023-05-27T00:00:00Z" 173 }, 174 "parent_state": { 175 "parent_stream_name": { 176 "last_updated": "2023-05-27T00:00:00Z" 177 } 178 }, 179 "lookback_window": 132 180 } 181 """ 182 if not stream_state: 183 return 184 185 if "lookback_window" in stream_state: 186 self._lookback_window = stream_state["lookback_window"] 187 self._inject_lookback_into_stream_cursor(stream_state["lookback_window"]) 188 189 if "state" in stream_state: 190 self._stream_cursor.set_initial_state(stream_state["state"]) 191 elif "states" not in stream_state: 192 # We assume that `stream_state` is in the old global format 193 # Example: {"global_state_format_key": "global_state_format_value"} 194 self._stream_cursor.set_initial_state(stream_state) 195 196 # Set parent state for partition routers based on parent streams 197 self._partition_router.set_initial_state(stream_state) 198 199 def _inject_lookback_into_stream_cursor(self, lookback_window: int) -> None: 200 """ 201 Modifies the stream cursor's lookback window based on the duration of the previous sync. 202 This adjustment ensures the cursor is set to the minimal lookback window necessary for 203 avoiding missing data. 204 205 Parameters: 206 lookback_window (int): The lookback duration in seconds to be set, derived from 207 the previous sync. 208 209 Raises: 210 ValueError: If the cursor does not support dynamic lookback window adjustments. 211 """ 212 if hasattr(self._stream_cursor, "set_runtime_lookback_window"): 213 self._stream_cursor.set_runtime_lookback_window(lookback_window) 214 else: 215 raise ValueError( 216 "The cursor class for Global Substream Cursor does not have a set_runtime_lookback_window method" 217 ) 218 219 def observe(self, stream_slice: StreamSlice, record: Record) -> None: 220 self._stream_cursor.observe( 221 StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), record 222 ) 223 224 def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None: 225 """ 226 Close the current stream slice. 227 228 This method is called when a stream slice is completed. For the global parent cursor, we close the child cursor 229 only after reading all slices. This ensures that we do not miss any child records from a later parent record 230 if the child cursor is earlier than a record from the first parent record. 231 232 Args: 233 stream_slice (StreamSlice): The stream slice to be closed. 234 *args (Any): Additional arguments. 235 """ 236 with self._lock: 237 self._slice_semaphore.acquire() 238 if self._all_slices_yielded and self._slice_semaphore._value == 0: 239 self._lookback_window = self._timer.finish() 240 self._stream_cursor.close_slice( 241 StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), *args 242 ) 243 244 def get_stream_state(self) -> StreamState: 245 state: dict[str, Any] = {"state": self._stream_cursor.get_stream_state()} 246 247 if self._parent_state: 248 state["parent_state"] = self._parent_state 249 250 if self._lookback_window is not None: 251 state["lookback_window"] = self._lookback_window 252 253 return state 254 255 def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]: 256 # stream_slice is ignored as cursor is global 257 return self._stream_cursor.get_stream_state() 258 259 def get_request_params( 260 self, 261 *, 262 stream_state: Optional[StreamState] = None, 263 stream_slice: Optional[StreamSlice] = None, 264 next_page_token: Optional[Mapping[str, Any]] = None, 265 ) -> Mapping[str, Any]: 266 if stream_slice: 267 return self._partition_router.get_request_params( # type: ignore # this always returns a mapping 268 stream_state=stream_state, 269 stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}), 270 next_page_token=next_page_token, 271 ) | self._stream_cursor.get_request_params( 272 stream_state=stream_state, 273 stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), 274 next_page_token=next_page_token, 275 ) 276 else: 277 raise ValueError("A partition needs to be provided in order to get request params") 278 279 def get_request_headers( 280 self, 281 *, 282 stream_state: Optional[StreamState] = None, 283 stream_slice: Optional[StreamSlice] = None, 284 next_page_token: Optional[Mapping[str, Any]] = None, 285 ) -> Mapping[str, Any]: 286 if stream_slice: 287 return self._partition_router.get_request_headers( # type: ignore # this always returns a mapping 288 stream_state=stream_state, 289 stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}), 290 next_page_token=next_page_token, 291 ) | self._stream_cursor.get_request_headers( 292 stream_state=stream_state, 293 stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), 294 next_page_token=next_page_token, 295 ) 296 else: 297 raise ValueError("A partition needs to be provided in order to get request headers") 298 299 def get_request_body_data( 300 self, 301 *, 302 stream_state: Optional[StreamState] = None, 303 stream_slice: Optional[StreamSlice] = None, 304 next_page_token: Optional[Mapping[str, Any]] = None, 305 ) -> Union[Mapping[str, Any], str]: 306 if stream_slice: 307 return self._partition_router.get_request_body_data( # type: ignore # this always returns a mapping 308 stream_state=stream_state, 309 stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}), 310 next_page_token=next_page_token, 311 ) | self._stream_cursor.get_request_body_data( 312 stream_state=stream_state, 313 stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), 314 next_page_token=next_page_token, 315 ) 316 else: 317 raise ValueError("A partition needs to be provided in order to get request body data") 318 319 def get_request_body_json( 320 self, 321 *, 322 stream_state: Optional[StreamState] = None, 323 stream_slice: Optional[StreamSlice] = None, 324 next_page_token: Optional[Mapping[str, Any]] = None, 325 ) -> Mapping[str, Any]: 326 if stream_slice: 327 return self._partition_router.get_request_body_json( # type: ignore # this always returns a mapping 328 stream_state=stream_state, 329 stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}), 330 next_page_token=next_page_token, 331 ) | self._stream_cursor.get_request_body_json( 332 stream_state=stream_state, 333 stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), 334 next_page_token=next_page_token, 335 ) 336 else: 337 raise ValueError("A partition needs to be provided in order to get request body json") 338 339 def should_be_synced(self, record: Record) -> bool: 340 return self._stream_cursor.should_be_synced(self._convert_record_to_cursor_record(record)) 341 342 @staticmethod 343 def _convert_record_to_cursor_record(record: Record) -> Record: 344 return Record( 345 data=record.data, 346 stream_name=record.stream_name, 347 associated_slice=StreamSlice( 348 partition={}, cursor_slice=record.associated_slice.cursor_slice 349 ) 350 if record.associated_slice 351 else None, 352 )
The GlobalSubstreamCursor is designed to track the state of substreams using a single global cursor. This class is beneficial for streams with many partitions, as it allows the state to be managed globally instead of per partition, simplifying state management and reducing the size of state messages.
This cursor is activated by setting the `global_substream_cursor` parameter for incremental sync.

Warnings:

- This class enforces a minimal lookback window for substreams based on the duration of the previous sync to avoid losing records. This lookback ensures that any records added or updated during the sync are captured in subsequent syncs.
- The global cursor is updated only at the end of the sync. If the sync ends prematurely (e.g., due to an exception), the state will not be updated.
- When using the `incremental_dependency` option, the sync will progress through parent records, preventing the sync from getting infinitely stuck. However, it is crucial to understand the requirements for both the `global_substream_cursor` and `incremental_dependency` options to avoid data loss.
86 def __init__(self, stream_cursor: DatetimeBasedCursor, partition_router: PartitionRouter): 87 self._stream_cursor = stream_cursor 88 self._partition_router = partition_router 89 self._timer = Timer() 90 self._lock = threading.Lock() 91 self._slice_semaphore = threading.Semaphore( 92 0 93 ) # Start with 0, indicating no slices being tracked 94 self._all_slices_yielded = False 95 self._lookback_window: Optional[int] = None 96 self._current_partition: Optional[Mapping[str, Any]] = None 97 self._last_slice: bool = False 98 self._parent_state: Optional[Mapping[str, Any]] = None
103 def stream_slices(self) -> Iterable[StreamSlice]: 104 """ 105 Generates stream slices, ensuring the last slice is properly flagged and processed. 106 107 This method creates a sequence of stream slices by iterating over partitions and cursor slices. 108 It holds onto one slice in memory to set `_all_slices_yielded` to `True` before yielding the 109 final slice. A semaphore is used to track the processing of slices, ensuring that `close_slice` 110 is called only after all slices have been processed. 111 112 We expect the following events: 113 * Yields all the slices except the last one. At this point, `close_slice` won't actually close the global slice as `self._all_slices_yielded == False` 114 * Release the semaphore one last time before setting `self._all_slices_yielded = True`. This will cause `close_slice` to know about all the slices before we indicate that all slices have been yielded so the left side of `if self._all_slices_yielded and self._slice_semaphore._value == 0` will be false if not everything is closed 115 * Setting `self._all_slices_yielded = True`. We do that before actually yielding the last slice as the caller of `stream_slices` might stop iterating at any point and hence the code after `yield` might not be executed 116 * Yield the last slice. At that point, once there are as many slices yielded as closes, the global slice will be closed too 117 """ 118 slice_generator = ( 119 StreamSlice( 120 partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields 121 ) 122 for partition in self._partition_router.stream_slices() 123 for cursor_slice in self._stream_cursor.stream_slices() 124 ) 125 126 self.start_slices_generation() 127 for slice, last, state in iterate_with_last_flag_and_state( 128 slice_generator, self._partition_router.get_stream_state 129 ): 130 self._parent_state = state 131 self.register_slice(last) 132 yield slice 133 self._parent_state = self._partition_router.get_stream_state()
Generates stream slices, ensuring the last slice is properly flagged and processed.
This method creates a sequence of stream slices by iterating over partitions and cursor slices.
It holds onto one slice in memory to set `_all_slices_yielded` to `True` before yielding the final slice. A semaphore is used to track the processing of slices, ensuring that `close_slice` is called only after all slices have been processed.

We expect the following events:

- Yields all the slices except the last one. At this point, `close_slice` won't actually close the global slice as `self._all_slices_yielded == False`.
- Release the semaphore one last time before setting `self._all_slices_yielded = True`. This will cause `close_slice` to know about all the slices before we indicate that all slices have been yielded, so the left side of `if self._all_slices_yielded and self._slice_semaphore._value == 0` will be false if not everything is closed.
- Setting `self._all_slices_yielded = True`. We do that before actually yielding the last slice, as the caller of `stream_slices` might stop iterating at any point and hence the code after `yield` might not be executed.
- Yield the last slice. At that point, once there are as many slices yielded as closes, the global slice will be closed too.
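The coordination described above can be illustrated with a minimal, standalone sketch (not the CDK code): a semaphore counts yielded-but-unclosed slices, and the global close happens only once the last slice has been yielded and every yielded slice has been closed. `iterate_with_last_flag` here is a simplified stand-in for the CDK's `iterate_with_last_flag_and_state`, and peeking at `Semaphore._value` mirrors the check in the snippet above:

```python
import threading
from typing import Iterable, Iterator, Tuple


def iterate_with_last_flag(iterable: Iterable[str]) -> Iterator[Tuple[str, bool]]:
    # Hold one item back so the final item can be yielded with last=True.
    it = iter(iterable)
    try:
        previous = next(it)
    except StopIteration:
        return
    for item in it:
        yield previous, False
        previous = item
    yield previous, True


class SliceTracker:
    def __init__(self) -> None:
        self._semaphore = threading.Semaphore(0)  # counts yielded-but-unclosed slices
        self._all_yielded = False

    def register(self, last: bool) -> None:
        self._semaphore.release()
        if last:
            self._all_yielded = True

    def close_one(self) -> bool:
        # True only when the last slice was yielded and every slice is closed.
        self._semaphore.acquire()
        return self._all_yielded and self._semaphore._value == 0


tracker = SliceTracker()
for _slice, last in iterate_with_last_flag(["a", "b", "c"]):
    tracker.register(last)
print([tracker.close_one() for _ in range(3)])  # [False, False, True]
```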
135 def generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[StreamSlice]: 136 slice_generator = ( 137 StreamSlice( 138 partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields 139 ) 140 for cursor_slice in self._stream_cursor.stream_slices() 141 ) 142 143 yield from slice_generator
145 def register_slice(self, last: bool) -> None: 146 """ 147 Tracks the processing of a stream slice. 148 149 Releases the semaphore for each slice. If it's the last slice (`last=True`), 150 sets `_all_slices_yielded` to `True` to indicate no more slices will be processed. 151 152 Args: 153 last (bool): True if the current slice is the last in the sequence. 154 """ 155 self._slice_semaphore.release() 156 if last: 157 self._all_slices_yielded = True
Tracks the processing of a stream slice.

Releases the semaphore for each slice. If it's the last slice (`last=True`), sets `_all_slices_yielded` to `True` to indicate no more slices will be processed.

Arguments:

- last (bool): True if the current slice is the last in the sequence.
159 def set_initial_state(self, stream_state: StreamState) -> None: 160 """ 161 Set the initial state for the cursors. 162 163 This method initializes the state for the global cursor using the provided stream state. 164 165 Additionally, it sets the parent state for partition routers that are based on parent streams. If a partition router 166 does not have parent streams, this step will be skipped due to the default PartitionRouter implementation. 167 168 Args: 169 stream_state (StreamState): The state of the streams to be set. The format of the stream state should be: 170 { 171 "state": { 172 "last_updated": "2023-05-27T00:00:00Z" 173 }, 174 "parent_state": { 175 "parent_stream_name": { 176 "last_updated": "2023-05-27T00:00:00Z" 177 } 178 }, 179 "lookback_window": 132 180 } 181 """ 182 if not stream_state: 183 return 184 185 if "lookback_window" in stream_state: 186 self._lookback_window = stream_state["lookback_window"] 187 self._inject_lookback_into_stream_cursor(stream_state["lookback_window"]) 188 189 if "state" in stream_state: 190 self._stream_cursor.set_initial_state(stream_state["state"]) 191 elif "states" not in stream_state: 192 # We assume that `stream_state` is in the old global format 193 # Example: {"global_state_format_key": "global_state_format_value"} 194 self._stream_cursor.set_initial_state(stream_state) 195 196 # Set parent state for partition routers based on parent streams 197 self._partition_router.set_initial_state(stream_state)
Set the initial state for the cursors.
This method initializes the state for the global cursor using the provided stream state.
Additionally, it sets the parent state for partition routers that are based on parent streams. If a partition router does not have parent streams, this step will be skipped due to the default PartitionRouter implementation.
Arguments:
- stream_state (StreamState): The state of the streams to be set. The format of the stream state should be:

      {
          "state": {
              "last_updated": "2023-05-27T00:00:00Z"
          },
          "parent_state": {
              "parent_stream_name": {
                  "last_updated": "2023-05-27T00:00:00Z"
              }
          },
          "lookback_window": 132
      }
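A hypothetical usage sketch of that shape (values are made up; the call is shown commented out because it assumes an already-built GlobalSubstreamCursor):

```python
# Illustrative state blob in the documented shape (values are examples).
state = {
    "state": {"last_updated": "2023-05-27T00:00:00Z"},
    "parent_state": {"parent_stream_name": {"last_updated": "2023-05-27T00:00:00Z"}},
    "lookback_window": 132,
}
# cursor.set_initial_state(state)  # `cursor`: an already-built GlobalSubstreamCursor
```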
219 def observe(self, stream_slice: StreamSlice, record: Record) -> None: 220 self._stream_cursor.observe( 221 StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), record 222 )
Register a record with the cursor; the cursor instance can then use it to manage the state of the in-progress stream read.
Parameters
- stream_slice: The current slice, which may or may not contain the most recently observed record
- record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the stream state may need to be deferred depending on whether the source reliably orders records by the cursor field.
224 def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None: 225 """ 226 Close the current stream slice. 227 228 This method is called when a stream slice is completed. For the global parent cursor, we close the child cursor 229 only after reading all slices. This ensures that we do not miss any child records from a later parent record 230 if the child cursor is earlier than a record from the first parent record. 231 232 Args: 233 stream_slice (StreamSlice): The stream slice to be closed. 234 *args (Any): Additional arguments. 235 """ 236 with self._lock: 237 self._slice_semaphore.acquire() 238 if self._all_slices_yielded and self._slice_semaphore._value == 0: 239 self._lookback_window = self._timer.finish() 240 self._stream_cursor.close_slice( 241 StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), *args 242 )
Close the current stream slice.
This method is called when a stream slice is completed. For the global parent cursor, we close the child cursor only after reading all slices. This ensures that we do not miss any child records from a later parent record if the child cursor is earlier than a record from the first parent record.
Arguments:
- stream_slice (StreamSlice): The stream slice to be closed.
- *args (Any): Additional arguments.
244 def get_stream_state(self) -> StreamState: 245 state: dict[str, Any] = {"state": self._stream_cursor.get_stream_state()} 246 247 if self._parent_state: 248 state["parent_state"] = self._parent_state 249 250 if self._lookback_window is not None: 251 state["lookback_window"] = self._lookback_window 252 253 return state
Returns the current stream state. We would like to restrict its usage since it exposes the internals of the state. As of 2023-06-14, it is used for the following:
- Interpolation of the requests
- Transformation of records
- Saving the state
For the first case, we are probably stuck with exposing the stream state. For the second, we can probably expose a method that allows for emitting the state to the platform.
255 def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]: 256 # stream_slice is ignored as cursor is global 257 return self._stream_cursor.get_stream_state()
Get the state value of a specific stream_slice. For incremental or resumable full refresh cursors which only manage state in a single dimension this is the entire state object. For per-partition cursors used by substreams, this returns the state of a specific parent delineated by the incoming slice's partition object.
259 def get_request_params( 260 self, 261 *, 262 stream_state: Optional[StreamState] = None, 263 stream_slice: Optional[StreamSlice] = None, 264 next_page_token: Optional[Mapping[str, Any]] = None, 265 ) -> Mapping[str, Any]: 266 if stream_slice: 267 return self._partition_router.get_request_params( # type: ignore # this always returns a mapping 268 stream_state=stream_state, 269 stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}), 270 next_page_token=next_page_token, 271 ) | self._stream_cursor.get_request_params( 272 stream_state=stream_state, 273 stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), 274 next_page_token=next_page_token, 275 ) 276 else: 277 raise ValueError("A partition needs to be provided in order to get request params")
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g., you might want to define query parameters for paging if next_page_token is not None.
279 def get_request_headers( 280 self, 281 *, 282 stream_state: Optional[StreamState] = None, 283 stream_slice: Optional[StreamSlice] = None, 284 next_page_token: Optional[Mapping[str, Any]] = None, 285 ) -> Mapping[str, Any]: 286 if stream_slice: 287 return self._partition_router.get_request_headers( # type: ignore # this always returns a mapping 288 stream_state=stream_state, 289 stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}), 290 next_page_token=next_page_token, 291 ) | self._stream_cursor.get_request_headers( 292 stream_state=stream_state, 293 stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), 294 next_page_token=next_page_token, 295 ) 296 else: 297 raise ValueError("A partition needs to be provided in order to get request headers")
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
299 def get_request_body_data( 300 self, 301 *, 302 stream_state: Optional[StreamState] = None, 303 stream_slice: Optional[StreamSlice] = None, 304 next_page_token: Optional[Mapping[str, Any]] = None, 305 ) -> Union[Mapping[str, Any], str]: 306 if stream_slice: 307 return self._partition_router.get_request_body_data( # type: ignore # this always returns a mapping 308 stream_state=stream_state, 309 stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}), 310 next_page_token=next_page_token, 311 ) | self._stream_cursor.get_request_body_data( 312 stream_state=stream_state, 313 stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), 314 next_page_token=next_page_token, 315 ) 316 else: 317 raise ValueError("A partition needs to be provided in order to get request body data")
Specifies how to populate the body of the request with a non-JSON payload.
If it returns plain text, the text will be sent as-is. If it returns a dict, it will be converted to a URL-encoded form, e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2".

Note that only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
319 def get_request_body_json( 320 self, 321 *, 322 stream_state: Optional[StreamState] = None, 323 stream_slice: Optional[StreamSlice] = None, 324 next_page_token: Optional[Mapping[str, Any]] = None, 325 ) -> Mapping[str, Any]: 326 if stream_slice: 327 return self._partition_router.get_request_body_json( # type: ignore # this always returns a mapping 328 stream_state=stream_state, 329 stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}), 330 next_page_token=next_page_token, 331 ) | self._stream_cursor.get_request_body_json( 332 stream_state=stream_state, 333 stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), 334 next_page_token=next_page_token, 335 ) 336 else: 337 raise ValueError("A partition needs to be provided in order to get request body json")
Specifies how to populate the body of the request with a JSON payload.
Note that only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
28class PerPartitionCursor(DeclarativeCursor): 29 """ 30 Manages state per partition when a stream has many partitions, to prevent data loss or duplication. 31 32 **Partition Limitation and Limit Reached Logic** 33 34 - **DEFAULT_MAX_PARTITIONS_NUMBER**: The maximum number of partitions to keep in memory (default is 10,000). 35 - **_cursor_per_partition**: An ordered dictionary that stores cursors for each partition. 36 - **_over_limit**: A counter that increments each time an oldest partition is removed when the limit is exceeded. 37 38 The class ensures that the number of partitions tracked does not exceed the `DEFAULT_MAX_PARTITIONS_NUMBER` to prevent excessive memory usage. 39 40 - When the number of partitions exceeds the limit, the oldest partitions are removed from `_cursor_per_partition`, and `_over_limit` is incremented accordingly. 41 - The `limit_reached` method returns `True` when `_over_limit` exceeds `DEFAULT_MAX_PARTITIONS_NUMBER`, indicating that the global cursor should be used instead of per-partition cursors. 42 43 This approach avoids unnecessary switching to a global cursor due to temporary spikes in partition counts, ensuring that switching is only done when a sustained high number of partitions is observed. 44 """ 45 46 DEFAULT_MAX_PARTITIONS_NUMBER = 10000 47 _NO_STATE: Mapping[str, Any] = {} 48 _NO_CURSOR_STATE: Mapping[str, Any] = {} 49 _KEY = 0 50 _VALUE = 1 51 _state_to_migrate_from: Mapping[str, Any] = {} 52 53 def __init__(self, cursor_factory: CursorFactory, partition_router: PartitionRouter): 54 self._cursor_factory = cursor_factory 55 self._partition_router = partition_router 56 # The dict is ordered to ensure that once the maximum number of partitions is reached, 57 # the oldest partitions can be efficiently removed, maintaining the most recent partitions. 58 self._cursor_per_partition: OrderedDict[str, DeclarativeCursor] = OrderedDict() 59 self._over_limit = 0 60 self._partition_serializer = PerPartitionKeySerializer() 61 62 def stream_slices(self) -> Iterable[StreamSlice]: 63 slices = self._partition_router.stream_slices() 64 for partition in slices: 65 yield from self.generate_slices_from_partition(partition) 66 67 def generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[StreamSlice]: 68 # Ensure the maximum number of partitions is not exceeded 69 self._ensure_partition_limit() 70 71 cursor = self._cursor_per_partition.get(self._to_partition_key(partition.partition)) 72 if not cursor: 73 partition_state = ( 74 self._state_to_migrate_from 75 if self._state_to_migrate_from 76 else self._NO_CURSOR_STATE 77 ) 78 cursor = self._create_cursor(partition_state) 79 self._cursor_per_partition[self._to_partition_key(partition.partition)] = cursor 80 81 for cursor_slice in cursor.stream_slices(): 82 yield StreamSlice( 83 partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields 84 ) 85 86 def _ensure_partition_limit(self) -> None: 87 """ 88 Ensure the maximum number of partitions is not exceeded. If so, the oldest added partition will be dropped. 89 """ 90 while len(self._cursor_per_partition) > self.DEFAULT_MAX_PARTITIONS_NUMBER - 1: 91 self._over_limit += 1 92 oldest_partition = self._cursor_per_partition.popitem(last=False)[ 93 0 94 ] # Remove the oldest partition 95 logger.warning( 96 f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}. Over limit: {self._over_limit}." 97 ) 98 99 def limit_reached(self) -> bool: 100 return self._over_limit > self.DEFAULT_MAX_PARTITIONS_NUMBER 101 102 def set_initial_state(self, stream_state: StreamState) -> None: 103 """ 104 Set the initial state for the cursors. 105 106 This method initializes the state for each partition cursor using the provided stream state. 107 If a partition state is provided in the stream state, it will update the corresponding partition cursor with this state. 108 109 Additionally, it sets the parent state for partition routers that are based on parent streams. If a partition router 110 does not have parent streams, this step will be skipped due to the default PartitionRouter implementation. 111 112 Args: 113 stream_state (StreamState): The state of the streams to be set. The format of the stream state should be: 114 { 115 "states": [ 116 { 117 "partition": { 118 "partition_key": "value" 119 }, 120 "cursor": { 121 "last_updated": "2023-05-27T00:00:00Z" 122 } 123 } 124 ], 125 "parent_state": { 126 "parent_stream_name": { 127 "last_updated": "2023-05-27T00:00:00Z" 128 } 129 } 130 } 131 """ 132 if not stream_state: 133 return 134 135 if "states" not in stream_state: 136 # We assume that `stream_state` is in a global format that can be applied to all partitions. 137 # Example: {"global_state_format_key": "global_state_format_value"} 138 self._state_to_migrate_from = stream_state 139 140 else: 141 for state in stream_state["states"]: 142 self._cursor_per_partition[self._to_partition_key(state["partition"])] = ( 143 self._create_cursor(state["cursor"]) 144 ) 145 146 # set default state for missing partitions if it is per partition with fallback to global 147 if "state" in stream_state: 148 self._state_to_migrate_from = stream_state["state"] 149 150 # Set parent state for partition routers based on parent streams 151 self._partition_router.set_initial_state(stream_state) 152 153 def observe(self, stream_slice: StreamSlice, record: Record) -> None: 154 self._cursor_per_partition[self._to_partition_key(stream_slice.partition)].observe( 155 StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), record 156 ) 157 158 def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None: 159 try: 160 self._cursor_per_partition[self._to_partition_key(stream_slice.partition)].close_slice( 161 StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), *args 162 ) 163 except KeyError as exception: 164 raise ValueError( 165 f"Partition {str(exception)} could not be found in current state based on the record. This is unexpected because " 166 f"we should only update state for partitions that were emitted during `stream_slices`" 167 ) 168 169 def get_stream_state(self) -> StreamState: 170 states = [] 171 for partition_tuple, cursor in self._cursor_per_partition.items(): 172 cursor_state = cursor.get_stream_state() 173 if cursor_state: 174 states.append( 175 { 176 "partition": self._to_dict(partition_tuple), 177 "cursor": cursor_state, 178 } 179 ) 180 state: dict[str, Any] = {"states": states} 181 182 parent_state = self._partition_router.get_stream_state() 183 if parent_state: 184 state["parent_state"] = parent_state 185 return state 186 187 def _get_state_for_partition(self, partition: Mapping[str, Any]) -> Optional[StreamState]: 188 cursor = self._cursor_per_partition.get(self._to_partition_key(partition)) 189 if cursor: 190 return cursor.get_stream_state() 191 192 return None 193 194 @staticmethod 195 def _is_new_state(stream_state: Mapping[str, Any]) -> bool: 196 return not bool(stream_state) 197 198 def _to_partition_key(self, partition: Mapping[str, Any]) -> str: 199 return self._partition_serializer.to_partition_key(partition) 200 201 def _to_dict(self, partition_key: str) -> Mapping[str, Any]: 202 return self._partition_serializer.to_partition(partition_key) 203 204 def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]: 205 if not stream_slice: 206 raise ValueError("A partition needs to be provided in order to extract a state") 207 208 if not stream_slice: 209 return None 210 211 return self._get_state_for_partition(stream_slice.partition) 212 213 def _create_cursor(self, cursor_state: Any) -> DeclarativeCursor: 214 cursor = self._cursor_factory.create() 215 cursor.set_initial_state(cursor_state) 216 return cursor 217 218 def get_request_params( 219 self, 220 *, 221 stream_state: Optional[StreamState] = None, 222 stream_slice: Optional[StreamSlice] = None, 223 next_page_token: Optional[Mapping[str, Any]] = None, 224 ) -> Mapping[str, Any]: 225 if stream_slice: 226 if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition: 227 self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition)) 228 return self._partition_router.get_request_params( # type: ignore # this always returns a mapping 229 stream_state=stream_state, 230 stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}), 231 next_page_token=next_page_token, 232 ) | self._cursor_per_partition[ 233 self._to_partition_key(stream_slice.partition) 234 ].get_request_params( 235 stream_state=stream_state, 236 stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), 237 next_page_token=next_page_token, 238 ) 239 else: 240 raise ValueError("A partition needs to be provided in order to get request params") 241 242 def get_request_headers( 243 self, 244 *, 245 stream_state: Optional[StreamState] = None, 246 stream_slice: Optional[StreamSlice] = None, 247 next_page_token: Optional[Mapping[str, Any]] = None, 248 ) -> Mapping[str, Any]: 249 if stream_slice: 250 if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition: 251 self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition)) 252 return self._partition_router.get_request_headers( # type: ignore # this always returns a mapping 253 stream_state=stream_state, 254 stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}), 255 next_page_token=next_page_token, 256 ) | self._cursor_per_partition[ 257 self._to_partition_key(stream_slice.partition) 258 ].get_request_headers( 259 stream_state=stream_state, 260 stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), 261 next_page_token=next_page_token, 262 ) 263 else: 264 raise ValueError("A partition needs to be provided in order to get request headers") 265 266 def get_request_body_data( 267 self, 268 *, 269 stream_state: Optional[StreamState] = None, 270 stream_slice: Optional[StreamSlice] = None, 271 next_page_token: Optional[Mapping[str, Any]] = None, 272 ) -> Union[Mapping[str, Any], str]: 273 if stream_slice: 274 if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition: 275 self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition)) 276 return self._partition_router.get_request_body_data( # type: ignore # this always returns a mapping 277 stream_state=stream_state, 278 stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}), 279 next_page_token=next_page_token, 280 ) | self._cursor_per_partition[ 281 self._to_partition_key(stream_slice.partition) 282 ].get_request_body_data( 283 stream_state=stream_state, 284 stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), 285 next_page_token=next_page_token, 286 ) 287 else: 288 raise ValueError("A partition needs to be provided in order to get request body data") 289 290 def get_request_body_json( 291 self, 292 *, 293 stream_state: Optional[StreamState] = None, 294 stream_slice: Optional[StreamSlice] = None, 295 next_page_token: Optional[Mapping[str, Any]] = None, 296 ) -> Mapping[str, Any]: 297 if stream_slice: 298 if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition: 299 self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition)) 300 return self._partition_router.get_request_body_json( # type: ignore # this always returns a mapping 301 stream_state=stream_state, 302 stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}), 303 next_page_token=next_page_token, 304 ) | self._cursor_per_partition[ 305 self._to_partition_key(stream_slice.partition) 306 ].get_request_body_json( 307 stream_state=stream_state, 308 stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), 309 next_page_token=next_page_token, 310 ) 311 else: 312 raise ValueError("A partition needs to be provided in order to get request body json") 313 314 def should_be_synced(self, record: Record) -> bool: 315 return self._get_cursor(record).should_be_synced( 316 self._convert_record_to_cursor_record(record) 317 ) 318 319 @staticmethod 320 def _convert_record_to_cursor_record(record: Record) -> Record: 321 return Record( 322 data=record.data, 323 stream_name=record.stream_name, 324 associated_slice=StreamSlice( 325 partition={}, cursor_slice=record.associated_slice.cursor_slice 326 ) 327 if record.associated_slice 328 else None, 329 ) 330 331 def _get_cursor(self, record: Record) -> DeclarativeCursor: 332 if not record.associated_slice: 333 raise ValueError( 334 "Invalid state as stream slices that are emitted should refer to an existing cursor" 335 ) 336 partition_key = self._to_partition_key(record.associated_slice.partition) 337 if partition_key not in self._cursor_per_partition: 338 self._create_cursor_for_partition(partition_key) 339 cursor = self._cursor_per_partition[partition_key] 340 return cursor 341 342 def _create_cursor_for_partition(self, partition_key: str) -> None: 343 """ 344 Dynamically creates and initializes a cursor for the specified partition. 345 346 This method is required for `ConcurrentPerPartitionCursor`. For concurrent cursors, 347 stream_slices is executed only for the concurrent cursor, so cursors per partition 348 are not created for the declarative cursor. This method ensures that a cursor is available 349 to create requests for the specified partition. The cursor is initialized 350 with the per-partition state if present in the initial state, or with the global state 351 adjusted by the lookback window, or with the state to migrate from. 352 353 Note: 354 This is a temporary workaround and should be removed once the declarative cursor 355 is decoupled from the concurrent cursor implementation. 356 357 Args: 358 partition_key (str): The unique identifier for the partition for which the cursor 359 needs to be created. 360 """ 361 partition_state = ( 362 self._state_to_migrate_from if self._state_to_migrate_from else self._NO_CURSOR_STATE 363 ) 364 cursor = self._create_cursor(partition_state) 365 366 self._cursor_per_partition[partition_key] = cursor
Manages state per partition when a stream has many partitions, to prevent data loss or duplication.
Partition Limitation and Limit Reached Logic
- DEFAULT_MAX_PARTITIONS_NUMBER: The maximum number of partitions to keep in memory (default is 10,000).
- _cursor_per_partition: An ordered dictionary that stores cursors for each partition.
- _over_limit: A counter that increments each time an oldest partition is removed when the limit is exceeded.
The class ensures that the number of partitions tracked does not exceed the `DEFAULT_MAX_PARTITIONS_NUMBER` to prevent excessive memory usage.

- When the number of partitions exceeds the limit, the oldest partitions are removed from `_cursor_per_partition`, and `_over_limit` is incremented accordingly.
- The `limit_reached` method returns `True` when `_over_limit` exceeds `DEFAULT_MAX_PARTITIONS_NUMBER`, indicating that the global cursor should be used instead of per-partition cursors.

This approach avoids unnecessary switching to a global cursor due to temporary spikes in partition counts, ensuring that switching is only done when a sustained high number of partitions is observed.
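A minimal sketch of this bookkeeping (simplified types and a tiny cap instead of 10,000; not the class itself). The OrderedDict keeps insertion order so the oldest key can be evicted cheaply, and the eviction counter only triggers the switch after a sustained overflow:

```python
from collections import OrderedDict
from typing import Dict


class PartitionTracker:
    MAX_PARTITIONS = 3  # the class default is 10,000; kept tiny for the demo

    def __init__(self) -> None:
        self._cursor_per_partition: "OrderedDict[str, Dict]" = OrderedDict()
        self._over_limit = 0

    def track(self, partition_key: str) -> None:
        # Evict the oldest partitions before inserting a new one.
        while len(self._cursor_per_partition) > self.MAX_PARTITIONS - 1:
            self._over_limit += 1
            oldest, _ = self._cursor_per_partition.popitem(last=False)
        self._cursor_per_partition[partition_key] = {}

    def limit_reached(self) -> bool:
        # Switch to a global cursor only after evictions exceed the cap itself.
        return self._over_limit > self.MAX_PARTITIONS


tracker = PartitionTracker()
for i in range(8):
    tracker.track(f"p{i}")
print(tracker.limit_reached())  # True: 5 evictions > MAX_PARTITIONS
```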
53 def __init__(self, cursor_factory: CursorFactory, partition_router: PartitionRouter): 54 self._cursor_factory = cursor_factory 55 self._partition_router = partition_router 56 # The dict is ordered to ensure that once the maximum number of partitions is reached, 57 # the oldest partitions can be efficiently removed, maintaining the most recent partitions. 58 self._cursor_per_partition: OrderedDict[str, DeclarativeCursor] = OrderedDict() 59 self._over_limit = 0 60 self._partition_serializer = PerPartitionKeySerializer()
62 def stream_slices(self) -> Iterable[StreamSlice]: 63 slices = self._partition_router.stream_slices() 64 for partition in slices: 65 yield from self.generate_slices_from_partition(partition)
Defines stream slices
Returns
An iterable of stream slices
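Conceptually, each emitted slice pairs one partition with one of that partition's cursor windows. A hedged sketch, simplified by assuming all partitions share the same windows (in the real class, each partition's cursor may produce different windows based on its own state):

```python
from itertools import product

partitions = [{"account_id": "a1"}, {"account_id": "a2"}]  # assumed partitions
cursor_slices = [{"start_time": "2023-01-01", "end_time": "2023-01-04"}]  # assumed windows

# Each emitted slice pairs one partition with one cursor window.
stream_slices = [
    {"partition": p, "cursor_slice": c} for p, c in product(partitions, cursor_slices)
]
print(len(stream_slices))  # 2
```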
67 def generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[StreamSlice]: 68 # Ensure the maximum number of partitions is not exceeded 69 self._ensure_partition_limit() 70 71 cursor = self._cursor_per_partition.get(self._to_partition_key(partition.partition)) 72 if not cursor: 73 partition_state = ( 74 self._state_to_migrate_from 75 if self._state_to_migrate_from 76 else self._NO_CURSOR_STATE 77 ) 78 cursor = self._create_cursor(partition_state) 79 self._cursor_per_partition[self._to_partition_key(partition.partition)] = cursor 80 81 for cursor_slice in cursor.stream_slices(): 82 yield StreamSlice( 83 partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields 84 )
102 def set_initial_state(self, stream_state: StreamState) -> None: 103 """ 104 Set the initial state for the cursors. 105 106 This method initializes the state for each partition cursor using the provided stream state. 107 If a partition state is provided in the stream state, it will update the corresponding partition cursor with this state. 108 109 Additionally, it sets the parent state for partition routers that are based on parent streams. If a partition router 110 does not have parent streams, this step will be skipped due to the default PartitionRouter implementation. 111 112 Args: 113 stream_state (StreamState): The state of the streams to be set. The format of the stream state should be: 114 { 115 "states": [ 116 { 117 "partition": { 118 "partition_key": "value" 119 }, 120 "cursor": { 121 "last_updated": "2023-05-27T00:00:00Z" 122 } 123 } 124 ], 125 "parent_state": { 126 "parent_stream_name": { 127 "last_updated": "2023-05-27T00:00:00Z" 128 } 129 } 130 } 131 """ 132 if not stream_state: 133 return 134 135 if "states" not in stream_state: 136 # We assume that `stream_state` is in a global format that can be applied to all partitions. 137 # Example: {"global_state_format_key": "global_state_format_value"} 138 self._state_to_migrate_from = stream_state 139 140 else: 141 for state in stream_state["states"]: 142 self._cursor_per_partition[self._to_partition_key(state["partition"])] = ( 143 self._create_cursor(state["cursor"]) 144 ) 145 146 # set default state for missing partitions if it is per partition with fallback to global 147 if "state" in stream_state: 148 self._state_to_migrate_from = stream_state["state"] 149 150 # Set parent state for partition routers based on parent streams 151 self._partition_router.set_initial_state(stream_state)
Set the initial state for the cursors.
This method initializes the state for each partition cursor using the provided stream state. If a partition state is provided in the stream state, it will update the corresponding partition cursor with this state.
Additionally, it sets the parent state for partition routers that are based on parent streams. If a partition router does not have parent streams, this step will be skipped due to the default PartitionRouter implementation.
Arguments:
- stream_state (StreamState): The state of the streams to be set. The format of the stream state should be:

      {
          "states": [
              {
                  "partition": {
                      "partition_key": "value"
                  },
                  "cursor": {
                      "last_updated": "2023-05-27T00:00:00Z"
                  }
              }
          ],
          "parent_state": {
              "parent_stream_name": {
                  "last_updated": "2023-05-27T00:00:00Z"
              }
          }
      }
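A hypothetical call in that format (partition keys and cursor values are illustrative; the call is commented out because it assumes an existing PerPartitionCursor):

```python
# Illustrative per-partition state blob in the documented format.
state = {
    "states": [
        {
            "partition": {"partition_key": "value"},
            "cursor": {"last_updated": "2023-05-27T00:00:00Z"},
        }
    ],
    "parent_state": {"parent_stream_name": {"last_updated": "2023-05-27T00:00:00Z"}},
}
# per_partition_cursor.set_initial_state(state)  # assumes an existing PerPartitionCursor
```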
153 def observe(self, stream_slice: StreamSlice, record: Record) -> None: 154 self._cursor_per_partition[self._to_partition_key(stream_slice.partition)].observe( 155 StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), record 156 )
Register a record with the cursor; the cursor instance can then use it to manage the state of the in-progress stream read.
Parameters
- stream_slice: The current slice, which may or may not contain the most recently observed record
- record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the stream state may need to be deferred depending on whether the source reliably orders records by the cursor field.
158 def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None: 159 try: 160 self._cursor_per_partition[self._to_partition_key(stream_slice.partition)].close_slice( 161 StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), *args 162 ) 163 except KeyError as exception: 164 raise ValueError( 165 f"Partition {str(exception)} could not be found in current state based on the record. This is unexpected because " 166 f"we should only update state for partitions that were emitted during `stream_slices`" 167 )
Update state based on the stream slice. Note that `stream_slice.cursor_slice` and `most_recent_record.associated_slice` are expected to be the same, but we make it explicit here that `stream_slice` should be leveraged to update the state. We do not pass in the latest record, since cursor instances should maintain the relevant internal state on their own.

Parameters

- stream_slice: slice to close
169 def get_stream_state(self) -> StreamState: 170 states = [] 171 for partition_tuple, cursor in self._cursor_per_partition.items(): 172 cursor_state = cursor.get_stream_state() 173 if cursor_state: 174 states.append( 175 { 176 "partition": self._to_dict(partition_tuple), 177 "cursor": cursor_state, 178 } 179 ) 180 state: dict[str, Any] = {"states": states} 181 182 parent_state = self._partition_router.get_stream_state() 183 if parent_state: 184 state["parent_state"] = parent_state 185 return state
Returns the current stream state. We would like to restrict its usage since it exposes the internals of the state. As of 2023-06-14, it is used for the following:
- Interpolation of the requests
- Transformation of records
- Saving the state
For the first case, we are probably stuck with exposing the stream state. For the second, we can probably expose a method that allows for emitting the state to the platform.
204 def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]: 205 if not stream_slice: 206 raise ValueError("A partition needs to be provided in order to extract a state") 207 208 if not stream_slice: 209 return None 210 211 return self._get_state_for_partition(stream_slice.partition)
Get the state value of a specific stream_slice. For incremental or resumable full refresh cursors which only manage state in a single dimension this is the entire state object. For per-partition cursors used by substreams, this returns the state of a specific parent delineated by the incoming slice's partition object.
218 def get_request_params( 219 self, 220 *, 221 stream_state: Optional[StreamState] = None, 222 stream_slice: Optional[StreamSlice] = None, 223 next_page_token: Optional[Mapping[str, Any]] = None, 224 ) -> Mapping[str, Any]: 225 if stream_slice: 226 if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition: 227 self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition)) 228 return self._partition_router.get_request_params( # type: ignore # this always returns a mapping 229 stream_state=stream_state, 230 stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}), 231 next_page_token=next_page_token, 232 ) | self._cursor_per_partition[ 233 self._to_partition_key(stream_slice.partition) 234 ].get_request_params( 235 stream_state=stream_state, 236 stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), 237 next_page_token=next_page_token, 238 ) 239 else: 240 raise ValueError("A partition needs to be provided in order to get request params")
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g., you might want to define query parameters for paging if next_page_token is not None.
242 def get_request_headers( 243 self, 244 *, 245 stream_state: Optional[StreamState] = None, 246 stream_slice: Optional[StreamSlice] = None, 247 next_page_token: Optional[Mapping[str, Any]] = None, 248 ) -> Mapping[str, Any]: 249 if stream_slice: 250 if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition: 251 self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition)) 252 return self._partition_router.get_request_headers( # type: ignore # this always returns a mapping 253 stream_state=stream_state, 254 stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}), 255 next_page_token=next_page_token, 256 ) | self._cursor_per_partition[ 257 self._to_partition_key(stream_slice.partition) 258 ].get_request_headers( 259 stream_state=stream_state, 260 stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), 261 next_page_token=next_page_token, 262 ) 263 else: 264 raise ValueError("A partition needs to be provided in order to get request headers")
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
266 def get_request_body_data( 267 self, 268 *, 269 stream_state: Optional[StreamState] = None, 270 stream_slice: Optional[StreamSlice] = None, 271 next_page_token: Optional[Mapping[str, Any]] = None, 272 ) -> Union[Mapping[str, Any], str]: 273 if stream_slice: 274 if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition: 275 self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition)) 276 return self._partition_router.get_request_body_data( # type: ignore # this always returns a mapping 277 stream_state=stream_state, 278 stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}), 279 next_page_token=next_page_token, 280 ) | self._cursor_per_partition[ 281 self._to_partition_key(stream_slice.partition) 282 ].get_request_body_data( 283 stream_state=stream_state, 284 stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), 285 next_page_token=next_page_token, 286 ) 287 else: 288 raise ValueError("A partition needs to be provided in order to get request body data")
Specifies how to populate the body of the request with a non-JSON payload.
If this returns a string, it will be sent as-is. If it returns a dict, it will be converted to a urlencoded form, e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2".
Note that only one of the 'request_body_data' and 'request_body_json' functions may be overridden.
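The dict-to-form conversion described above matches standard URL encoding, as this self-contained snippet illustrates:

from urllib.parse import urlencode

# A dict body is converted to a urlencoded form before sending;
# a string body would be sent as-is.
body = {"key1": "value1", "key2": "value2"}
assert urlencode(body) == "key1=value1&key2=value2"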
290 def get_request_body_json( 291 self, 292 *, 293 stream_state: Optional[StreamState] = None, 294 stream_slice: Optional[StreamSlice] = None, 295 next_page_token: Optional[Mapping[str, Any]] = None, 296 ) -> Mapping[str, Any]: 297 if stream_slice: 298 if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition: 299 self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition)) 300 return self._partition_router.get_request_body_json( # type: ignore # this always returns a mapping 301 stream_state=stream_state, 302 stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}), 303 next_page_token=next_page_token, 304 ) | self._cursor_per_partition[ 305 self._to_partition_key(stream_slice.partition) 306 ].get_request_body_json( 307 stream_state=stream_state, 308 stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), 309 next_page_token=next_page_token, 310 ) 311 else: 312 raise ValueError("A partition needs to be provided in order to get request body json")
Specifies how to populate the body of the request with a JSON payload.
Note that only one of the 'request_body_data' and 'request_body_json' functions may be overridden.
21class PerPartitionWithGlobalCursor(DeclarativeCursor): 22 """ 23 Manages state for streams with multiple partitions, with an optional fallback to a global cursor when specific conditions are met. 24 25 This cursor handles partitioned streams by maintaining individual state per partition using `PerPartitionCursor`. If the number of partitions exceeds a defined limit, it switches to a global cursor (`GlobalSubstreamCursor`) to manage state more efficiently. 26 27 **Overview** 28 29 - **Partition-Based State**: Initially manages state per partition to ensure accurate processing of each partition's data. 30 - **Global Fallback**: Switches to a global cursor when the partition limit is exceeded to handle state management more effectively. 31 32 **Switching Logic** 33 34 - Monitors the number of partitions. 35 - If `PerPartitionCursor.limit_reached()` returns `True`, sets `_use_global_cursor` to `True`, activating the global cursor. 36 37 **Active Cursor Selection** 38 39 - Uses the `_get_active_cursor()` helper method to select the active cursor based on the `_use_global_cursor` flag. 40 - This simplifies the logic and ensures consistent cursor usage across methods. 41 42 **State Structure Example** 43 44 ```json 45 { 46 "states": [ 47 { 48 "partition": {"partition_key": "partition_1"}, 49 "cursor": {"cursor_field": "2021-01-15"} 50 }, 51 { 52 "partition": {"partition_key": "partition_2"}, 53 "cursor": {"cursor_field": "2021-02-14"} 54 } 55 ], 56 "state": { 57 "cursor_field": "2021-02-15" 58 }, 59 "use_global_cursor": false 60 } 61 ``` 62 63 In this example, the cursor is using partition-based state management (`"use_global_cursor": false`), maintaining separate cursor states for each partition. 64 65 **Usage Scenario** 66 67 Suitable for streams where the number of partitions may vary significantly, requiring dynamic switching between per-partition and global state management to ensure data consistency and efficient synchronization. 
68 """ 69 70 def __init__( 71 self, 72 cursor_factory: CursorFactory, 73 partition_router: PartitionRouter, 74 stream_cursor: DatetimeBasedCursor, 75 ): 76 self._partition_router = partition_router 77 self._per_partition_cursor = PerPartitionCursor(cursor_factory, partition_router) 78 self._global_cursor = GlobalSubstreamCursor(stream_cursor, partition_router) 79 self._use_global_cursor = False 80 self._current_partition: Optional[Mapping[str, Any]] = None 81 self._last_slice: bool = False 82 self._parent_state: Optional[Mapping[str, Any]] = None 83 84 def _get_active_cursor(self) -> Union[PerPartitionCursor, GlobalSubstreamCursor]: 85 return self._global_cursor if self._use_global_cursor else self._per_partition_cursor 86 87 def stream_slices(self) -> Iterable[StreamSlice]: 88 self._global_cursor.start_slices_generation() 89 90 # Iterate through partitions and process slices 91 for partition, is_last_partition, parent_state in iterate_with_last_flag_and_state( 92 self._partition_router.stream_slices(), self._partition_router.get_stream_state 93 ): 94 # Generate slices for the current cursor and handle the last slice using the flag 95 self._parent_state = parent_state 96 for slice, is_last_slice, _ in iterate_with_last_flag_and_state( 97 self._get_active_cursor().generate_slices_from_partition(partition=partition), 98 lambda: None, 99 ): 100 self._global_cursor.register_slice(is_last_slice and is_last_partition) 101 yield slice 102 self._parent_state = self._partition_router.get_stream_state() 103 104 def set_initial_state(self, stream_state: StreamState) -> None: 105 """ 106 Set the initial state for the cursors. 107 """ 108 self._use_global_cursor = stream_state.get("use_global_cursor", False) 109 110 self._parent_state = stream_state.get("parent_state", {}) 111 112 self._global_cursor.set_initial_state(stream_state) 113 if not self._use_global_cursor: 114 self._per_partition_cursor.set_initial_state(stream_state) 115 116 def observe(self, stream_slice: StreamSlice, record: Record) -> None: 117 if not self._use_global_cursor and self._per_partition_cursor.limit_reached(): 118 self._use_global_cursor = True 119 120 if not self._use_global_cursor: 121 self._per_partition_cursor.observe(stream_slice, record) 122 self._global_cursor.observe(stream_slice, record) 123 124 def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None: 125 if not self._use_global_cursor: 126 self._per_partition_cursor.close_slice(stream_slice, *args) 127 self._global_cursor.close_slice(stream_slice, *args) 128 129 def get_stream_state(self) -> StreamState: 130 final_state: MutableMapping[str, Any] = {"use_global_cursor": self._use_global_cursor} 131 132 final_state.update(self._global_cursor.get_stream_state()) 133 if not self._use_global_cursor: 134 final_state.update(self._per_partition_cursor.get_stream_state()) 135 136 final_state["parent_state"] = self._parent_state 137 if not final_state.get("parent_state"): 138 del final_state["parent_state"] 139 140 return final_state 141 142 def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]: 143 return self._get_active_cursor().select_state(stream_slice) 144 145 def get_request_params( 146 self, 147 *, 148 stream_state: Optional[StreamState] = None, 149 stream_slice: Optional[StreamSlice] = None, 150 next_page_token: Optional[Mapping[str, Any]] = None, 151 ) -> Mapping[str, Any]: 152 return self._get_active_cursor().get_request_params( 153 stream_state=stream_state, 154 stream_slice=stream_slice, 155 
next_page_token=next_page_token, 156 ) 157 158 def get_request_headers( 159 self, 160 *, 161 stream_state: Optional[StreamState] = None, 162 stream_slice: Optional[StreamSlice] = None, 163 next_page_token: Optional[Mapping[str, Any]] = None, 164 ) -> Mapping[str, Any]: 165 return self._get_active_cursor().get_request_headers( 166 stream_state=stream_state, 167 stream_slice=stream_slice, 168 next_page_token=next_page_token, 169 ) 170 171 def get_request_body_data( 172 self, 173 *, 174 stream_state: Optional[StreamState] = None, 175 stream_slice: Optional[StreamSlice] = None, 176 next_page_token: Optional[Mapping[str, Any]] = None, 177 ) -> Union[Mapping[str, Any], str]: 178 return self._get_active_cursor().get_request_body_data( 179 stream_state=stream_state, 180 stream_slice=stream_slice, 181 next_page_token=next_page_token, 182 ) 183 184 def get_request_body_json( 185 self, 186 *, 187 stream_state: Optional[StreamState] = None, 188 stream_slice: Optional[StreamSlice] = None, 189 next_page_token: Optional[Mapping[str, Any]] = None, 190 ) -> Mapping[str, Any]: 191 return self._get_active_cursor().get_request_body_json( 192 stream_state=stream_state, 193 stream_slice=stream_slice, 194 next_page_token=next_page_token, 195 ) 196 197 def should_be_synced(self, record: Record) -> bool: 198 return self._get_active_cursor().should_be_synced(record)
Manages state for streams with multiple partitions, with an optional fallback to a global cursor when specific conditions are met.
This cursor handles partitioned streams by maintaining individual state per partition using `PerPartitionCursor`. If the number of partitions exceeds a defined limit, it switches to a global cursor (`GlobalSubstreamCursor`) to manage state more efficiently.
Overview
- Partition-Based State: Initially manages state per partition to ensure accurate processing of each partition's data.
- Global Fallback: Switches to a global cursor when the partition limit is exceeded to handle state management more effectively.
Switching Logic
- Monitors the number of partitions.
- If `PerPartitionCursor.limit_reached()` returns `True`, sets `_use_global_cursor` to `True`, activating the global cursor.
Active Cursor Selection
- Uses the `_get_active_cursor()` helper method to select the active cursor based on the `_use_global_cursor` flag.
- This simplifies the logic and ensures consistent cursor usage across methods.
State Structure Example
{
"states": [
{
"partition": {"partition_key": "partition_1"},
"cursor": {"cursor_field": "2021-01-15"}
},
{
"partition": {"partition_key": "partition_2"},
"cursor": {"cursor_field": "2021-02-14"}
}
],
"state": {
"cursor_field": "2021-02-15"
},
"use_global_cursor": false
}
In this example, the cursor is using partition-based state management (`"use_global_cursor": false`), maintaining separate cursor states for each partition.
Usage Scenario
Suitable for streams where the number of partitions may vary significantly, requiring dynamic switching between per-partition and global state management to ensure data consistency and efficient synchronization.
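As a small illustration, a hypothetical consumer of the state blob above could tell which mode produced it like this (this helper is not part of the CDK):

def describe_cursor_mode(stream_state: dict) -> str:
    # "use_global_cursor", "state", and "states" follow the structure documented above
    if stream_state.get("use_global_cursor"):
        return f"global cursor at {stream_state.get('state')}"
    partitions = stream_state.get("states", [])
    return f"per-partition cursor tracking {len(partitions)} partition(s)"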
70 def __init__( 71 self, 72 cursor_factory: CursorFactory, 73 partition_router: PartitionRouter, 74 stream_cursor: DatetimeBasedCursor, 75 ): 76 self._partition_router = partition_router 77 self._per_partition_cursor = PerPartitionCursor(cursor_factory, partition_router) 78 self._global_cursor = GlobalSubstreamCursor(stream_cursor, partition_router) 79 self._use_global_cursor = False 80 self._current_partition: Optional[Mapping[str, Any]] = None 81 self._last_slice: bool = False 82 self._parent_state: Optional[Mapping[str, Any]] = None
87 def stream_slices(self) -> Iterable[StreamSlice]: 88 self._global_cursor.start_slices_generation() 89 90 # Iterate through partitions and process slices 91 for partition, is_last_partition, parent_state in iterate_with_last_flag_and_state( 92 self._partition_router.stream_slices(), self._partition_router.get_stream_state 93 ): 94 # Generate slices for the current cursor and handle the last slice using the flag 95 self._parent_state = parent_state 96 for slice, is_last_slice, _ in iterate_with_last_flag_and_state( 97 self._get_active_cursor().generate_slices_from_partition(partition=partition), 98 lambda: None, 99 ): 100 self._global_cursor.register_slice(is_last_slice and is_last_partition) 101 yield slice 102 self._parent_state = self._partition_router.get_stream_state()
Defines stream slices
Returns
An iterable of stream slices
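The implementation above leans on iterate_with_last_flag_and_state to mark the final element of each iterator. A minimal sketch of that last-flag pattern, with the state-callback half omitted for brevity:

from typing import Iterable, Iterator, Tuple, TypeVar

T = TypeVar("T")

def iterate_with_last_flag(iterable: Iterable[T]) -> Iterator[Tuple[T, bool]]:
    # Yield (item, is_last) pairs by keeping one element of lookahead.
    it = iter(iterable)
    try:
        previous = next(it)
    except StopIteration:
        return
    for current in it:
        yield previous, False
        previous = current
    yield previous, True

assert list(iterate_with_last_flag([1, 2, 3])) == [(1, False), (2, False), (3, True)]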
104 def set_initial_state(self, stream_state: StreamState) -> None: 105 """ 106 Set the initial state for the cursors. 107 """ 108 self._use_global_cursor = stream_state.get("use_global_cursor", False) 109 110 self._parent_state = stream_state.get("parent_state", {}) 111 112 self._global_cursor.set_initial_state(stream_state) 113 if not self._use_global_cursor: 114 self._per_partition_cursor.set_initial_state(stream_state)
Set the initial state for the cursors.
116 def observe(self, stream_slice: StreamSlice, record: Record) -> None: 117 if not self._use_global_cursor and self._per_partition_cursor.limit_reached(): 118 self._use_global_cursor = True 119 120 if not self._use_global_cursor: 121 self._per_partition_cursor.observe(stream_slice, record) 122 self._global_cursor.observe(stream_slice, record)
Register a record with the cursor; the cursor instance can then use it to manage the state of the in-progress stream read.
Parameters
- stream_slice: The current slice, which may or may not contain the most recently observed record
- record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the stream state may need to be deferred depending on whether the source reliably orders records by the cursor field.
124 def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None: 125 if not self._use_global_cursor: 126 self._per_partition_cursor.close_slice(stream_slice, *args) 127 self._global_cursor.close_slice(stream_slice, *args)
Update state based on the stream slice. Note that `stream_slice.cursor_slice` and `most_recent_record.associated_slice` are expected to be the same, but we make it explicit here that `stream_slice` should be leveraged to update the state. We do not pass in the latest record, since cursor instances should maintain the relevant internal state on their own.
Parameters
- stream_slice: slice to close
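A minimal sketch of the observe/close_slice split described above, assuming a source that does not reliably order records by the cursor field: observe() stages the largest cursor value seen, and close_slice() commits it to the outwardly-visible state only once the slice is fully read. The class below is illustrative, not a CDK type:

class MaxValueCursor:
    def __init__(self, cursor_field: str) -> None:
        self._cursor_field = cursor_field
        self._staged_value = None  # most recent value seen, not yet visible
        self._state: dict = {}     # outwardly-visible state

    def observe(self, record: dict) -> None:
        value = record.get(self._cursor_field)
        if value is not None and (self._staged_value is None or value > self._staged_value):
            self._staged_value = value

    def close_slice(self) -> None:
        # Commit the staged value now that the slice is fully read.
        if self._staged_value is not None:
            self._state = {self._cursor_field: self._staged_value}

    def get_stream_state(self) -> dict:
        return self._state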
129 def get_stream_state(self) -> StreamState: 130 final_state: MutableMapping[str, Any] = {"use_global_cursor": self._use_global_cursor} 131 132 final_state.update(self._global_cursor.get_stream_state()) 133 if not self._use_global_cursor: 134 final_state.update(self._per_partition_cursor.get_stream_state()) 135 136 final_state["parent_state"] = self._parent_state 137 if not final_state.get("parent_state"): 138 del final_state["parent_state"] 139 140 return final_state
Returns the current stream state. We would like to restrict its usage since it exposes the internals of the state. As of 2023-06-14, it is used for the following:
- Interpolation of the requests
- Transformation of records
- Saving the state
For the first case, we are probably stuck with exposing the stream state. For the second, we can probably expose a method that allows for emitting the state to the platform.
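For reference, the merged state assembled here follows the structure documented on the class. A hypothetical example while per-partition state is still active (the parent_state shape depends on the configured partition router):

expected_state = {
    "use_global_cursor": False,
    "state": {"cursor_field": "2021-02-15"},  # global cursor component
    "states": [
        {
            "partition": {"partition_key": "partition_1"},
            "cursor": {"cursor_field": "2021-01-15"},
        },
    ],
    "parent_state": {"parent_stream_name": {"cursor_field": "2021-02-15"}},
}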
142 def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]: 143 return self._get_active_cursor().select_state(stream_slice)
Get the state value of a specific stream_slice. For incremental or resumable full refresh cursors which only manage state in a single dimension this is the entire state object. For per-partition cursors used by substreams, this returns the state of a specific parent delineated by the incoming slice's partition object.
145 def get_request_params( 146 self, 147 *, 148 stream_state: Optional[StreamState] = None, 149 stream_slice: Optional[StreamSlice] = None, 150 next_page_token: Optional[Mapping[str, Any]] = None, 151 ) -> Mapping[str, Any]: 152 return self._get_active_cursor().get_request_params( 153 stream_state=stream_state, 154 stream_slice=stream_slice, 155 next_page_token=next_page_token, 156 )
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g., you might want to define query parameters for paging if next_page_token is not None.
158 def get_request_headers( 159 self, 160 *, 161 stream_state: Optional[StreamState] = None, 162 stream_slice: Optional[StreamSlice] = None, 163 next_page_token: Optional[Mapping[str, Any]] = None, 164 ) -> Mapping[str, Any]: 165 return self._get_active_cursor().get_request_headers( 166 stream_state=stream_state, 167 stream_slice=stream_slice, 168 next_page_token=next_page_token, 169 )
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
171 def get_request_body_data( 172 self, 173 *, 174 stream_state: Optional[StreamState] = None, 175 stream_slice: Optional[StreamSlice] = None, 176 next_page_token: Optional[Mapping[str, Any]] = None, 177 ) -> Union[Mapping[str, Any], str]: 178 return self._get_active_cursor().get_request_body_data( 179 stream_state=stream_state, 180 stream_slice=stream_slice, 181 next_page_token=next_page_token, 182 )
Specifies how to populate the body of the request with a non-JSON payload.
If this returns a string, it will be sent as-is. If it returns a dict, it will be converted to a urlencoded form, e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2".
Note that only one of the 'request_body_data' and 'request_body_json' functions may be overridden.
184 def get_request_body_json( 185 self, 186 *, 187 stream_state: Optional[StreamState] = None, 188 stream_slice: Optional[StreamSlice] = None, 189 next_page_token: Optional[Mapping[str, Any]] = None, 190 ) -> Mapping[str, Any]: 191 return self._get_active_cursor().get_request_body_json( 192 stream_state=stream_state, 193 stream_slice=stream_slice, 194 next_page_token=next_page_token, 195 )
Specifies how to populate the body of the request with a JSON payload.
Note that only one of the 'request_body_data' and 'request_body_json' functions may be overridden.
12@dataclass 13class ResumableFullRefreshCursor(DeclarativeCursor): 14 parameters: InitVar[Mapping[str, Any]] 15 16 def __post_init__(self, parameters: Mapping[str, Any]) -> None: 17 self._cursor: StreamState = {} 18 19 def get_stream_state(self) -> StreamState: 20 return self._cursor 21 22 def set_initial_state(self, stream_state: StreamState) -> None: 23 self._cursor = stream_state 24 25 def observe(self, stream_slice: StreamSlice, record: Record) -> None: 26 """ 27 Resumable full refresh manages state using a page number so it does not need to update state by observing incoming records. 28 """ 29 pass 30 31 def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None: 32 # The ResumableFullRefreshCursor doesn't support nested streams yet so receiving a partition is unexpected 33 if stream_slice.partition: 34 raise ValueError( 35 f"Stream slice {stream_slice} should not have a partition. Got {stream_slice.partition}." 36 ) 37 self._cursor = stream_slice.cursor_slice 38 39 def should_be_synced(self, record: Record) -> bool: 40 """ 41 Unlike date-based cursors which filter out records outside slice boundaries, resumable full refresh records exist within pages 42 that don't have filterable bounds. We should always return them. 43 """ 44 return True 45 46 def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]: 47 # A top-level RFR cursor only manages the state of a single partition 48 return self._cursor 49 50 def stream_slices(self) -> Iterable[StreamSlice]: 51 """ 52 Resumable full refresh cursors only return a single slice and can't perform partitioning because iteration is done per-page 53 along an unbounded set. 54 """ 55 yield from [StreamSlice(cursor_slice=self._cursor, partition={})] 56 57 # This is an interesting pattern that might not seem obvious at first glance. This cursor itself has no functional need to 58 # inject any request values into the outbound response because the up-to-date pagination state is already loaded and 59 # maintained by the paginator component 60 def get_request_params( 61 self, 62 *, 63 stream_state: Optional[StreamState] = None, 64 stream_slice: Optional[StreamSlice] = None, 65 next_page_token: Optional[Mapping[str, Any]] = None, 66 ) -> Mapping[str, Any]: 67 return {} 68 69 def get_request_headers( 70 self, 71 *, 72 stream_state: Optional[StreamState] = None, 73 stream_slice: Optional[StreamSlice] = None, 74 next_page_token: Optional[Mapping[str, Any]] = None, 75 ) -> Mapping[str, Any]: 76 return {} 77 78 def get_request_body_data( 79 self, 80 *, 81 stream_state: Optional[StreamState] = None, 82 stream_slice: Optional[StreamSlice] = None, 83 next_page_token: Optional[Mapping[str, Any]] = None, 84 ) -> Mapping[str, Any]: 85 return {} 86 87 def get_request_body_json( 88 self, 89 *, 90 stream_state: Optional[StreamState] = None, 91 stream_slice: Optional[StreamSlice] = None, 92 next_page_token: Optional[Mapping[str, Any]] = None, 93 ) -> Mapping[str, Any]: 94 return {}
Returns the current stream state. We would like to restrict its usage since it exposes the internals of the state. As of 2023-06-14, it is used for the following:
- Interpolation of the requests
- Transformation of records
- Saving the state
For the first case, we are probably stuck with exposing the stream state. For the second, we can probably expose a method that allows for emitting the state to the platform.
Cursors are not initialized with their state. As state is needed for the cursor to function properly, this method should be called before anything else.
Parameters
- stream_state: The state of the stream as returned by get_stream_state
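For example, with the ResumableFullRefreshCursor documented below, state must be supplied before slicing; the {"page": 3} value is an assumed pagination-state shape:

from airbyte_cdk.sources.declarative.incremental import ResumableFullRefreshCursor

cursor = ResumableFullRefreshCursor(parameters={})
cursor.set_initial_state({"page": 3})  # must be called before anything else
slices = list(cursor.stream_slices())  # resumes from the stored page state
assert slices[0].cursor_slice == {"page": 3}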
25 def observe(self, stream_slice: StreamSlice, record: Record) -> None: 26 """ 27 Resumable full refresh manages state using a page number so it does not need to update state by observing incoming records. 28 """ 29 pass
Resumable full refresh manages state using a page number so it does not need to update state by observing incoming records.
31 def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None: 32 # The ResumableFullRefreshCursor doesn't support nested streams yet so receiving a partition is unexpected 33 if stream_slice.partition: 34 raise ValueError( 35 f"Stream slice {stream_slice} should not have a partition. Got {stream_slice.partition}." 36 ) 37 self._cursor = stream_slice.cursor_slice
Update state based on the stream slice. Note that `stream_slice.cursor_slice` and `most_recent_record.associated_slice` are expected to be the same, but we make it explicit here that `stream_slice` should be leveraged to update the state. We do not pass in the latest record, since cursor instances should maintain the relevant internal state on their own.
Parameters
- stream_slice: slice to close
39 def should_be_synced(self, record: Record) -> bool: 40 """ 41 Unlike date-based cursors which filter out records outside slice boundaries, resumable full refresh records exist within pages 42 that don't have filterable bounds. We should always return them. 43 """ 44 return True
Unlike date-based cursors which filter out records outside slice boundaries, resumable full refresh records exist within pages that don't have filterable bounds. We should always return them.
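An illustrative contrast of the two filtering policies, with invented field names:

def datetime_should_be_synced(record: dict, slice_start: str, slice_end: str) -> bool:
    # date-based cursors can drop records outside the slice boundaries
    return slice_start <= record["updated_at"] <= slice_end

def rfr_should_be_synced(record: dict) -> bool:
    # pages have no filterable bounds, so every record is synced
    return True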
46 def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]: 47 # A top-level RFR cursor only manages the state of a single partition 48 return self._cursor
Get the state value of a specific stream_slice. For incremental or resumable full refresh cursors which only manage state in a single dimension this is the entire state object. For per-partition cursors used by substreams, this returns the state of a specific parent delineated by the incoming slice's partition object.
50 def stream_slices(self) -> Iterable[StreamSlice]: 51 """ 52 Resumable full refresh cursors only return a single slice and can't perform partitioning because iteration is done per-page 53 along an unbounded set. 54 """ 55 yield from [StreamSlice(cursor_slice=self._cursor, partition={})]
Resumable full refresh cursors only return a single slice and can't perform partitioning because iteration is done per-page along an unbounded set.
60 def get_request_params( 61 self, 62 *, 63 stream_state: Optional[StreamState] = None, 64 stream_slice: Optional[StreamSlice] = None, 65 next_page_token: Optional[Mapping[str, Any]] = None, 66 ) -> Mapping[str, Any]: 67 return {}
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g., you might want to define query parameters for paging if next_page_token is not None.
69 def get_request_headers( 70 self, 71 *, 72 stream_state: Optional[StreamState] = None, 73 stream_slice: Optional[StreamSlice] = None, 74 next_page_token: Optional[Mapping[str, Any]] = None, 75 ) -> Mapping[str, Any]: 76 return {}
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
78 def get_request_body_data( 79 self, 80 *, 81 stream_state: Optional[StreamState] = None, 82 stream_slice: Optional[StreamSlice] = None, 83 next_page_token: Optional[Mapping[str, Any]] = None, 84 ) -> Mapping[str, Any]: 85 return {}
Specifies how to populate the body of the request with a non-JSON payload.
If this returns a string, it will be sent as-is. If it returns a dict, it will be converted to a urlencoded form, e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2".
Note that only one of the 'request_body_data' and 'request_body_json' functions may be overridden.
87 def get_request_body_json( 88 self, 89 *, 90 stream_state: Optional[StreamState] = None, 91 stream_slice: Optional[StreamSlice] = None, 92 next_page_token: Optional[Mapping[str, Any]] = None, 93 ) -> Mapping[str, Any]: 94 return {}
Specifies how to populate the body of the request with a JSON payload.
Note that only one of the 'request_body_data' and 'request_body_json' functions may be overridden.
97@dataclass 98class ChildPartitionResumableFullRefreshCursor(ResumableFullRefreshCursor): 99 """ 100 The Sub-stream Resumable Cursor for Full-Refresh substreams. 101 Follows the parent type `ResumableFullRefreshCursor` with a small override, 102 to provide the ability to close the substream's slice once it has finished processing. 103 104 Check the `close_slice` method overide for more info about the actual behaviour of this cursor. 105 """ 106 107 def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None: 108 """ 109 Once the current slice has finished syncing: 110 - paginator returns None 111 - no more slices to process 112 113 we assume that the records are processed and emitted already, 114 thus we have to set the cursor to ` __ab_full_refresh_sync_complete: true `, 115 otherwise there is a risk of Inf. Loop processing the same slice. 116 """ 117 self._cursor = FULL_REFRESH_COMPLETE_STATE
The Sub-stream Resumable Cursor for Full-Refresh substreams. Follows the parent type `ResumableFullRefreshCursor` with a small override to provide the ability to close the substream's slice once it has finished processing.
Check the `close_slice` method override for more info about the actual behaviour of this cursor.
107 def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None: 108 """ 109 Once the current slice has finished syncing: 110 - paginator returns None 111 - no more slices to process 112 113 we assume that the records are processed and emitted already, 114 thus we have to set the cursor to ` __ab_full_refresh_sync_complete: true `, 115 otherwise there is a risk of Inf. Loop processing the same slice. 116 """ 117 self._cursor = FULL_REFRESH_COMPLETE_STATE
Once the current slice has finished syncing:
- paginator returns None
- no more slices to process
we assume that the records are processed and emitted already, thus we have to set the cursor to `__ab_full_refresh_sync_complete: true`; otherwise there is a risk of an infinite loop processing the same slice.
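A minimal sketch of why the sentinel matters, with a stub paginator standing in for the real components; without the final sentinel assignment, the loop would re-read the last page forever. FULL_REFRESH_COMPLETE_STATE mirrors the `__ab_full_refresh_sync_complete: true` value from the docstring above:

FULL_REFRESH_COMPLETE_STATE = {"__ab_full_refresh_sync_complete": True}

def read_page(cursor_state: dict):
    # Stub paginator: serves three pages, then signals exhaustion with None.
    page = cursor_state.get("page", 0)
    next_state = {"page": page + 1} if page < 2 else None
    return [f"record-{page}"], next_state

def sync_substream_slice(cursor_state: dict) -> list:
    emitted = []
    while not cursor_state.get("__ab_full_refresh_sync_complete"):
        records, next_page_state = read_page(cursor_state)
        emitted.extend(records)
        # When the paginator is exhausted, close the slice with the
        # sentinel; otherwise the same slice would loop forever.
        cursor_state = next_page_state or FULL_REFRESH_COMPLETE_STATE
    return emitted

assert sync_substream_slice({}) == ["record-0", "record-1", "record-2"]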