airbyte_cdk.sources.declarative.incremental
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.incremental.concurrent_partition_cursor import (
    ConcurrentCursorFactory,
    ConcurrentPerPartitionCursor,
)
from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor
from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor
from airbyte_cdk.sources.declarative.incremental.global_substream_cursor import (
    GlobalSubstreamCursor,
)
from airbyte_cdk.sources.declarative.incremental.per_partition_cursor import (
    CursorFactory,
    PerPartitionCursor,
)
from airbyte_cdk.sources.declarative.incremental.per_partition_with_global import (
    PerPartitionWithGlobalCursor,
)
from airbyte_cdk.sources.declarative.incremental.resumable_full_refresh_cursor import (
    ChildPartitionResumableFullRefreshCursor,
    ResumableFullRefreshCursor,
)

__all__ = [
    "CursorFactory",
    "ConcurrentCursorFactory",
    "ConcurrentPerPartitionCursor",
    "DatetimeBasedCursor",
    "DeclarativeCursor",
    "GlobalSubstreamCursor",
    "PerPartitionCursor",
    "PerPartitionWithGlobalCursor",
    "ResumableFullRefreshCursor",
    "ChildPartitionResumableFullRefreshCursor",
]
class CursorFactory:
    def __init__(self, create_function: Callable[[], DeclarativeCursor]):
        self._create_function = create_function

    def create(self) -> DeclarativeCursor:
        return self._create_function()
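For illustration, the factory simply defers to a zero-argument closure, so every call to create() yields a fresh, independent cursor for a new partition. In this sketch, make_datetime_cursor is a hypothetical helper that builds a configured DeclarativeCursor:

# A minimal sketch, assuming make_datetime_cursor (hypothetical) returns a
# configured DeclarativeCursor; any zero-argument callable works.
factory = CursorFactory(create_function=lambda: make_datetime_cursor())

cursor_a = factory.create()
cursor_b = factory.create()
assert cursor_a is not cursor_b  # each partition gets independent state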
class ConcurrentCursorFactory:
    def __init__(self, create_function: Callable[..., ConcurrentCursor]):
        self._create_function = create_function

    def create(
        self, stream_state: Mapping[str, Any], runtime_lookback_window: Optional[timedelta]
    ) -> ConcurrentCursor:
        return self._create_function(
            stream_state=stream_state, runtime_lookback_window=runtime_lookback_window
        )
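A hedged usage sketch: the factory wraps a callable that accepts stream_state and runtime_lookback_window keyword arguments. Here build_concurrent_cursor is hypothetical and stands in for the closure a stream builder would supply:

from datetime import timedelta
from functools import partial

# build_concurrent_cursor (hypothetical) must accept stream_state and
# runtime_lookback_window keyword arguments and return a ConcurrentCursor.
factory = ConcurrentCursorFactory(
    create_function=partial(build_concurrent_cursor, stream_name="orders")
)
cursor = factory.create(
    stream_state={"updated_at": "2024-01-01T00:00:00Z"},
    runtime_lookback_window=timedelta(hours=1),
)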
class ConcurrentPerPartitionCursor(Cursor):
    """
    Manages state per partition when a stream has many partitions, preventing data loss or duplication.

    Attributes:
        DEFAULT_MAX_PARTITIONS_NUMBER (int): Maximum number of partitions to retain in memory (default is 25,000).

    - **Partition Limitation Logic**
      Ensures the number of tracked partitions does not exceed the specified limit to prevent memory overuse. The oldest partitions are removed when the limit is reached.

    - **Global Cursor Fallback**
      New partitions are initialized from the global state so that state still progresses for deleted or newly created partitions. Records older than the global cursor that land in such partitions after the initial sync will be skipped.

    ConcurrentPerPartitionCursor expects the state of the ConcurrentCursor to follow the format {cursor_field: cursor_value}.
    """

    DEFAULT_MAX_PARTITIONS_NUMBER = 25_000
    SWITCH_TO_GLOBAL_LIMIT = 10_000
    _NO_STATE: Mapping[str, Any] = {}
    _NO_CURSOR_STATE: Mapping[str, Any] = {}
    _GLOBAL_STATE_KEY = "state"
    _PERPARTITION_STATE_KEY = "states"
    _KEY = 0
    _VALUE = 1

    def __init__(
        self,
        cursor_factory: ConcurrentCursorFactory,
        partition_router: PartitionRouter,
        stream_name: str,
        stream_namespace: Optional[str],
        stream_state: Any,
        message_repository: MessageRepository,
        connector_state_manager: ConnectorStateManager,
        connector_state_converter: AbstractStreamStateConverter,
        cursor_field: CursorField,
    ) -> None:
        self._global_cursor: Optional[StreamState] = {}
        self._stream_name = stream_name
        self._stream_namespace = stream_namespace
        self._message_repository = message_repository
        self._connector_state_manager = connector_state_manager
        self._connector_state_converter = connector_state_converter
        self._cursor_field = cursor_field

        self._cursor_factory = cursor_factory
        self._partition_router = partition_router

        # The dict is ordered to ensure that once the maximum number of partitions is reached,
        # the oldest partitions can be efficiently removed, maintaining the most recent partitions.
        self._cursor_per_partition: OrderedDict[str, ConcurrentCursor] = OrderedDict()
        self._semaphore_per_partition: OrderedDict[str, threading.Semaphore] = OrderedDict()

        # Parent-state tracking: store each partition's parent state in creation order
        self._partition_parent_state_map: OrderedDict[str, Mapping[str, Any]] = OrderedDict()

        self._finished_partitions: set[str] = set()
        self._lock = threading.Lock()
        self._timer = Timer()
        self._new_global_cursor: Optional[StreamState] = None
        self._lookback_window: int = 0
        self._parent_state: Optional[StreamState] = None
        self._number_of_partitions: int = 0
        self._use_global_cursor: bool = False
        self._partition_serializer = PerPartitionKeySerializer()
        # Track the last time a state message was emitted
        self._last_emission_time: float = 0.0

        self._set_initial_state(stream_state)

    @property
    def cursor_field(self) -> CursorField:
        return self._cursor_field

    @property
    def state(self) -> MutableMapping[str, Any]:
        state: dict[str, Any] = {"use_global_cursor": self._use_global_cursor}
        if not self._use_global_cursor:
            states = []
            for partition_tuple, cursor in self._cursor_per_partition.items():
                if cursor.state:
                    states.append(
                        {
                            "partition": self._to_dict(partition_tuple),
                            "cursor": copy.deepcopy(cursor.state),
                        }
                    )
            state[self._PERPARTITION_STATE_KEY] = states

        if self._global_cursor:
            state[self._GLOBAL_STATE_KEY] = self._global_cursor
        if self._lookback_window is not None:
            state["lookback_window"] = self._lookback_window
        if self._parent_state is not None:
            state["parent_state"] = self._parent_state
        return state

    def close_partition(self, partition: Partition) -> None:
        # Attempt to retrieve the stream slice
        stream_slice: Optional[StreamSlice] = partition.to_slice()  # type: ignore[assignment]

        # Ensure stream_slice is not None
        if stream_slice is None:
            raise ValueError("stream_slice cannot be None")

        partition_key = self._to_partition_key(stream_slice.partition)
        with self._lock:
            self._semaphore_per_partition[partition_key].acquire()
            if not self._use_global_cursor:
                self._cursor_per_partition[partition_key].close_partition(partition=partition)
                cursor = self._cursor_per_partition[partition_key]
                if (
                    partition_key in self._finished_partitions
                    and self._semaphore_per_partition[partition_key]._value == 0
                ):
                    self._update_global_cursor(cursor.state[self.cursor_field.cursor_field_key])

            self._check_and_update_parent_state()

        self._emit_state_message()

    def _check_and_update_parent_state(self) -> None:
        """
        Pop the leftmost partition state from _partition_parent_state_map only if
        *all partitions* up to (and including) that partition key in _semaphore_per_partition
        are fully finished (i.e. in _finished_partitions and semaphore._value == 0).
        Additionally, delete finished semaphores with a value of 0 to free up memory,
        as they are only needed to track errors and completion status.
175 """ 176 last_closed_state = None 177 178 while self._partition_parent_state_map: 179 # Look at the earliest partition key in creation order 180 earliest_key = next(iter(self._partition_parent_state_map)) 181 182 # Verify ALL partitions from the left up to earliest_key are finished 183 all_left_finished = True 184 for p_key, sem in list( 185 self._semaphore_per_partition.items() 186 ): # Use list to allow modification during iteration 187 # If any earlier partition is still not finished, we must stop 188 if p_key not in self._finished_partitions or sem._value != 0: 189 all_left_finished = False 190 break 191 # Once we've reached earliest_key in the semaphore order, we can stop checking 192 if p_key == earliest_key: 193 break 194 195 # If the partitions up to earliest_key are not all finished, break the while-loop 196 if not all_left_finished: 197 break 198 199 # Pop the leftmost entry from parent-state map 200 _, closed_parent_state = self._partition_parent_state_map.popitem(last=False) 201 last_closed_state = closed_parent_state 202 203 # Clean up finished semaphores with value 0 up to and including earliest_key 204 for p_key in list(self._semaphore_per_partition.keys()): 205 sem = self._semaphore_per_partition[p_key] 206 if p_key in self._finished_partitions and sem._value == 0: 207 del self._semaphore_per_partition[p_key] 208 logger.debug(f"Deleted finished semaphore for partition {p_key} with value 0") 209 if p_key == earliest_key: 210 break 211 212 # Update _parent_state if we popped at least one partition 213 if last_closed_state is not None: 214 self._parent_state = last_closed_state 215 216 def ensure_at_least_one_state_emitted(self) -> None: 217 """ 218 The platform expects at least one state message on successful syncs. Hence, whatever happens, we expect this method to be 219 called. 220 """ 221 if not any( 222 semaphore_item[1]._value for semaphore_item in self._semaphore_per_partition.items() 223 ): 224 self._global_cursor = self._new_global_cursor 225 self._lookback_window = self._timer.finish() 226 self._parent_state = self._partition_router.get_stream_state() 227 self._emit_state_message(throttle=False) 228 229 def _throttle_state_message(self) -> Optional[float]: 230 """ 231 Throttles the state message emission to once every 60 seconds. 
232 """ 233 current_time = time.time() 234 if current_time - self._last_emission_time <= 60: 235 return None 236 return current_time 237 238 def _emit_state_message(self, throttle: bool = True) -> None: 239 if throttle: 240 current_time = self._throttle_state_message() 241 if current_time is None: 242 return 243 self._last_emission_time = current_time 244 self._connector_state_manager.update_state_for_stream( 245 self._stream_name, 246 self._stream_namespace, 247 self.state, 248 ) 249 state_message = self._connector_state_manager.create_state_message( 250 self._stream_name, self._stream_namespace 251 ) 252 self._message_repository.emit_message(state_message) 253 254 def stream_slices(self) -> Iterable[StreamSlice]: 255 if self._timer.is_running(): 256 raise RuntimeError("stream_slices has been executed more than once.") 257 258 slices = self._partition_router.stream_slices() 259 self._timer.start() 260 for partition, last, parent_state in iterate_with_last_flag_and_state( 261 slices, self._partition_router.get_stream_state 262 ): 263 yield from self._generate_slices_from_partition(partition, parent_state) 264 265 def _generate_slices_from_partition( 266 self, partition: StreamSlice, parent_state: Mapping[str, Any] 267 ) -> Iterable[StreamSlice]: 268 # Ensure the maximum number of partitions is not exceeded 269 self._ensure_partition_limit() 270 271 partition_key = self._to_partition_key(partition.partition) 272 273 cursor = self._cursor_per_partition.get(self._to_partition_key(partition.partition)) 274 if not cursor: 275 cursor = self._create_cursor( 276 self._global_cursor, 277 self._lookback_window if self._global_cursor else 0, 278 ) 279 with self._lock: 280 self._number_of_partitions += 1 281 self._cursor_per_partition[partition_key] = cursor 282 self._semaphore_per_partition[partition_key] = threading.Semaphore(0) 283 284 with self._lock: 285 if ( 286 len(self._partition_parent_state_map) == 0 287 or self._partition_parent_state_map[ 288 next(reversed(self._partition_parent_state_map)) 289 ] 290 != parent_state 291 ): 292 self._partition_parent_state_map[partition_key] = deepcopy(parent_state) 293 294 for cursor_slice, is_last_slice, _ in iterate_with_last_flag_and_state( 295 cursor.stream_slices(), 296 lambda: None, 297 ): 298 self._semaphore_per_partition[partition_key].release() 299 if is_last_slice: 300 self._finished_partitions.add(partition_key) 301 yield StreamSlice( 302 partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields 303 ) 304 305 def _ensure_partition_limit(self) -> None: 306 """ 307 Ensure the maximum number of partitions does not exceed the predefined limit. 308 309 Steps: 310 1. Attempt to remove partitions that are marked as finished in `_finished_partitions`. 311 These partitions are considered processed and safe to delete. 312 2. If the limit is still exceeded and no finished partitions are available for removal, 313 remove the oldest partition unconditionally. We expect failed partitions to be removed. 314 315 Logging: 316 - Logs a warning each time a partition is removed, indicating whether it was finished 317 or removed due to being the oldest. 318 """ 319 if not self._use_global_cursor and self.limit_reached(): 320 logger.info( 321 f"Exceeded the 'SWITCH_TO_GLOBAL_LIMIT' of {self.SWITCH_TO_GLOBAL_LIMIT}. " 322 f"Switching to global cursor for {self._stream_name}." 
            )
            self._use_global_cursor = True

        with self._lock:
            while len(self._cursor_per_partition) > self.DEFAULT_MAX_PARTITIONS_NUMBER - 1:
                # Try removing finished partitions first
                for partition_key in list(self._cursor_per_partition.keys()):
                    if partition_key in self._finished_partitions and (
                        partition_key not in self._semaphore_per_partition
                        or self._semaphore_per_partition[partition_key]._value == 0
                    ):
                        oldest_partition = self._cursor_per_partition.pop(
                            partition_key
                        )  # Remove the oldest partition
                        logger.warning(
                            f"The maximum number of partitions has been reached. Dropping the oldest finished partition: {oldest_partition}. Over limit: {self._number_of_partitions - self.DEFAULT_MAX_PARTITIONS_NUMBER}."
                        )
                        break
                else:
                    # If no finished partitions can be removed, fall back to removing the oldest partition
                    oldest_partition = self._cursor_per_partition.popitem(last=False)[
                        1
                    ]  # Remove the oldest partition
                    logger.warning(
                        f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}. Over limit: {self._number_of_partitions - self.DEFAULT_MAX_PARTITIONS_NUMBER}."
                    )

    def _set_initial_state(self, stream_state: StreamState) -> None:
        """
        Initialize the cursor's state using the provided `stream_state`.

        This method supports global and per-partition state initialization.

        - **Global State**: If `states` is missing, the `state` is treated as global and applied to all partitions.
          The `global state` holds a single cursor position representing the latest processed record across all partitions.

        - **Lookback Window**: Configured via `lookback_window`, it defines the period (in seconds) for reprocessing records.
          This ensures robustness in case of upstream data delays or reordering. If not specified, it defaults to 0.

        - **Per-Partition State**: If `states` is present, each partition's cursor state is initialized separately.

        - **Parent State**: (if available) Used to initialize partition routers based on parent streams.

        Args:
            stream_state (StreamState): The state of the streams to be set. The format of the stream state should be:
                {
                    "states": [
                        {
                            "partition": {
                                "partition_key": "value"
                            },
                            "cursor": {
                                "last_updated": "2023-05-27T00:00:00Z"
                            }
                        }
                    ],
                    "state": {
                        "last_updated": "2023-05-27T00:00:00Z"
                    },
                    "lookback_window": 10,
                    "parent_state": {
                        "parent_stream_name": {
                            "last_updated": "2023-05-27T00:00:00Z"
                        }
                    }
                }
        """
        if not stream_state:
            return

        if (
            self._PERPARTITION_STATE_KEY not in stream_state
            and self._GLOBAL_STATE_KEY not in stream_state
        ):
            # We assume that `stream_state` is in a global format that can be applied to all partitions.
            # Example: {"global_state_format_key": "global_state_format_value"}
            self._set_global_state(stream_state)

        else:
            self._use_global_cursor = stream_state.get("use_global_cursor", False)

            self._lookback_window = int(stream_state.get("lookback_window", 0))

            for state in stream_state.get(self._PERPARTITION_STATE_KEY, []):
                self._number_of_partitions += 1
                self._cursor_per_partition[self._to_partition_key(state["partition"])] = (
                    self._create_cursor(state["cursor"])
                )

            # set default state for missing partitions if it is per partition with fallback to global
            if self._GLOBAL_STATE_KEY in stream_state:
                self._set_global_state(stream_state[self._GLOBAL_STATE_KEY])

        # Set initial parent state
        if stream_state.get("parent_state"):
            self._parent_state = stream_state["parent_state"]

        # Set parent state for partition routers based on parent streams
        self._partition_router.set_initial_state(stream_state)

    def _set_global_state(self, stream_state: Mapping[str, Any]) -> None:
        """
        Initializes the global cursor state from the provided stream state.

        If the cursor field key is present in the stream state, its value is parsed,
        formatted, and stored as the global cursor. This ensures consistency in state
        representation across partitions.
        """
        if self.cursor_field.cursor_field_key in stream_state:
            global_state_value = stream_state[self.cursor_field.cursor_field_key]
            final_format_global_state_value = self._connector_state_converter.output_format(
                self._connector_state_converter.parse_value(global_state_value)
            )

            fixed_global_state = {
                self.cursor_field.cursor_field_key: final_format_global_state_value
            }

            self._global_cursor = deepcopy(fixed_global_state)
            self._new_global_cursor = deepcopy(fixed_global_state)

    def observe(self, record: Record) -> None:
        if not record.associated_slice:
            raise ValueError(
                "Invalid state as stream slices that are emitted should refer to an existing cursor"
            )

        record_cursor = self._connector_state_converter.output_format(
            self._connector_state_converter.parse_value(self._cursor_field.extract_value(record))
        )
        self._update_global_cursor(record_cursor)
        if not self._use_global_cursor:
            self._cursor_per_partition[
                self._to_partition_key(record.associated_slice.partition)
            ].observe(record)

    def _update_global_cursor(self, value: Any) -> None:
        if (
            self._new_global_cursor is None
            or self._new_global_cursor[self.cursor_field.cursor_field_key] < value
        ):
            self._new_global_cursor = {self.cursor_field.cursor_field_key: copy.deepcopy(value)}

    def _to_partition_key(self, partition: Mapping[str, Any]) -> str:
        return self._partition_serializer.to_partition_key(partition)

    def _to_dict(self, partition_key: str) -> Mapping[str, Any]:
        return self._partition_serializer.to_partition(partition_key)

    def _create_cursor(
        self, cursor_state: Any, runtime_lookback_window: int = 0
    ) -> ConcurrentCursor:
        cursor = self._cursor_factory.create(
            stream_state=deepcopy(cursor_state),
            runtime_lookback_window=timedelta(seconds=runtime_lookback_window),
        )
        return cursor

    def should_be_synced(self, record: Record) -> bool:
        return self._get_cursor(record).should_be_synced(record)

    def _get_cursor(self, record: Record) -> ConcurrentCursor:
        if not record.associated_slice:
            raise ValueError(
                "Invalid state as stream slices that are emitted should refer to an existing cursor"
            )
        partition_key = self._to_partition_key(record.associated_slice.partition)
        if partition_key not in self._cursor_per_partition:
            raise ValueError(
                "Invalid state as stream slices that are emitted should refer to an existing cursor"
            )
        cursor = self._cursor_per_partition[partition_key]
        return cursor

    def limit_reached(self) -> bool:
        return self._number_of_partitions > self.SWITCH_TO_GLOBAL_LIMIT
Manages state per partition when a stream has many partitions, preventing data loss or duplication.
Attributes:
DEFAULT_MAX_PARTITIONS_NUMBER (int): Maximum number of partitions to retain in memory (default is 25,000).
Partition Limitation Logic: ensures the number of tracked partitions does not exceed the specified limit to prevent memory overuse. The oldest partitions are removed when the limit is reached.
Global Cursor Fallback: new partitions are initialized from the global state so that state still progresses for deleted or newly created partitions. Records older than the global cursor that land in such partitions after the initial sync will be skipped.
ConcurrentPerPartitionCursor expects the state of the ConcurrentCursor to follow the format {cursor_field: cursor_value}.
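Concretely, the state tracked and emitted by this cursor combines the per-partition states, the global fallback cursor, the lookback window, and the parent state. The keys below come from the source (`states` is _PERPARTITION_STATE_KEY, `state` is _GLOBAL_STATE_KEY); the partition and cursor values are illustrative:

state = {
    "use_global_cursor": False,
    # One entry per tracked partition
    "states": [
        {
            "partition": {"partition_key": "value"},
            "cursor": {"last_updated": "2023-05-27T00:00:00Z"},
        }
    ],
    # Global fallback cursor used to seed new partitions
    "state": {"last_updated": "2023-05-27T00:00:00Z"},
    "lookback_window": 10,  # seconds
    "parent_state": {"parent_stream_name": {"last_updated": "2023-05-27T00:00:00Z"}},
}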
    def __init__(
        self,
        cursor_factory: ConcurrentCursorFactory,
        partition_router: PartitionRouter,
        stream_name: str,
        stream_namespace: Optional[str],
        stream_state: Any,
        message_repository: MessageRepository,
        connector_state_manager: ConnectorStateManager,
        connector_state_converter: AbstractStreamStateConverter,
        cursor_field: CursorField,
    ) -> None:
        self._global_cursor: Optional[StreamState] = {}
        self._stream_name = stream_name
        self._stream_namespace = stream_namespace
        self._message_repository = message_repository
        self._connector_state_manager = connector_state_manager
        self._connector_state_converter = connector_state_converter
        self._cursor_field = cursor_field

        self._cursor_factory = cursor_factory
        self._partition_router = partition_router

        # The dict is ordered to ensure that once the maximum number of partitions is reached,
        # the oldest partitions can be efficiently removed, maintaining the most recent partitions.
        self._cursor_per_partition: OrderedDict[str, ConcurrentCursor] = OrderedDict()
        self._semaphore_per_partition: OrderedDict[str, threading.Semaphore] = OrderedDict()

        # Parent-state tracking: store each partition's parent state in creation order
        self._partition_parent_state_map: OrderedDict[str, Mapping[str, Any]] = OrderedDict()

        self._finished_partitions: set[str] = set()
        self._lock = threading.Lock()
        self._timer = Timer()
        self._new_global_cursor: Optional[StreamState] = None
        self._lookback_window: int = 0
        self._parent_state: Optional[StreamState] = None
        self._number_of_partitions: int = 0
        self._use_global_cursor: bool = False
        self._partition_serializer = PerPartitionKeySerializer()
        # Track the last time a state message was emitted
        self._last_emission_time: float = 0.0

        self._set_initial_state(stream_state)
    @property
    def state(self) -> MutableMapping[str, Any]:
        state: dict[str, Any] = {"use_global_cursor": self._use_global_cursor}
        if not self._use_global_cursor:
            states = []
            for partition_tuple, cursor in self._cursor_per_partition.items():
                if cursor.state:
                    states.append(
                        {
                            "partition": self._to_dict(partition_tuple),
                            "cursor": copy.deepcopy(cursor.state),
                        }
                    )
            state[self._PERPARTITION_STATE_KEY] = states

        if self._global_cursor:
            state[self._GLOBAL_STATE_KEY] = self._global_cursor
        if self._lookback_window is not None:
            state["lookback_window"] = self._lookback_window
        if self._parent_state is not None:
            state["parent_state"] = self._parent_state
        return state
    def close_partition(self, partition: Partition) -> None:
        # Attempt to retrieve the stream slice
        stream_slice: Optional[StreamSlice] = partition.to_slice()  # type: ignore[assignment]

        # Ensure stream_slice is not None
        if stream_slice is None:
            raise ValueError("stream_slice cannot be None")

        partition_key = self._to_partition_key(stream_slice.partition)
        with self._lock:
            self._semaphore_per_partition[partition_key].acquire()
            if not self._use_global_cursor:
                self._cursor_per_partition[partition_key].close_partition(partition=partition)
                cursor = self._cursor_per_partition[partition_key]
                if (
                    partition_key in self._finished_partitions
                    and self._semaphore_per_partition[partition_key]._value == 0
                ):
                    self._update_global_cursor(cursor.state[self.cursor_field.cursor_field_key])

            self._check_and_update_parent_state()

        self._emit_state_message()
Indicate to the cursor that the partition has been successfully processed
    def ensure_at_least_one_state_emitted(self) -> None:
        """
        The platform expects at least one state message on successful syncs. Hence, whatever happens, we expect this method to be
        called.
        """
        if not any(
            semaphore_item[1]._value for semaphore_item in self._semaphore_per_partition.items()
        ):
            self._global_cursor = self._new_global_cursor
            self._lookback_window = self._timer.finish()
            self._parent_state = self._partition_router.get_stream_state()
        self._emit_state_message(throttle=False)
The platform expects at least one state message on successful syncs. Hence, whatever happens, we expect this method to be called.
    def stream_slices(self) -> Iterable[StreamSlice]:
        if self._timer.is_running():
            raise RuntimeError("stream_slices has been executed more than once.")

        slices = self._partition_router.stream_slices()
        self._timer.start()
        for partition, last, parent_state in iterate_with_last_flag_and_state(
            slices, self._partition_router.get_stream_state
        ):
            yield from self._generate_slices_from_partition(partition, parent_state)
Generates stream slices by iterating over the partition router's partitions and, for each partition, the slices of its per-partition cursor. Raises a RuntimeError if executed more than once.
    def observe(self, record: Record) -> None:
        if not record.associated_slice:
            raise ValueError(
                "Invalid state as stream slices that are emitted should refer to an existing cursor"
            )

        record_cursor = self._connector_state_converter.output_format(
            self._connector_state_converter.parse_value(self._cursor_field.extract_value(record))
        )
        self._update_global_cursor(record_cursor)
        if not self._use_global_cursor:
            self._cursor_per_partition[
                self._to_partition_key(record.associated_slice.partition)
            ].observe(record)
Indicate to the cursor that the record has been emitted
@dataclass
class DatetimeBasedCursor(DeclarativeCursor):
    """
    Slices the stream over a datetime range and creates a state with format {<cursor_field>: <datetime>}

    Given a start time, end time, a step function, and an optional lookback window,
    the stream slicer will partition the date range from start time - lookback window to end time.

    The step is defined as an ISO 8601 duration string.

    The timestamp format accepts the same format codes as datetime.strptime, which are
    all the format codes required by the 1989 C standard.
    Full list of accepted format codes: https://man7.org/linux/man-pages/man3/strftime.3.html

    Attributes:
        start_datetime (Union[MinMaxDatetime, str]): the datetime that determines the earliest record that should be synced
        end_datetime (Optional[Union[MinMaxDatetime, str]]): the datetime that determines the last record that should be synced
        cursor_field (Union[InterpolatedString, str]): record's cursor field
        datetime_format (str): format of the datetime
        step (Optional[str]): size of the timewindow (ISO8601 duration)
        cursor_granularity (Optional[str]): smallest increment the datetime_format has (ISO 8601 duration) that will be used to
            ensure that the start of a slice does not overlap with the end of the previous one
        config (Config): connection config
        start_time_option (Optional[RequestOption]): request option for start time
        end_time_option (Optional[RequestOption]): request option for end time
        partition_field_start (Optional[str]): partition start time field
        partition_field_end (Optional[str]): stream slice end time field
        lookback_window (Optional[InterpolatedString]): how many days before start_datetime to read data for (ISO8601 duration)
    """

    start_datetime: Union[MinMaxDatetime, str]
    cursor_field: Union[InterpolatedString, str]
    datetime_format: str
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    _highest_observed_cursor_field_value: Optional[str] = field(
        repr=False, default=None
    )  # tracks the latest observed datetime, which may not be safe to emit in the case of out-of-order records
    _cursor: Optional[str] = field(
        repr=False, default=None
    )  # tracks the latest observed datetime that is appropriate to emit as stream state
    end_datetime: Optional[Union[MinMaxDatetime, str]] = None
    step: Optional[Union[InterpolatedString, str]] = None
    cursor_granularity: Optional[str] = None
    start_time_option: Optional[RequestOption] = None
    end_time_option: Optional[RequestOption] = None
    partition_field_start: Optional[str] = None
    partition_field_end: Optional[str] = None
    lookback_window: Optional[Union[InterpolatedString, str]] = None
    message_repository: Optional[MessageRepository] = None
    is_compare_strictly: Optional[bool] = False
    cursor_datetime_formats: List[str] = field(default_factory=lambda: [])

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        if (self.step and not self.cursor_granularity) or (
            not self.step and self.cursor_granularity
        ):
            raise ValueError(
                f"If step is defined, cursor_granularity should be as well and vice-versa. "
" 86 f"Right now, step is `{self.step}` and cursor_granularity is `{self.cursor_granularity}`" 87 ) 88 self._start_datetime = MinMaxDatetime.create(self.start_datetime, parameters) 89 self._end_datetime = ( 90 None if not self.end_datetime else MinMaxDatetime.create(self.end_datetime, parameters) 91 ) 92 93 self._timezone = datetime.timezone.utc 94 self._interpolation = JinjaInterpolation() 95 96 self._step = ( 97 self._parse_timedelta( 98 InterpolatedString.create(self.step, parameters=parameters).eval(self.config) 99 ) 100 if self.step 101 else datetime.timedelta.max 102 ) 103 self._cursor_granularity = self._parse_timedelta(self.cursor_granularity) 104 self.cursor_field = InterpolatedString.create(self.cursor_field, parameters=parameters) 105 self._lookback_window = ( 106 InterpolatedString.create(self.lookback_window, parameters=parameters) 107 if self.lookback_window 108 else None 109 ) 110 self._partition_field_start = InterpolatedString.create( 111 self.partition_field_start or "start_time", parameters=parameters 112 ) 113 self._partition_field_end = InterpolatedString.create( 114 self.partition_field_end or "end_time", parameters=parameters 115 ) 116 self._parser = DatetimeParser() 117 118 # If datetime format is not specified then start/end datetime should inherit it from the stream slicer 119 if not self._start_datetime.datetime_format: 120 self._start_datetime.datetime_format = self.datetime_format 121 if self._end_datetime and not self._end_datetime.datetime_format: 122 self._end_datetime.datetime_format = self.datetime_format 123 124 if not self.cursor_datetime_formats: 125 self.cursor_datetime_formats = [self.datetime_format] 126 127 _validate_component_request_option_paths( 128 self.config, self.start_time_option, self.end_time_option 129 ) 130 131 def get_stream_state(self) -> StreamState: 132 return {self.cursor_field.eval(self.config): self._cursor} if self._cursor else {} # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__ 133 134 def set_initial_state(self, stream_state: StreamState) -> None: 135 """ 136 Cursors are not initialized with their state. As state is needed in order to function properly, this method should be called 137 before calling anything else 138 139 :param stream_state: The state of the stream as returned by get_stream_state 140 """ 141 self._cursor = ( 142 stream_state.get(self.cursor_field.eval(self.config)) if stream_state else None # type: ignore [union-attr] 143 ) 144 145 def observe(self, stream_slice: StreamSlice, record: Record) -> None: 146 """ 147 Register a record with the cursor; the cursor instance can then use it to manage the state of the in-progress stream read. 148 149 :param stream_slice: The current slice, which may or may not contain the most recently observed record 150 :param record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the 151 stream state may need to be deferred depending on whether the source reliably orders records by the cursor field. 
152 """ 153 record_cursor_value = record.get(self.cursor_field.eval(self.config)) # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__ 154 # if the current record has no cursor value, we cannot meaningfully update the state based on it, so there is nothing more to do 155 if not record_cursor_value: 156 return 157 158 start_field = self._partition_field_start.eval(self.config) 159 end_field = self._partition_field_end.eval(self.config) 160 is_highest_observed_cursor_value = ( 161 not self._highest_observed_cursor_field_value 162 or self.parse_date(record_cursor_value) 163 > self.parse_date(self._highest_observed_cursor_field_value) 164 ) 165 if ( 166 self._is_within_daterange_boundaries( 167 record, 168 stream_slice.get(start_field), # type: ignore [arg-type] 169 stream_slice.get(end_field), # type: ignore [arg-type] 170 ) 171 and is_highest_observed_cursor_value 172 ): 173 self._highest_observed_cursor_field_value = record_cursor_value 174 175 def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None: 176 if stream_slice.partition: 177 raise ValueError( 178 f"Stream slice {stream_slice} should not have a partition. Got {stream_slice.partition}." 179 ) 180 cursor_value_str_by_cursor_value_datetime = dict( 181 map( 182 # we need to ensure the cursor value is preserved as is in the state else the CATs might complain of something like 183 # 2023-01-04T17:30:19.000Z' <= '2023-01-04T17:30:19.000000Z' 184 lambda datetime_str: (self.parse_date(datetime_str), datetime_str), # type: ignore # because of the filter on the next line, this will only be called with a str 185 filter( 186 lambda item: item, [self._cursor, self._highest_observed_cursor_field_value] 187 ), 188 ) 189 ) 190 self._cursor = ( 191 cursor_value_str_by_cursor_value_datetime[ 192 max(cursor_value_str_by_cursor_value_datetime.keys()) 193 ] 194 if cursor_value_str_by_cursor_value_datetime 195 else None 196 ) 197 198 def stream_slices(self) -> Iterable[StreamSlice]: 199 """ 200 Partition the daterange into slices of size = step. 201 202 The start of the window is the minimum datetime between start_datetime - lookback_window and the stream_state's datetime 203 The end of the window is the minimum datetime between the start of the window and end_datetime. 204 205 :return: 206 """ 207 end_datetime = self.select_best_end_datetime() 208 start_datetime = self._calculate_earliest_possible_value(self.select_best_end_datetime()) 209 return self._partition_daterange(start_datetime, end_datetime, self._step) 210 211 def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]: 212 # Datetime based cursors operate over slices made up of datetime ranges. Stream state is based on the progress 213 # through each slice and does not belong to a specific slice. We just return stream state as it is. 214 return self.get_stream_state() 215 216 def _calculate_earliest_possible_value( 217 self, end_datetime: datetime.datetime 218 ) -> datetime.datetime: 219 lookback_delta = self._parse_timedelta( 220 self._lookback_window.eval(self.config) if self._lookback_window else "P0D" 221 ) 222 earliest_possible_start_datetime = min( 223 self._start_datetime.get_datetime(self.config), end_datetime 224 ) 225 try: 226 cursor_datetime = ( 227 self._calculate_cursor_datetime_from_state(self.get_stream_state()) - lookback_delta 228 ) 229 except OverflowError: 230 # cursor_datetime defers to the minimum date if it does not exist in the state. 
            # a timedelta from the minimum datetime results in an OverflowError
            cursor_datetime = self._calculate_cursor_datetime_from_state(self.get_stream_state())
        return max(earliest_possible_start_datetime, cursor_datetime)

    def select_best_end_datetime(self) -> datetime.datetime:
        """
        Returns the optimal end datetime.
        This method compares the current datetime with a pre-configured end datetime
        and returns the earlier of the two. If no pre-configured end datetime is set,
        the current datetime is returned.

        :return datetime.datetime: The best end datetime, which is either the current datetime or the pre-configured end datetime, whichever is earlier.
        """
        now = datetime.datetime.now(tz=self._timezone)
        if not self._end_datetime:
            return now
        return min(self._end_datetime.get_datetime(self.config), now)

    def _calculate_cursor_datetime_from_state(
        self, stream_state: Mapping[str, Any]
    ) -> datetime.datetime:
        if self.cursor_field.eval(self.config, stream_state=stream_state) in stream_state:  # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__
            return self.parse_date(stream_state[self.cursor_field.eval(self.config)])  # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__
        return datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)

    def _format_datetime(self, dt: datetime.datetime) -> str:
        return self._parser.format(dt, self.datetime_format)

    def _partition_daterange(
        self,
        start: datetime.datetime,
        end: datetime.datetime,
        step: Union[datetime.timedelta, Duration],
    ) -> List[StreamSlice]:
        start_field = self._partition_field_start.eval(self.config)
        end_field = self._partition_field_end.eval(self.config)
        dates = []

        while self._is_within_date_range(start, end):
            next_start = self._evaluate_next_start_date_safely(start, step)
            end_date = self._get_date(next_start - self._cursor_granularity, end, min)
            dates.append(
                StreamSlice(
                    partition={},
                    cursor_slice={
                        start_field: self._format_datetime(start),
                        end_field: self._format_datetime(end_date),
                    },
                )
            )
            start = next_start
        return dates

    def _is_within_date_range(self, start: datetime.datetime, end: datetime.datetime) -> bool:
        if self.is_compare_strictly:
            return start < end
        return start <= end

    def _evaluate_next_start_date_safely(
        self, start: datetime.datetime, step: datetime.timedelta
    ) -> datetime.datetime:
        """
        Given that we set the default step at datetime.timedelta.max, we will generate an OverflowError when evaluating the next start_date
        This method assumes that users would never enter a step that would generate an overflow. Given that would be the case, the code
        would have broken anyway.
296 """ 297 try: 298 return start + step 299 except OverflowError: 300 return datetime.datetime.max.replace(tzinfo=datetime.timezone.utc) 301 302 def _get_date( 303 self, 304 cursor_value: datetime.datetime, 305 default_date: datetime.datetime, 306 comparator: Callable[[datetime.datetime, datetime.datetime], datetime.datetime], 307 ) -> datetime.datetime: 308 cursor_date = cursor_value or default_date 309 return comparator(cursor_date, default_date) 310 311 def parse_date(self, date: str) -> datetime.datetime: 312 for datetime_format in self.cursor_datetime_formats + [self.datetime_format]: 313 try: 314 return self._parser.parse(date, datetime_format) 315 except ValueError: 316 pass 317 raise ValueError(f"No format in {self.cursor_datetime_formats} matching {date}") 318 319 @classmethod 320 def _parse_timedelta(cls, time_str: Optional[str]) -> Union[datetime.timedelta, Duration]: 321 """ 322 :return Parses an ISO 8601 durations into datetime.timedelta or Duration objects. 323 """ 324 if not time_str: 325 return datetime.timedelta(0) 326 return parse_duration(time_str) 327 328 def get_request_params( 329 self, 330 *, 331 stream_state: Optional[StreamState] = None, 332 stream_slice: Optional[StreamSlice] = None, 333 next_page_token: Optional[Mapping[str, Any]] = None, 334 ) -> Mapping[str, Any]: 335 return self._get_request_options(RequestOptionType.request_parameter, stream_slice) 336 337 def get_request_headers( 338 self, 339 *, 340 stream_state: Optional[StreamState] = None, 341 stream_slice: Optional[StreamSlice] = None, 342 next_page_token: Optional[Mapping[str, Any]] = None, 343 ) -> Mapping[str, Any]: 344 return self._get_request_options(RequestOptionType.header, stream_slice) 345 346 def get_request_body_data( 347 self, 348 *, 349 stream_state: Optional[StreamState] = None, 350 stream_slice: Optional[StreamSlice] = None, 351 next_page_token: Optional[Mapping[str, Any]] = None, 352 ) -> Mapping[str, Any]: 353 return self._get_request_options(RequestOptionType.body_data, stream_slice) 354 355 def get_request_body_json( 356 self, 357 *, 358 stream_state: Optional[StreamState] = None, 359 stream_slice: Optional[StreamSlice] = None, 360 next_page_token: Optional[Mapping[str, Any]] = None, 361 ) -> Mapping[str, Any]: 362 return self._get_request_options(RequestOptionType.body_json, stream_slice) 363 364 def request_kwargs(self) -> Mapping[str, Any]: 365 # Never update kwargs 366 return {} 367 368 def _get_request_options( 369 self, option_type: RequestOptionType, stream_slice: Optional[StreamSlice] 370 ) -> Mapping[str, Any]: 371 options: MutableMapping[str, Any] = {} 372 if not stream_slice: 373 return options 374 375 if self.start_time_option and self.start_time_option.inject_into == option_type: 376 start_time_value = stream_slice.get(self._partition_field_start.eval(self.config)) 377 self.start_time_option.inject_into_request(options, start_time_value, self.config) 378 379 if self.end_time_option and self.end_time_option.inject_into == option_type: 380 end_time_value = stream_slice.get(self._partition_field_end.eval(self.config)) 381 self.end_time_option.inject_into_request(options, end_time_value, self.config) 382 383 return options 384 385 def should_be_synced(self, record: Record) -> bool: 386 cursor_field = self.cursor_field.eval(self.config) # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__ 387 record_cursor_value = record.get(cursor_field) 388 if not record_cursor_value: 389 self._send_log( 390 Level.WARN, 391 f"Could not find cursor field 
            )
            return True
        latest_possible_cursor_value = self.select_best_end_datetime()
        earliest_possible_cursor_value = self._calculate_earliest_possible_value(
            latest_possible_cursor_value
        )
        return self._is_within_daterange_boundaries(
            record, earliest_possible_cursor_value, latest_possible_cursor_value
        )

    def _is_within_daterange_boundaries(
        self,
        record: Record,
        start_datetime_boundary: Union[datetime.datetime, str],
        end_datetime_boundary: Union[datetime.datetime, str],
    ) -> bool:
        cursor_field = self.cursor_field.eval(self.config)  # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__
        record_cursor_value = record.get(cursor_field)
        if not record_cursor_value:
            self._send_log(
                Level.WARN,
                f"Could not find cursor field `{cursor_field}` in record. The record will not be considered when emitting sync state",
            )
            return False
        if isinstance(start_datetime_boundary, str):
            start_datetime_boundary = self.parse_date(start_datetime_boundary)
        if isinstance(end_datetime_boundary, str):
            end_datetime_boundary = self.parse_date(end_datetime_boundary)
        return (
            start_datetime_boundary <= self.parse_date(record_cursor_value) <= end_datetime_boundary
        )

    def _send_log(self, level: Level, message: str) -> None:
        if self.message_repository:
            self.message_repository.emit_message(
                AirbyteMessage(
                    type=Type.LOG,
                    log=AirbyteLogMessage(level=level, message=message),
                )
            )

    def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
        cursor_field = self.cursor_field.eval(self.config)  # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__
        first_cursor_value = first.get(cursor_field)
        second_cursor_value = second.get(cursor_field)
        if first_cursor_value and second_cursor_value:
            return self.parse_date(first_cursor_value) >= self.parse_date(second_cursor_value)
        elif first_cursor_value:
            return True
        else:
            return False

    def set_runtime_lookback_window(self, lookback_window_in_seconds: int) -> None:
        """
        Updates the lookback window based on a given number of seconds if the new duration
        is greater than the currently configured lookback window.

        :param lookback_window_in_seconds: The lookback duration in seconds to potentially update to.
        """
        runtime_lookback_window = duration_isoformat(timedelta(seconds=lookback_window_in_seconds))
        config_lookback = parse_duration(
            self._lookback_window.eval(self.config) if self._lookback_window else "P0D"
        )

        # Check if the new runtime lookback window is greater than the current config lookback
        if parse_duration(runtime_lookback_window) > config_lookback:
            self._lookback_window = InterpolatedString.create(
                runtime_lookback_window, parameters={}
            )
Slices the stream over a datetime range and creates a state with format {<cursor_field>: <datetime>}.
Given a start time, end time, a step function, and an optional lookback window, the stream slicer will partition the date range from start time - lookback window to end time.
The step is defined as an ISO 8601 duration string.
The timestamp format accepts the same format codes as datetime.strptime, which are all the format codes required by the 1989 C standard. Full list of accepted format codes: https://man7.org/linux/man-pages/man3/strftime.3.html
Attributes:
- start_datetime (Union[MinMaxDatetime, str]): the datetime that determines the earliest record that should be synced
- end_datetime (Optional[Union[MinMaxDatetime, str]]): the datetime that determines the last record that should be synced
- cursor_field (Union[InterpolatedString, str]): record's cursor field
- datetime_format (str): format of the datetime
- step (Optional[str]): size of the timewindow (ISO8601 duration)
- cursor_granularity (Optional[str]): smallest increment the datetime_format has (ISO 8601 duration) that will be used to ensure that the start of a slice does not overlap with the end of the previous one
- config (Config): connection config
- start_time_option (Optional[RequestOption]): request option for start time
- end_time_option (Optional[RequestOption]): request option for end time
- partition_field_start (Optional[str]): partition start time field
- partition_field_end (Optional[str]): stream slice end time field
- lookback_window (Optional[InterpolatedString]): how many days before start_datetime to read data for (ISO8601 duration)
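For example, a cursor that slices a stream into one-day windows can be constructed directly from the dataclass fields listed above. All values here are an illustrative configuration, not CDK defaults:

# Illustrative configuration: daily windows over a fixed date range.
cursor = DatetimeBasedCursor(
    start_datetime="2023-01-01T00:00:00Z",
    end_datetime="2023-01-05T00:00:00Z",
    cursor_field="last_updated",
    datetime_format="%Y-%m-%dT%H:%M:%SZ",
    step="P1D",  # one-day time windows
    cursor_granularity="PT1S",  # keeps consecutive windows from overlapping
    config={},
    parameters={},
)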
    def get_stream_state(self) -> StreamState:
        return {self.cursor_field.eval(self.config): self._cursor} if self._cursor else {}  # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__
Returns the current stream state. We would like to restrict its usage since it exposes internals of the state. As of 2023-06-14, it is used for the following:
- Interpolation of the requests
- Transformation of records
- Saving the state
For the first case, we are probably stuck with exposing the stream state. For the second, we can probably expose a method that allows for emitting the state to the platform.
    def set_initial_state(self, stream_state: StreamState) -> None:
        """
        Cursors are not initialized with their state. As state is needed in order to function properly, this method should be called
        before calling anything else

        :param stream_state: The state of the stream as returned by get_stream_state
        """
        self._cursor = (
            stream_state.get(self.cursor_field.eval(self.config)) if stream_state else None  # type: ignore [union-attr]
        )
Cursors are not initialized with their state. As state is needed in order to function properly, this method should be called before calling anything else
Parameters
- stream_state: The state of the stream as returned by get_stream_state
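A short round-trip sketch using the illustrative cursor configured above:

# The state key is the evaluated cursor field ("last_updated" here).
cursor.set_initial_state({"last_updated": "2023-01-02T00:00:00Z"})
assert cursor.get_stream_state() == {"last_updated": "2023-01-02T00:00:00Z"}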
    def observe(self, stream_slice: StreamSlice, record: Record) -> None:
        """
        Register a record with the cursor; the cursor instance can then use it to manage the state of the in-progress stream read.

        :param stream_slice: The current slice, which may or may not contain the most recently observed record
        :param record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the
            stream state may need to be deferred depending on whether the source reliably orders records by the cursor field.
        """
        record_cursor_value = record.get(self.cursor_field.eval(self.config))  # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__
        # if the current record has no cursor value, we cannot meaningfully update the state based on it, so there is nothing more to do
        if not record_cursor_value:
            return

        start_field = self._partition_field_start.eval(self.config)
        end_field = self._partition_field_end.eval(self.config)
        is_highest_observed_cursor_value = (
            not self._highest_observed_cursor_field_value
            or self.parse_date(record_cursor_value)
            > self.parse_date(self._highest_observed_cursor_field_value)
        )
        if (
            self._is_within_daterange_boundaries(
                record,
                stream_slice.get(start_field),  # type: ignore [arg-type]
                stream_slice.get(end_field),  # type: ignore [arg-type]
            )
            and is_highest_observed_cursor_value
        ):
            self._highest_observed_cursor_field_value = record_cursor_value
Register a record with the cursor; the cursor instance can then use it to manage the state of the in-progress stream read.
Parameters
- stream_slice: The current slice, which may or may not contain the most recently observed record
- record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the stream state may need to be deferred depending on whether the source reliably orders records by the cursor field.
    def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
        if stream_slice.partition:
            raise ValueError(
                f"Stream slice {stream_slice} should not have a partition. Got {stream_slice.partition}."
            )
        cursor_value_str_by_cursor_value_datetime = dict(
            map(
                # we need to ensure the cursor value is preserved as is in the state else the CATs might complain of something like
                # 2023-01-04T17:30:19.000Z' <= '2023-01-04T17:30:19.000000Z'
                lambda datetime_str: (self.parse_date(datetime_str), datetime_str),  # type: ignore # because of the filter on the next line, this will only be called with a str
                filter(
                    lambda item: item, [self._cursor, self._highest_observed_cursor_field_value]
                ),
            )
        )
        self._cursor = (
            cursor_value_str_by_cursor_value_datetime[
                max(cursor_value_str_by_cursor_value_datetime.keys())
            ]
            if cursor_value_str_by_cursor_value_datetime
            else None
        )
Update state based on the stream slice. Note that stream_slice.cursor_slice and most_recent_record.associated_slice are expected to be the same, but we make it explicit here that stream_slice should be leveraged to update the state. We do not pass in the latest record, since cursor instances should maintain the relevant internal state on their own.
Parameters
- stream_slice: slice to close
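Putting observe and close_slice together, a hedged sketch of one slice lifecycle; read_records_for is a hypothetical stand-in for the retriever, which normally drives these calls:

for stream_slice in cursor.stream_slices():
    for record in read_records_for(stream_slice):  # hypothetical reader
        cursor.observe(stream_slice, record)  # track the highest in-window value
    cursor.close_slice(stream_slice)  # commit the observed value to state
print(cursor.get_stream_state())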
    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Partition the daterange into slices of size = step.

        The start of the window is the later of start_datetime and the stream state's cursor datetime minus lookback_window.
        The end of the window is the earlier of end_datetime and the current time.

        :return: the list of stream slices partitioning the date range
        """
        end_datetime = self.select_best_end_datetime()
        start_datetime = self._calculate_earliest_possible_value(self.select_best_end_datetime())
        return self._partition_daterange(start_datetime, end_datetime, self._step)
Partition the daterange into slices of size = step.
The start of the window is the later of start_datetime and the stream state's cursor datetime minus lookback_window. The end of the window is the earlier of end_datetime and the current time.
Returns
The list of stream slices, one per step, partitioning the date range.
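With the illustrative one-day configuration above, each generated slice carries the default partition fields start_time and end_time, and consecutive windows are separated by the cursor granularity:

slices = list(cursor.stream_slices())
# Each StreamSlice's cursor_slice holds the window bounds, e.g.:
#   {"start_time": "2023-01-01T00:00:00Z", "end_time": "2023-01-01T23:59:59Z"}
#   {"start_time": "2023-01-02T00:00:00Z", "end_time": "2023-01-02T23:59:59Z"}
#   ...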
    def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
        # Datetime based cursors operate over slices made up of datetime ranges. Stream state is based on the progress
        # through each slice and does not belong to a specific slice. We just return stream state as it is.
        return self.get_stream_state()
Get the state value of a specific stream_slice. For incremental or resumable full refresh cursors which only manage state in a single dimension this is the entire state object. For per-partition cursors used by substreams, this returns the state of a specific parent delineated by the incoming slice's partition object.
    def select_best_end_datetime(self) -> datetime.datetime:
        """
        Returns the optimal end datetime.
        This method compares the current datetime with a pre-configured end datetime
        and returns the earlier of the two. If no pre-configured end datetime is set,
        the current datetime is returned.

        :return datetime.datetime: The best end datetime, which is either the current datetime or the pre-configured end datetime, whichever is earlier.
        """
        now = datetime.datetime.now(tz=self._timezone)
        if not self._end_datetime:
            return now
        return min(self._end_datetime.get_datetime(self.config), now)
Returns the optimal end datetime. This method compares the current datetime with a pre-configured end datetime and returns the earlier of the two. If no pre-configured end datetime is set, the current datetime is returned.
Returns
The best end datetime, which is either the current datetime or the pre-configured end datetime, whichever is earlier.
    def parse_date(self, date: str) -> datetime.datetime:
        for datetime_format in self.cursor_datetime_formats + [self.datetime_format]:
            try:
                return self._parser.parse(date, datetime_format)
            except ValueError:
                pass
        raise ValueError(f"No format in {self.cursor_datetime_formats} matching {date}")
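For instance, each configured format is tried in order before the method gives up. Values are illustrative, and the attribute is mutated after construction only for brevity:

cursor.cursor_datetime_formats = ["%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%d"]
cursor.parse_date("2023-05-27T00:00:00Z")  # matched by the first format
cursor.parse_date("2023-05-27")  # first format fails, second matches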
    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return self._get_request_options(RequestOptionType.request_parameter, stream_slice)
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g: you might want to define query parameters for paging if next_page_token is not None.
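A hedged sketch of the injection: assuming RequestOption accepts field_name, inject_into, and parameters arguments (treat the exact signature as an assumption), the slice's window bounds are mapped onto outgoing query parameters:

# Assumed RequestOption signature; "since"/"until" are illustrative API
# parameter names, not CDK defaults.
cursor.start_time_option = RequestOption(
    field_name="since", inject_into=RequestOptionType.request_parameter, parameters={}
)
cursor.end_time_option = RequestOption(
    field_name="until", inject_into=RequestOptionType.request_parameter, parameters={}
)
params = cursor.get_request_params(stream_slice=slices[0])
# e.g. {"since": "2023-01-01T00:00:00Z", "until": "2023-01-01T23:59:59Z"}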
    def get_request_headers(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return self._get_request_options(RequestOptionType.header, stream_slice)
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
    def get_request_body_data(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return self._get_request_options(RequestOptionType.body_data, stream_slice)
Specifies how to populate the body of the request with a non-JSON payload.
If it returns plain text, the text is sent as-is. If it returns a dict, the dict is converted to a urlencoded form, e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
Only one of the 'request_body_data' and 'request_body_json' functions can be overridden at the same time.
    def get_request_body_json(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return self._get_request_options(RequestOptionType.body_json, stream_slice)
Specifies how to populate the body of the request with a JSON payload.
Only one of the 'request_body_data' and 'request_body_json' functions can be overridden at the same time.
    def should_be_synced(self, record: Record) -> bool:
        cursor_field = self.cursor_field.eval(self.config)  # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__
        record_cursor_value = record.get(cursor_field)
        if not record_cursor_value:
            self._send_log(
                Level.WARN,
                f"Could not find cursor field `{cursor_field}` in record. The incremental sync will assume it needs to be synced",
            )
            return True
        latest_possible_cursor_value = self.select_best_end_datetime()
        earliest_possible_cursor_value = self._calculate_earliest_possible_value(
            latest_possible_cursor_value
        )
        return self._is_within_daterange_boundaries(
            record, earliest_possible_cursor_value, latest_possible_cursor_value
        )
Evaluating whether a record should be synced allows for filtering and for stop conditions on pagination.
433 def is_greater_than_or_equal(self, first: Record, second: Record) -> bool: 434 cursor_field = self.cursor_field.eval(self.config) # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__ 435 first_cursor_value = first.get(cursor_field) 436 second_cursor_value = second.get(cursor_field) 437 if first_cursor_value and second_cursor_value: 438 return self.parse_date(first_cursor_value) >= self.parse_date(second_cursor_value) 439 elif first_cursor_value: 440 return True 441 else: 442 return False
Evaluates which record is greater in terms of its cursor value. This is used to avoid having to capture all the records in order to close a slice.
444 def set_runtime_lookback_window(self, lookback_window_in_seconds: int) -> None: 445 """ 446 Updates the lookback window based on a given number of seconds if the new duration 447 is greater than the currently configured lookback window. 448 449 :param lookback_window_in_seconds: The lookback duration in seconds to potentially update to. 450 """ 451 runtime_lookback_window = duration_isoformat(timedelta(seconds=lookback_window_in_seconds)) 452 config_lookback = parse_duration( 453 self._lookback_window.eval(self.config) if self._lookback_window else "P0D" 454 ) 455 456 # Check if the new runtime lookback window is greater than the current config lookback 457 if parse_duration(runtime_lookback_window) > config_lookback: 458 self._lookback_window = InterpolatedString.create( 459 runtime_lookback_window, parameters={} 460 )
Updates the lookback window based on a given number of seconds if the new duration is greater than the currently configured lookback window.
Parameters
- lookback_window_in_seconds: The lookback duration in seconds to potentially update to.
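The comparison relies on ISO-8601 durations. A small sketch of the widening rule, assuming the `duration_isoformat` and `parse_duration` helpers come from the `isodate` package:

```python
from datetime import timedelta
from isodate import duration_isoformat, parse_duration  # assumed source of these helpers

# A 300-second runtime lookback rendered as an ISO-8601 duration...
runtime = duration_isoformat(timedelta(seconds=300))  # e.g. "PT5M"
# ...compares as greater than the default zero lookback "P0D",
# so the configured lookback window would be widened.
print(parse_duration(runtime) > parse_duration("P0D"))  # True
```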
10class DeclarativeCursor(Cursor, StreamSlicer, ABC): 11 """ 12 DeclarativeCursors are components that allow for checkpointing syncs. In addition to managing the fetching and updating of 13 state, declarative cursors also manage stream slicing and injecting slice values into outbound requests. 14 """
DeclarativeCursors are components that allow for checkpointing syncs. In addition to managing the fetching and updating of state, declarative cursors also manage stream slicing and injecting slice values into outbound requests.
72class GlobalSubstreamCursor(DeclarativeCursor): 73 """ 74 The GlobalSubstreamCursor is designed to track the state of substreams using a single global cursor. 75 This class is beneficial for streams with many partitions, as it allows the state to be managed globally 76 instead of per partition, simplifying state management and reducing the size of state messages. 77 78 This cursor is activated by setting the `global_substream_cursor` parameter for incremental sync. 79 80 Warnings: 81 - This class enforces a minimal lookback window for substream based on the duration of the previous sync to avoid losing records. This lookback ensures that any records added or updated during the sync are captured in subsequent syncs. 82 - The global cursor is updated only at the end of the sync. If the sync ends prematurely (e.g., due to an exception), the state will not be updated. 83 - When using the `incremental_dependency` option, the sync will progress through parent records, preventing the sync from getting infinitely stuck. However, it is crucial to understand the requirements for both the `global_substream_cursor` and `incremental_dependency` options to avoid data loss. 84 """ 85 86 def __init__(self, stream_cursor: DatetimeBasedCursor, partition_router: PartitionRouter): 87 self._stream_cursor = stream_cursor 88 self._partition_router = partition_router 89 self._timer = Timer() 90 self._lock = threading.Lock() 91 self._slice_semaphore = threading.Semaphore( 92 0 93 ) # Start with 0, indicating no slices being tracked 94 self._all_slices_yielded = False 95 self._lookback_window: Optional[int] = None 96 self._current_partition: Optional[Mapping[str, Any]] = None 97 self._last_slice: bool = False 98 self._parent_state: Optional[Mapping[str, Any]] = None 99 100 def start_slices_generation(self) -> None: 101 self._timer.start() 102 103 def stream_slices(self) -> Iterable[StreamSlice]: 104 """ 105 Generates stream slices, ensuring the last slice is properly flagged and processed. 106 107 This method creates a sequence of stream slices by iterating over partitions and cursor slices. 108 It holds onto one slice in memory to set `_all_slices_yielded` to `True` before yielding the 109 final slice. A semaphore is used to track the processing of slices, ensuring that `close_slice` 110 is called only after all slices have been processed. 111 112 We expect the following events: 113 * Yields all the slices except the last one. At this point, `close_slice` won't actually close the global slice as `self._all_slices_yielded == False` 114 * Release the semaphore one last time before setting `self._all_slices_yielded = True`. This will cause `close_slice` to know about all the slices before we indicate that all slices have been yielded so the left side of `if self._all_slices_yielded and self._slice_semaphore._value == 0` will be false if not everything is closed 115 * Setting `self._all_slices_yielded = True`. We do that before actually yielding the last slice as the caller of `stream_slices` might stop iterating at any point and hence the code after `yield` might not be executed 116 * Yield the last slice. 
At that point, once there are as many slices yielded as closes, the global slice will be closed too 117 """ 118 slice_generator = ( 119 StreamSlice( 120 partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields 121 ) 122 for partition in self._partition_router.stream_slices() 123 for cursor_slice in self._stream_cursor.stream_slices() 124 ) 125 126 self.start_slices_generation() 127 for slice, last, state in iterate_with_last_flag_and_state( 128 slice_generator, self._partition_router.get_stream_state 129 ): 130 self._parent_state = state 131 self.register_slice(last) 132 yield slice 133 self._parent_state = self._partition_router.get_stream_state() 134 135 def generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[StreamSlice]: 136 slice_generator = ( 137 StreamSlice( 138 partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields 139 ) 140 for cursor_slice in self._stream_cursor.stream_slices() 141 ) 142 143 yield from slice_generator 144 145 def register_slice(self, last: bool) -> None: 146 """ 147 Tracks the processing of a stream slice. 148 149 Releases the semaphore for each slice. If it's the last slice (`last=True`), 150 sets `_all_slices_yielded` to `True` to indicate no more slices will be processed. 151 152 Args: 153 last (bool): True if the current slice is the last in the sequence. 154 """ 155 self._slice_semaphore.release() 156 if last: 157 self._all_slices_yielded = True 158 159 def set_initial_state(self, stream_state: StreamState) -> None: 160 """ 161 Set the initial state for the cursors. 162 163 This method initializes the state for the global cursor using the provided stream state. 164 165 Additionally, it sets the parent state for partition routers that are based on parent streams. If a partition router 166 does not have parent streams, this step will be skipped due to the default PartitionRouter implementation. 167 168 Args: 169 stream_state (StreamState): The state of the streams to be set. The format of the stream state should be: 170 { 171 "state": { 172 "last_updated": "2023-05-27T00:00:00Z" 173 }, 174 "parent_state": { 175 "parent_stream_name": { 176 "last_updated": "2023-05-27T00:00:00Z" 177 } 178 }, 179 "lookback_window": 132 180 } 181 """ 182 if not stream_state: 183 return 184 185 if "lookback_window" in stream_state: 186 self._lookback_window = stream_state["lookback_window"] 187 self._inject_lookback_into_stream_cursor(stream_state["lookback_window"]) 188 189 if "state" in stream_state: 190 self._stream_cursor.set_initial_state(stream_state["state"]) 191 elif "states" not in stream_state: 192 # We assume that `stream_state` is in the old global format 193 # Example: {"global_state_format_key": "global_state_format_value"} 194 self._stream_cursor.set_initial_state(stream_state) 195 196 # Set parent state for partition routers based on parent streams 197 self._partition_router.set_initial_state(stream_state) 198 199 def _inject_lookback_into_stream_cursor(self, lookback_window: int) -> None: 200 """ 201 Modifies the stream cursor's lookback window based on the duration of the previous sync. 202 This adjustment ensures the cursor is set to the minimal lookback window necessary for 203 avoiding missing data. 204 205 Parameters: 206 lookback_window (int): The lookback duration in seconds to be set, derived from 207 the previous sync. 208 209 Raises: 210 ValueError: If the cursor does not support dynamic lookback window adjustments. 
211 """ 212 if hasattr(self._stream_cursor, "set_runtime_lookback_window"): 213 self._stream_cursor.set_runtime_lookback_window(lookback_window) 214 else: 215 raise ValueError( 216 "The cursor class for Global Substream Cursor does not have a set_runtime_lookback_window method" 217 ) 218 219 def observe(self, stream_slice: StreamSlice, record: Record) -> None: 220 self._stream_cursor.observe( 221 StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), record 222 ) 223 224 def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None: 225 """ 226 Close the current stream slice. 227 228 This method is called when a stream slice is completed. For the global parent cursor, we close the child cursor 229 only after reading all slices. This ensures that we do not miss any child records from a later parent record 230 if the child cursor is earlier than a record from the first parent record. 231 232 Args: 233 stream_slice (StreamSlice): The stream slice to be closed. 234 *args (Any): Additional arguments. 235 """ 236 with self._lock: 237 self._slice_semaphore.acquire() 238 if self._all_slices_yielded and self._slice_semaphore._value == 0: 239 self._lookback_window = self._timer.finish() 240 self._stream_cursor.close_slice( 241 StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), *args 242 ) 243 244 def get_stream_state(self) -> StreamState: 245 state: dict[str, Any] = {"state": self._stream_cursor.get_stream_state()} 246 247 if self._parent_state: 248 state["parent_state"] = self._parent_state 249 250 if self._lookback_window is not None: 251 state["lookback_window"] = self._lookback_window 252 253 return state 254 255 def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]: 256 # stream_slice is ignored as cursor is global 257 return self._stream_cursor.get_stream_state() 258 259 def get_request_params( 260 self, 261 *, 262 stream_state: Optional[StreamState] = None, 263 stream_slice: Optional[StreamSlice] = None, 264 next_page_token: Optional[Mapping[str, Any]] = None, 265 ) -> Mapping[str, Any]: 266 if stream_slice: 267 return self._partition_router.get_request_params( # type: ignore # this always returns a mapping 268 stream_state=stream_state, 269 stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}), 270 next_page_token=next_page_token, 271 ) | self._stream_cursor.get_request_params( 272 stream_state=stream_state, 273 stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), 274 next_page_token=next_page_token, 275 ) 276 else: 277 raise ValueError("A partition needs to be provided in order to get request params") 278 279 def get_request_headers( 280 self, 281 *, 282 stream_state: Optional[StreamState] = None, 283 stream_slice: Optional[StreamSlice] = None, 284 next_page_token: Optional[Mapping[str, Any]] = None, 285 ) -> Mapping[str, Any]: 286 if stream_slice: 287 return self._partition_router.get_request_headers( # type: ignore # this always returns a mapping 288 stream_state=stream_state, 289 stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}), 290 next_page_token=next_page_token, 291 ) | self._stream_cursor.get_request_headers( 292 stream_state=stream_state, 293 stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), 294 next_page_token=next_page_token, 295 ) 296 else: 297 raise ValueError("A partition needs to be provided in order to get request headers") 298 299 def get_request_body_data( 300 self, 301 *, 302 stream_state: 
Optional[StreamState] = None, 303 stream_slice: Optional[StreamSlice] = None, 304 next_page_token: Optional[Mapping[str, Any]] = None, 305 ) -> Union[Mapping[str, Any], str]: 306 if stream_slice: 307 return self._partition_router.get_request_body_data( # type: ignore # this always returns a mapping 308 stream_state=stream_state, 309 stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}), 310 next_page_token=next_page_token, 311 ) | self._stream_cursor.get_request_body_data( 312 stream_state=stream_state, 313 stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), 314 next_page_token=next_page_token, 315 ) 316 else: 317 raise ValueError("A partition needs to be provided in order to get request body data") 318 319 def get_request_body_json( 320 self, 321 *, 322 stream_state: Optional[StreamState] = None, 323 stream_slice: Optional[StreamSlice] = None, 324 next_page_token: Optional[Mapping[str, Any]] = None, 325 ) -> Mapping[str, Any]: 326 if stream_slice: 327 return self._partition_router.get_request_body_json( # type: ignore # this always returns a mapping 328 stream_state=stream_state, 329 stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}), 330 next_page_token=next_page_token, 331 ) | self._stream_cursor.get_request_body_json( 332 stream_state=stream_state, 333 stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), 334 next_page_token=next_page_token, 335 ) 336 else: 337 raise ValueError("A partition needs to be provided in order to get request body json") 338 339 def should_be_synced(self, record: Record) -> bool: 340 return self._stream_cursor.should_be_synced(self._convert_record_to_cursor_record(record)) 341 342 def is_greater_than_or_equal(self, first: Record, second: Record) -> bool: 343 return self._stream_cursor.is_greater_than_or_equal( 344 self._convert_record_to_cursor_record(first), 345 self._convert_record_to_cursor_record(second), 346 ) 347 348 @staticmethod 349 def _convert_record_to_cursor_record(record: Record) -> Record: 350 return Record( 351 data=record.data, 352 stream_name=record.stream_name, 353 associated_slice=StreamSlice( 354 partition={}, cursor_slice=record.associated_slice.cursor_slice 355 ) 356 if record.associated_slice 357 else None, 358 )
The GlobalSubstreamCursor is designed to track the state of substreams using a single global cursor. This class is beneficial for streams with many partitions, as it allows the state to be managed globally instead of per partition, simplifying state management and reducing the size of state messages.

This cursor is activated by setting the `global_substream_cursor` parameter for incremental sync.

Warnings:

- This class enforces a minimal lookback window for the substream based on the duration of the previous sync to avoid losing records. This lookback ensures that any records added or updated during the sync are captured in subsequent syncs.
- The global cursor is updated only at the end of the sync. If the sync ends prematurely (e.g., due to an exception), the state will not be updated.
- When using the `incremental_dependency` option, the sync will progress through parent records, preventing the sync from getting infinitely stuck. However, it is crucial to understand the requirements for both the `global_substream_cursor` and `incremental_dependency` options to avoid data loss.
86 def __init__(self, stream_cursor: DatetimeBasedCursor, partition_router: PartitionRouter): 87 self._stream_cursor = stream_cursor 88 self._partition_router = partition_router 89 self._timer = Timer() 90 self._lock = threading.Lock() 91 self._slice_semaphore = threading.Semaphore( 92 0 93 ) # Start with 0, indicating no slices being tracked 94 self._all_slices_yielded = False 95 self._lookback_window: Optional[int] = None 96 self._current_partition: Optional[Mapping[str, Any]] = None 97 self._last_slice: bool = False 98 self._parent_state: Optional[Mapping[str, Any]] = None
103 def stream_slices(self) -> Iterable[StreamSlice]: 104 """ 105 Generates stream slices, ensuring the last slice is properly flagged and processed. 106 107 This method creates a sequence of stream slices by iterating over partitions and cursor slices. 108 It holds onto one slice in memory to set `_all_slices_yielded` to `True` before yielding the 109 final slice. A semaphore is used to track the processing of slices, ensuring that `close_slice` 110 is called only after all slices have been processed. 111 112 We expect the following events: 113 * Yields all the slices except the last one. At this point, `close_slice` won't actually close the global slice as `self._all_slices_yielded == False` 114 * Release the semaphore one last time before setting `self._all_slices_yielded = True`. This will cause `close_slice` to know about all the slices before we indicate that all slices have been yielded so the left side of `if self._all_slices_yielded and self._slice_semaphore._value == 0` will be false if not everything is closed 115 * Setting `self._all_slices_yielded = True`. We do that before actually yielding the last slice as the caller of `stream_slices` might stop iterating at any point and hence the code after `yield` might not be executed 116 * Yield the last slice. At that point, once there are as many slices yielded as closes, the global slice will be closed too 117 """ 118 slice_generator = ( 119 StreamSlice( 120 partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields 121 ) 122 for partition in self._partition_router.stream_slices() 123 for cursor_slice in self._stream_cursor.stream_slices() 124 ) 125 126 self.start_slices_generation() 127 for slice, last, state in iterate_with_last_flag_and_state( 128 slice_generator, self._partition_router.get_stream_state 129 ): 130 self._parent_state = state 131 self.register_slice(last) 132 yield slice 133 self._parent_state = self._partition_router.get_stream_state()
Generates stream slices, ensuring the last slice is properly flagged and processed.

This method creates a sequence of stream slices by iterating over partitions and cursor slices. It holds onto one slice in memory to set `_all_slices_yielded` to `True` before yielding the final slice (see the sketch after this list). A semaphore is used to track the processing of slices, ensuring that `close_slice` is called only after all slices have been processed.

We expect the following events:

- Yields all the slices except the last one. At this point, `close_slice` won't actually close the global slice as `self._all_slices_yielded == False`.
- Release the semaphore one last time before setting `self._all_slices_yielded = True`. This lets `close_slice` account for all the slices before we indicate that all slices have been yielded, so the left side of `if self._all_slices_yielded and self._slice_semaphore._value == 0` will be false if not everything is closed.
- Setting `self._all_slices_yielded = True`. We do that before actually yielding the last slice, as the caller of `stream_slices` might stop iterating at any point, and hence the code after `yield` might not be executed.
- Yield the last slice. At that point, once there are as many slices yielded as closed, the global slice will be closed too.
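The "hold one slice in memory" behavior is the classic look-ahead iteration pattern. A minimal sketch, independent of the CDK's `iterate_with_last_flag_and_state` helper:

```python
from typing import Iterable, Iterator, Tuple, TypeVar

T = TypeVar("T")

def iterate_with_last_flag(iterable: Iterable[T]) -> Iterator[Tuple[T, bool]]:
    # Hold one item back so the final element can be flagged before it is yielded.
    it = iter(iterable)
    try:
        previous = next(it)
    except StopIteration:
        return
    for current in it:
        yield previous, False
        previous = current
    yield previous, True

print(list(iterate_with_last_flag(["slice-1", "slice-2", "slice-3"])))
# [('slice-1', False), ('slice-2', False), ('slice-3', True)]
```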
135 def generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[StreamSlice]: 136 slice_generator = ( 137 StreamSlice( 138 partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields 139 ) 140 for cursor_slice in self._stream_cursor.stream_slices() 141 ) 142 143 yield from slice_generator
145 def register_slice(self, last: bool) -> None: 146 """ 147 Tracks the processing of a stream slice. 148 149 Releases the semaphore for each slice. If it's the last slice (`last=True`), 150 sets `_all_slices_yielded` to `True` to indicate no more slices will be processed. 151 152 Args: 153 last (bool): True if the current slice is the last in the sequence. 154 """ 155 self._slice_semaphore.release() 156 if last: 157 self._all_slices_yielded = True
Tracks the processing of a stream slice.

Releases the semaphore for each slice. If it's the last slice (`last=True`), sets `_all_slices_yielded` to `True` to indicate no more slices will be processed.

Arguments:

- last (bool): True if the current slice is the last in the sequence.
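The semaphore bookkeeping pairs one `release` per yielded slice with one `acquire` per closed slice. A toy sketch of the accounting:

```python
import threading

sem = threading.Semaphore(0)  # start at 0: no slices tracked yet

for _ in range(3):   # one release per yielded slice (register_slice)
    sem.release()

for _ in range(3):   # one acquire per closed slice (close_slice)
    sem.acquire()

# When the internal counter is back at zero, every yielded slice has been
# closed; this mirrors the `self._slice_semaphore._value == 0` check above.
print(sem._value == 0)  # True
```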
159 def set_initial_state(self, stream_state: StreamState) -> None: 160 """ 161 Set the initial state for the cursors. 162 163 This method initializes the state for the global cursor using the provided stream state. 164 165 Additionally, it sets the parent state for partition routers that are based on parent streams. If a partition router 166 does not have parent streams, this step will be skipped due to the default PartitionRouter implementation. 167 168 Args: 169 stream_state (StreamState): The state of the streams to be set. The format of the stream state should be: 170 { 171 "state": { 172 "last_updated": "2023-05-27T00:00:00Z" 173 }, 174 "parent_state": { 175 "parent_stream_name": { 176 "last_updated": "2023-05-27T00:00:00Z" 177 } 178 }, 179 "lookback_window": 132 180 } 181 """ 182 if not stream_state: 183 return 184 185 if "lookback_window" in stream_state: 186 self._lookback_window = stream_state["lookback_window"] 187 self._inject_lookback_into_stream_cursor(stream_state["lookback_window"]) 188 189 if "state" in stream_state: 190 self._stream_cursor.set_initial_state(stream_state["state"]) 191 elif "states" not in stream_state: 192 # We assume that `stream_state` is in the old global format 193 # Example: {"global_state_format_key": "global_state_format_value"} 194 self._stream_cursor.set_initial_state(stream_state) 195 196 # Set parent state for partition routers based on parent streams 197 self._partition_router.set_initial_state(stream_state)
Set the initial state for the cursors.
This method initializes the state for the global cursor using the provided stream state.
Additionally, it sets the parent state for partition routers that are based on parent streams. If a partition router does not have parent streams, this step will be skipped due to the default PartitionRouter implementation.
Arguments:
- stream_state (StreamState): The state of the streams to be set. The format of the stream state should be:

      {
          "state": {
              "last_updated": "2023-05-27T00:00:00Z"
          },
          "parent_state": {
              "parent_stream_name": {
                  "last_updated": "2023-05-27T00:00:00Z"
              }
          },
          "lookback_window": 132
      }
219 def observe(self, stream_slice: StreamSlice, record: Record) -> None: 220 self._stream_cursor.observe( 221 StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), record 222 )
Register a record with the cursor; the cursor instance can then use it to manage the state of the in-progress stream read.
Parameters
- stream_slice: The current slice, which may or may not contain the most recently observed record
- record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the stream state may need to be deferred depending on whether the source reliably orders records by the cursor field.
224 def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None: 225 """ 226 Close the current stream slice. 227 228 This method is called when a stream slice is completed. For the global parent cursor, we close the child cursor 229 only after reading all slices. This ensures that we do not miss any child records from a later parent record 230 if the child cursor is earlier than a record from the first parent record. 231 232 Args: 233 stream_slice (StreamSlice): The stream slice to be closed. 234 *args (Any): Additional arguments. 235 """ 236 with self._lock: 237 self._slice_semaphore.acquire() 238 if self._all_slices_yielded and self._slice_semaphore._value == 0: 239 self._lookback_window = self._timer.finish() 240 self._stream_cursor.close_slice( 241 StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), *args 242 )
Close the current stream slice.
This method is called when a stream slice is completed. For the global parent cursor, we close the child cursor only after reading all slices. This ensures that we do not miss any child records from a later parent record if the child cursor is earlier than a record from the first parent record.
Arguments:
- stream_slice (StreamSlice): The stream slice to be closed.
- *args (Any): Additional arguments.
244 def get_stream_state(self) -> StreamState: 245 state: dict[str, Any] = {"state": self._stream_cursor.get_stream_state()} 246 247 if self._parent_state: 248 state["parent_state"] = self._parent_state 249 250 if self._lookback_window is not None: 251 state["lookback_window"] = self._lookback_window 252 253 return state
Returns the current stream state. We would like to restrict its usage since it exposes the internals of the state. As of 2023-06-14, it is used for the following:

- Interpolation of the requests
- Transformation of records
- Saving the state

For the first case, we are probably stuck with exposing the stream state. For the second, we can probably expose a method that allows for emitting the state to the platform.
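For illustration, the state assembled by this method might look like the following (values hypothetical):

```python
state = {
    "state": {"last_updated": "2023-05-27T00:00:00Z"},  # global stream cursor
    "parent_state": {                                   # per parent stream
        "parent_stream_name": {"last_updated": "2023-05-27T00:00:00Z"}
    },
    "lookback_window": 132,  # seconds, derived from the previous sync's duration
}
```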
255 def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]: 256 # stream_slice is ignored as cursor is global 257 return self._stream_cursor.get_stream_state()
Get the state value of a specific stream_slice. For incremental or resumable full refresh cursors which only manage state in a single dimension this is the entire state object. For per-partition cursors used by substreams, this returns the state of a specific parent delineated by the incoming slice's partition object.
259 def get_request_params( 260 self, 261 *, 262 stream_state: Optional[StreamState] = None, 263 stream_slice: Optional[StreamSlice] = None, 264 next_page_token: Optional[Mapping[str, Any]] = None, 265 ) -> Mapping[str, Any]: 266 if stream_slice: 267 return self._partition_router.get_request_params( # type: ignore # this always returns a mapping 268 stream_state=stream_state, 269 stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}), 270 next_page_token=next_page_token, 271 ) | self._stream_cursor.get_request_params( 272 stream_state=stream_state, 273 stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), 274 next_page_token=next_page_token, 275 ) 276 else: 277 raise ValueError("A partition needs to be provided in order to get request params")
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g., you might want to define query parameters for paging if next_page_token is not None.
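Note that the partition router's options and the stream cursor's options are combined with the dict union operator, so on a key collision the stream cursor's value wins:

```python
router_params = {"account_id": "123", "source": "router"}
cursor_params = {"updated_after": "2023-05-27T00:00:00Z", "source": "cursor"}

# Equivalent to the `router | cursor` merge used above; the right-hand side wins.
print(router_params | cursor_params)
# {'account_id': '123', 'source': 'cursor', 'updated_after': '2023-05-27T00:00:00Z'}
```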
279 def get_request_headers( 280 self, 281 *, 282 stream_state: Optional[StreamState] = None, 283 stream_slice: Optional[StreamSlice] = None, 284 next_page_token: Optional[Mapping[str, Any]] = None, 285 ) -> Mapping[str, Any]: 286 if stream_slice: 287 return self._partition_router.get_request_headers( # type: ignore # this always returns a mapping 288 stream_state=stream_state, 289 stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}), 290 next_page_token=next_page_token, 291 ) | self._stream_cursor.get_request_headers( 292 stream_state=stream_state, 293 stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), 294 next_page_token=next_page_token, 295 ) 296 else: 297 raise ValueError("A partition needs to be provided in order to get request headers")
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
299 def get_request_body_data( 300 self, 301 *, 302 stream_state: Optional[StreamState] = None, 303 stream_slice: Optional[StreamSlice] = None, 304 next_page_token: Optional[Mapping[str, Any]] = None, 305 ) -> Union[Mapping[str, Any], str]: 306 if stream_slice: 307 return self._partition_router.get_request_body_data( # type: ignore # this always returns a mapping 308 stream_state=stream_state, 309 stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}), 310 next_page_token=next_page_token, 311 ) | self._stream_cursor.get_request_body_data( 312 stream_state=stream_state, 313 stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), 314 next_page_token=next_page_token, 315 ) 316 else: 317 raise ValueError("A partition needs to be provided in order to get request body data")
Specifies how to populate the body of the request with a non-JSON payload.
If it returns text, the text will be sent as-is. If it returns a dict, it will be converted to a urlencoded form, e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2".
Note that only one of 'request_body_data' and 'request_body_json' may be overridden.
319 def get_request_body_json( 320 self, 321 *, 322 stream_state: Optional[StreamState] = None, 323 stream_slice: Optional[StreamSlice] = None, 324 next_page_token: Optional[Mapping[str, Any]] = None, 325 ) -> Mapping[str, Any]: 326 if stream_slice: 327 return self._partition_router.get_request_body_json( # type: ignore # this always returns a mapping 328 stream_state=stream_state, 329 stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}), 330 next_page_token=next_page_token, 331 ) | self._stream_cursor.get_request_body_json( 332 stream_state=stream_state, 333 stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), 334 next_page_token=next_page_token, 335 ) 336 else: 337 raise ValueError("A partition needs to be provided in order to get request body json")
Specifies how to populate the body of the request with a JSON payload.
Note that only one of 'request_body_data' and 'request_body_json' may be overridden.
339 def should_be_synced(self, record: Record) -> bool: 340 return self._stream_cursor.should_be_synced(self._convert_record_to_cursor_record(record))
Evaluating whether a record should be synced allows for filtering and for stop conditions on pagination.
342 def is_greater_than_or_equal(self, first: Record, second: Record) -> bool: 343 return self._stream_cursor.is_greater_than_or_equal( 344 self._convert_record_to_cursor_record(first), 345 self._convert_record_to_cursor_record(second), 346 )
Evaluates which record is greater in terms of its cursor value. This is used to avoid having to capture all the records in order to close a slice.
28class PerPartitionCursor(DeclarativeCursor): 29 """ 30 Manages state per partition when a stream has many partitions, to prevent data loss or duplication. 31 32 **Partition Limitation and Limit Reached Logic** 33 34 - **DEFAULT_MAX_PARTITIONS_NUMBER**: The maximum number of partitions to keep in memory (default is 10,000). 35 - **_cursor_per_partition**: An ordered dictionary that stores cursors for each partition. 36 - **_over_limit**: A counter that increments each time an oldest partition is removed when the limit is exceeded. 37 38 The class ensures that the number of partitions tracked does not exceed the `DEFAULT_MAX_PARTITIONS_NUMBER` to prevent excessive memory usage. 39 40 - When the number of partitions exceeds the limit, the oldest partitions are removed from `_cursor_per_partition`, and `_over_limit` is incremented accordingly. 41 - The `limit_reached` method returns `True` when `_over_limit` exceeds `DEFAULT_MAX_PARTITIONS_NUMBER`, indicating that the global cursor should be used instead of per-partition cursors. 42 43 This approach avoids unnecessary switching to a global cursor due to temporary spikes in partition counts, ensuring that switching is only done when a sustained high number of partitions is observed. 44 """ 45 46 DEFAULT_MAX_PARTITIONS_NUMBER = 10000 47 _NO_STATE: Mapping[str, Any] = {} 48 _NO_CURSOR_STATE: Mapping[str, Any] = {} 49 _KEY = 0 50 _VALUE = 1 51 _state_to_migrate_from: Mapping[str, Any] = {} 52 53 def __init__(self, cursor_factory: CursorFactory, partition_router: PartitionRouter): 54 self._cursor_factory = cursor_factory 55 self._partition_router = partition_router 56 # The dict is ordered to ensure that once the maximum number of partitions is reached, 57 # the oldest partitions can be efficiently removed, maintaining the most recent partitions. 58 self._cursor_per_partition: OrderedDict[str, DeclarativeCursor] = OrderedDict() 59 self._over_limit = 0 60 self._partition_serializer = PerPartitionKeySerializer() 61 62 def stream_slices(self) -> Iterable[StreamSlice]: 63 slices = self._partition_router.stream_slices() 64 for partition in slices: 65 yield from self.generate_slices_from_partition(partition) 66 67 def generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[StreamSlice]: 68 # Ensure the maximum number of partitions is not exceeded 69 self._ensure_partition_limit() 70 71 cursor = self._cursor_per_partition.get(self._to_partition_key(partition.partition)) 72 if not cursor: 73 partition_state = ( 74 self._state_to_migrate_from 75 if self._state_to_migrate_from 76 else self._NO_CURSOR_STATE 77 ) 78 cursor = self._create_cursor(partition_state) 79 self._cursor_per_partition[self._to_partition_key(partition.partition)] = cursor 80 81 for cursor_slice in cursor.stream_slices(): 82 yield StreamSlice( 83 partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields 84 ) 85 86 def _ensure_partition_limit(self) -> None: 87 """ 88 Ensure the maximum number of partitions is not exceeded. If so, the oldest added partition will be dropped. 89 """ 90 while len(self._cursor_per_partition) > self.DEFAULT_MAX_PARTITIONS_NUMBER - 1: 91 self._over_limit += 1 92 oldest_partition = self._cursor_per_partition.popitem(last=False)[ 93 0 94 ] # Remove the oldest partition 95 logger.warning( 96 f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}. Over limit: {self._over_limit}." 
97 ) 98 99 def limit_reached(self) -> bool: 100 return self._over_limit > self.DEFAULT_MAX_PARTITIONS_NUMBER 101 102 def set_initial_state(self, stream_state: StreamState) -> None: 103 """ 104 Set the initial state for the cursors. 105 106 This method initializes the state for each partition cursor using the provided stream state. 107 If a partition state is provided in the stream state, it will update the corresponding partition cursor with this state. 108 109 Additionally, it sets the parent state for partition routers that are based on parent streams. If a partition router 110 does not have parent streams, this step will be skipped due to the default PartitionRouter implementation. 111 112 Args: 113 stream_state (StreamState): The state of the streams to be set. The format of the stream state should be: 114 { 115 "states": [ 116 { 117 "partition": { 118 "partition_key": "value" 119 }, 120 "cursor": { 121 "last_updated": "2023-05-27T00:00:00Z" 122 } 123 } 124 ], 125 "parent_state": { 126 "parent_stream_name": { 127 "last_updated": "2023-05-27T00:00:00Z" 128 } 129 } 130 } 131 """ 132 if not stream_state: 133 return 134 135 if "states" not in stream_state: 136 # We assume that `stream_state` is in a global format that can be applied to all partitions. 137 # Example: {"global_state_format_key": "global_state_format_value"} 138 self._state_to_migrate_from = stream_state 139 140 else: 141 for state in stream_state["states"]: 142 self._cursor_per_partition[self._to_partition_key(state["partition"])] = ( 143 self._create_cursor(state["cursor"]) 144 ) 145 146 # set default state for missing partitions if it is per partition with fallback to global 147 if "state" in stream_state: 148 self._state_to_migrate_from = stream_state["state"] 149 150 # Set parent state for partition routers based on parent streams 151 self._partition_router.set_initial_state(stream_state) 152 153 def observe(self, stream_slice: StreamSlice, record: Record) -> None: 154 self._cursor_per_partition[self._to_partition_key(stream_slice.partition)].observe( 155 StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), record 156 ) 157 158 def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None: 159 try: 160 self._cursor_per_partition[self._to_partition_key(stream_slice.partition)].close_slice( 161 StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), *args 162 ) 163 except KeyError as exception: 164 raise ValueError( 165 f"Partition {str(exception)} could not be found in current state based on the record. 
This is unexpected because " 166 f"we should only update state for partitions that were emitted during `stream_slices`" 167 ) 168 169 def get_stream_state(self) -> StreamState: 170 states = [] 171 for partition_tuple, cursor in self._cursor_per_partition.items(): 172 cursor_state = cursor.get_stream_state() 173 if cursor_state: 174 states.append( 175 { 176 "partition": self._to_dict(partition_tuple), 177 "cursor": cursor_state, 178 } 179 ) 180 state: dict[str, Any] = {"states": states} 181 182 parent_state = self._partition_router.get_stream_state() 183 if parent_state: 184 state["parent_state"] = parent_state 185 return state 186 187 def _get_state_for_partition(self, partition: Mapping[str, Any]) -> Optional[StreamState]: 188 cursor = self._cursor_per_partition.get(self._to_partition_key(partition)) 189 if cursor: 190 return cursor.get_stream_state() 191 192 return None 193 194 @staticmethod 195 def _is_new_state(stream_state: Mapping[str, Any]) -> bool: 196 return not bool(stream_state) 197 198 def _to_partition_key(self, partition: Mapping[str, Any]) -> str: 199 return self._partition_serializer.to_partition_key(partition) 200 201 def _to_dict(self, partition_key: str) -> Mapping[str, Any]: 202 return self._partition_serializer.to_partition(partition_key) 203 204 def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]: 205 if not stream_slice: 206 raise ValueError("A partition needs to be provided in order to extract a state") 207 208 if not stream_slice: 209 return None 210 211 return self._get_state_for_partition(stream_slice.partition) 212 213 def _create_cursor(self, cursor_state: Any) -> DeclarativeCursor: 214 cursor = self._cursor_factory.create() 215 cursor.set_initial_state(cursor_state) 216 return cursor 217 218 def get_request_params( 219 self, 220 *, 221 stream_state: Optional[StreamState] = None, 222 stream_slice: Optional[StreamSlice] = None, 223 next_page_token: Optional[Mapping[str, Any]] = None, 224 ) -> Mapping[str, Any]: 225 if stream_slice: 226 if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition: 227 self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition)) 228 return self._partition_router.get_request_params( # type: ignore # this always returns a mapping 229 stream_state=stream_state, 230 stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}), 231 next_page_token=next_page_token, 232 ) | self._cursor_per_partition[ 233 self._to_partition_key(stream_slice.partition) 234 ].get_request_params( 235 stream_state=stream_state, 236 stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), 237 next_page_token=next_page_token, 238 ) 239 else: 240 raise ValueError("A partition needs to be provided in order to get request params") 241 242 def get_request_headers( 243 self, 244 *, 245 stream_state: Optional[StreamState] = None, 246 stream_slice: Optional[StreamSlice] = None, 247 next_page_token: Optional[Mapping[str, Any]] = None, 248 ) -> Mapping[str, Any]: 249 if stream_slice: 250 if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition: 251 self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition)) 252 return self._partition_router.get_request_headers( # type: ignore # this always returns a mapping 253 stream_state=stream_state, 254 stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}), 255 next_page_token=next_page_token, 256 ) | self._cursor_per_partition[ 257 
self._to_partition_key(stream_slice.partition) 258 ].get_request_headers( 259 stream_state=stream_state, 260 stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), 261 next_page_token=next_page_token, 262 ) 263 else: 264 raise ValueError("A partition needs to be provided in order to get request headers") 265 266 def get_request_body_data( 267 self, 268 *, 269 stream_state: Optional[StreamState] = None, 270 stream_slice: Optional[StreamSlice] = None, 271 next_page_token: Optional[Mapping[str, Any]] = None, 272 ) -> Union[Mapping[str, Any], str]: 273 if stream_slice: 274 if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition: 275 self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition)) 276 return self._partition_router.get_request_body_data( # type: ignore # this always returns a mapping 277 stream_state=stream_state, 278 stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}), 279 next_page_token=next_page_token, 280 ) | self._cursor_per_partition[ 281 self._to_partition_key(stream_slice.partition) 282 ].get_request_body_data( 283 stream_state=stream_state, 284 stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), 285 next_page_token=next_page_token, 286 ) 287 else: 288 raise ValueError("A partition needs to be provided in order to get request body data") 289 290 def get_request_body_json( 291 self, 292 *, 293 stream_state: Optional[StreamState] = None, 294 stream_slice: Optional[StreamSlice] = None, 295 next_page_token: Optional[Mapping[str, Any]] = None, 296 ) -> Mapping[str, Any]: 297 if stream_slice: 298 if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition: 299 self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition)) 300 return self._partition_router.get_request_body_json( # type: ignore # this always returns a mapping 301 stream_state=stream_state, 302 stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}), 303 next_page_token=next_page_token, 304 ) | self._cursor_per_partition[ 305 self._to_partition_key(stream_slice.partition) 306 ].get_request_body_json( 307 stream_state=stream_state, 308 stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), 309 next_page_token=next_page_token, 310 ) 311 else: 312 raise ValueError("A partition needs to be provided in order to get request body json") 313 314 def should_be_synced(self, record: Record) -> bool: 315 return self._get_cursor(record).should_be_synced( 316 self._convert_record_to_cursor_record(record) 317 ) 318 319 def is_greater_than_or_equal(self, first: Record, second: Record) -> bool: 320 if not first.associated_slice or not second.associated_slice: 321 raise ValueError( 322 f"Both records should have an associated slice but got {first.associated_slice} and {second.associated_slice}" 323 ) 324 if first.associated_slice.partition != second.associated_slice.partition: 325 raise ValueError( 326 f"To compare records, partition should be the same but got {first.associated_slice.partition} and {second.associated_slice.partition}" 327 ) 328 329 return self._get_cursor(first).is_greater_than_or_equal( 330 self._convert_record_to_cursor_record(first), 331 self._convert_record_to_cursor_record(second), 332 ) 333 334 @staticmethod 335 def _convert_record_to_cursor_record(record: Record) -> Record: 336 return Record( 337 data=record.data, 338 stream_name=record.stream_name, 339 associated_slice=StreamSlice( 340 partition={}, 
cursor_slice=record.associated_slice.cursor_slice 341 ) 342 if record.associated_slice 343 else None, 344 ) 345 346 def _get_cursor(self, record: Record) -> DeclarativeCursor: 347 if not record.associated_slice: 348 raise ValueError( 349 "Invalid state as stream slices that are emitted should refer to an existing cursor" 350 ) 351 partition_key = self._to_partition_key(record.associated_slice.partition) 352 if partition_key not in self._cursor_per_partition: 353 self._create_cursor_for_partition(partition_key) 354 cursor = self._cursor_per_partition[partition_key] 355 return cursor 356 357 def _create_cursor_for_partition(self, partition_key: str) -> None: 358 """ 359 Dynamically creates and initializes a cursor for the specified partition. 360 361 This method is required for `ConcurrentPerPartitionCursor`. For concurrent cursors, 362 stream_slices is executed only for the concurrent cursor, so cursors per partition 363 are not created for the declarative cursor. This method ensures that a cursor is available 364 to create requests for the specified partition. The cursor is initialized 365 with the per-partition state if present in the initial state, or with the global state 366 adjusted by the lookback window, or with the state to migrate from. 367 368 Note: 369 This is a temporary workaround and should be removed once the declarative cursor 370 is decoupled from the concurrent cursor implementation. 371 372 Args: 373 partition_key (str): The unique identifier for the partition for which the cursor 374 needs to be created. 375 """ 376 partition_state = ( 377 self._state_to_migrate_from if self._state_to_migrate_from else self._NO_CURSOR_STATE 378 ) 379 cursor = self._create_cursor(partition_state) 380 381 self._cursor_per_partition[partition_key] = cursor
Manages state per partition when a stream has many partitions, to prevent data loss or duplication.
Partition Limitation and Limit Reached Logic
- DEFAULT_MAX_PARTITIONS_NUMBER: The maximum number of partitions to keep in memory (default is 10,000).
- _cursor_per_partition: An ordered dictionary that stores cursors for each partition.
- _over_limit: A counter that increments each time an oldest partition is removed when the limit is exceeded.
The class ensures that the number of partitions tracked does not exceed `DEFAULT_MAX_PARTITIONS_NUMBER` to prevent excessive memory usage.

- When the number of partitions exceeds the limit, the oldest partitions are removed from `_cursor_per_partition`, and `_over_limit` is incremented accordingly.
- The `limit_reached` method returns `True` when `_over_limit` exceeds `DEFAULT_MAX_PARTITIONS_NUMBER`, indicating that the global cursor should be used instead of per-partition cursors.
This approach avoids unnecessary switching to a global cursor due to temporary spikes in partition counts, ensuring that switching is only done when a sustained high number of partitions is observed.
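A compact sketch of the eviction and limit-tracking logic described above, with a tiny limit for demonstration:

```python
from collections import OrderedDict

MAX_PARTITIONS = 3  # stand-in for DEFAULT_MAX_PARTITIONS_NUMBER
cursors: "OrderedDict[str, str]" = OrderedDict()
over_limit = 0

def ensure_partition_limit() -> None:
    global over_limit
    # Drop oldest partitions until there is room for one more entry.
    while len(cursors) > MAX_PARTITIONS - 1:
        over_limit += 1
        cursors.popitem(last=False)  # FIFO: evicts the oldest partition

for key in ["p1", "p2", "p3", "p4"]:
    ensure_partition_limit()
    cursors[key] = f"cursor-for-{key}"

print(list(cursors))                # ['p2', 'p3', 'p4'] -- p1 was evicted
print(over_limit > MAX_PARTITIONS)  # limit_reached() analogue: False (no sustained spike)
```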
53 def __init__(self, cursor_factory: CursorFactory, partition_router: PartitionRouter): 54 self._cursor_factory = cursor_factory 55 self._partition_router = partition_router 56 # The dict is ordered to ensure that once the maximum number of partitions is reached, 57 # the oldest partitions can be efficiently removed, maintaining the most recent partitions. 58 self._cursor_per_partition: OrderedDict[str, DeclarativeCursor] = OrderedDict() 59 self._over_limit = 0 60 self._partition_serializer = PerPartitionKeySerializer()
62 def stream_slices(self) -> Iterable[StreamSlice]: 63 slices = self._partition_router.stream_slices() 64 for partition in slices: 65 yield from self.generate_slices_from_partition(partition)
Defines stream slices
Returns
An iterable of stream slices
67 def generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[StreamSlice]: 68 # Ensure the maximum number of partitions is not exceeded 69 self._ensure_partition_limit() 70 71 cursor = self._cursor_per_partition.get(self._to_partition_key(partition.partition)) 72 if not cursor: 73 partition_state = ( 74 self._state_to_migrate_from 75 if self._state_to_migrate_from 76 else self._NO_CURSOR_STATE 77 ) 78 cursor = self._create_cursor(partition_state) 79 self._cursor_per_partition[self._to_partition_key(partition.partition)] = cursor 80 81 for cursor_slice in cursor.stream_slices(): 82 yield StreamSlice( 83 partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields 84 )
102 def set_initial_state(self, stream_state: StreamState) -> None: 103 """ 104 Set the initial state for the cursors. 105 106 This method initializes the state for each partition cursor using the provided stream state. 107 If a partition state is provided in the stream state, it will update the corresponding partition cursor with this state. 108 109 Additionally, it sets the parent state for partition routers that are based on parent streams. If a partition router 110 does not have parent streams, this step will be skipped due to the default PartitionRouter implementation. 111 112 Args: 113 stream_state (StreamState): The state of the streams to be set. The format of the stream state should be: 114 { 115 "states": [ 116 { 117 "partition": { 118 "partition_key": "value" 119 }, 120 "cursor": { 121 "last_updated": "2023-05-27T00:00:00Z" 122 } 123 } 124 ], 125 "parent_state": { 126 "parent_stream_name": { 127 "last_updated": "2023-05-27T00:00:00Z" 128 } 129 } 130 } 131 """ 132 if not stream_state: 133 return 134 135 if "states" not in stream_state: 136 # We assume that `stream_state` is in a global format that can be applied to all partitions. 137 # Example: {"global_state_format_key": "global_state_format_value"} 138 self._state_to_migrate_from = stream_state 139 140 else: 141 for state in stream_state["states"]: 142 self._cursor_per_partition[self._to_partition_key(state["partition"])] = ( 143 self._create_cursor(state["cursor"]) 144 ) 145 146 # set default state for missing partitions if it is per partition with fallback to global 147 if "state" in stream_state: 148 self._state_to_migrate_from = stream_state["state"] 149 150 # Set parent state for partition routers based on parent streams 151 self._partition_router.set_initial_state(stream_state)
Set the initial state for the cursors.
This method initializes the state for each partition cursor using the provided stream state. If a partition state is provided in the stream state, it will update the corresponding partition cursor with this state.
Additionally, it sets the parent state for partition routers that are based on parent streams. If a partition router does not have parent streams, this step will be skipped due to the default PartitionRouter implementation.
Arguments:
- stream_state (StreamState): The state of the streams to be set. The format of the stream state should be:

      {
          "states": [
              {
                  "partition": {
                      "partition_key": "value"
                  },
                  "cursor": {
                      "last_updated": "2023-05-27T00:00:00Z"
                  }
              }
          ],
          "parent_state": {
              "parent_stream_name": {
                  "last_updated": "2023-05-27T00:00:00Z"
              }
          }
      }
153 def observe(self, stream_slice: StreamSlice, record: Record) -> None: 154 self._cursor_per_partition[self._to_partition_key(stream_slice.partition)].observe( 155 StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), record 156 )
Register a record with the cursor; the cursor instance can then use it to manage the state of the in-progress stream read.
Parameters
- stream_slice: The current slice, which may or may not contain the most recently observed record
- record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the stream state may need to be deferred depending on whether the source reliably orders records by the cursor field.
158 def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None: 159 try: 160 self._cursor_per_partition[self._to_partition_key(stream_slice.partition)].close_slice( 161 StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), *args 162 ) 163 except KeyError as exception: 164 raise ValueError( 165 f"Partition {str(exception)} could not be found in current state based on the record. This is unexpected because " 166 f"we should only update state for partitions that were emitted during `stream_slices`" 167 )
Update state based on the stream slice. Note that `stream_slice.cursor_slice` and `most_recent_record.associated_slice` are expected to be the same, but we make it explicit here that `stream_slice` should be leveraged to update the state. We do not pass in the latest record, since cursor instances should maintain the relevant internal state on their own.

Parameters

- stream_slice: slice to close
169 def get_stream_state(self) -> StreamState: 170 states = [] 171 for partition_tuple, cursor in self._cursor_per_partition.items(): 172 cursor_state = cursor.get_stream_state() 173 if cursor_state: 174 states.append( 175 { 176 "partition": self._to_dict(partition_tuple), 177 "cursor": cursor_state, 178 } 179 ) 180 state: dict[str, Any] = {"states": states} 181 182 parent_state = self._partition_router.get_stream_state() 183 if parent_state: 184 state["parent_state"] = parent_state 185 return state
Returns the current stream state. We would like to restrict its usage since it exposes the internals of the state. As of 2023-06-14, it is used for the following:

- Interpolation of the requests
- Transformation of records
- Saving the state

For the first case, we are probably stuck with exposing the stream state. For the second, we can probably expose a method that allows for emitting the state to the platform.
204 def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]: 205 if not stream_slice: 206 raise ValueError("A partition needs to be provided in order to extract a state") 207 208 if not stream_slice: 209 return None 210 211 return self._get_state_for_partition(stream_slice.partition)
Get the state value of a specific stream_slice. For incremental or resumable full refresh cursors which only manage state in a single dimension this is the entire state object. For per-partition cursors used by substreams, this returns the state of a specific parent delineated by the incoming slice's partition object.
218 def get_request_params( 219 self, 220 *, 221 stream_state: Optional[StreamState] = None, 222 stream_slice: Optional[StreamSlice] = None, 223 next_page_token: Optional[Mapping[str, Any]] = None, 224 ) -> Mapping[str, Any]: 225 if stream_slice: 226 if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition: 227 self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition)) 228 return self._partition_router.get_request_params( # type: ignore # this always returns a mapping 229 stream_state=stream_state, 230 stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}), 231 next_page_token=next_page_token, 232 ) | self._cursor_per_partition[ 233 self._to_partition_key(stream_slice.partition) 234 ].get_request_params( 235 stream_state=stream_state, 236 stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), 237 next_page_token=next_page_token, 238 ) 239 else: 240 raise ValueError("A partition needs to be provided in order to get request params")
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g., you might want to define query parameters for paging if next_page_token is not None.
242 def get_request_headers( 243 self, 244 *, 245 stream_state: Optional[StreamState] = None, 246 stream_slice: Optional[StreamSlice] = None, 247 next_page_token: Optional[Mapping[str, Any]] = None, 248 ) -> Mapping[str, Any]: 249 if stream_slice: 250 if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition: 251 self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition)) 252 return self._partition_router.get_request_headers( # type: ignore # this always returns a mapping 253 stream_state=stream_state, 254 stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}), 255 next_page_token=next_page_token, 256 ) | self._cursor_per_partition[ 257 self._to_partition_key(stream_slice.partition) 258 ].get_request_headers( 259 stream_state=stream_state, 260 stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), 261 next_page_token=next_page_token, 262 ) 263 else: 264 raise ValueError("A partition needs to be provided in order to get request headers")
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
266 def get_request_body_data( 267 self, 268 *, 269 stream_state: Optional[StreamState] = None, 270 stream_slice: Optional[StreamSlice] = None, 271 next_page_token: Optional[Mapping[str, Any]] = None, 272 ) -> Union[Mapping[str, Any], str]: 273 if stream_slice: 274 if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition: 275 self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition)) 276 return self._partition_router.get_request_body_data( # type: ignore # this always returns a mapping 277 stream_state=stream_state, 278 stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}), 279 next_page_token=next_page_token, 280 ) | self._cursor_per_partition[ 281 self._to_partition_key(stream_slice.partition) 282 ].get_request_body_data( 283 stream_state=stream_state, 284 stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), 285 next_page_token=next_page_token, 286 ) 287 else: 288 raise ValueError("A partition needs to be provided in order to get request body data")
Specifies how to populate the body of the request with a non-JSON payload.
If this method returns a string, it is sent as-is. If it returns a dict, it is converted to a URL-encoded form, e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2".
Note that only one of the 'request_body_data' and 'request_body_json' methods can be overridden.
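The dict-to-form conversion described above can be reproduced with the standard library; this is only an illustration of the serialization, not CDK code:

```python
from urllib.parse import urlencode

# Reproduces the docstring's example: a dict body becomes a urlencoded form.
body = {"key1": "value1", "key2": "value2"}
assert urlencode(body) == "key1=value1&key2=value2"
```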
290 def get_request_body_json( 291 self, 292 *, 293 stream_state: Optional[StreamState] = None, 294 stream_slice: Optional[StreamSlice] = None, 295 next_page_token: Optional[Mapping[str, Any]] = None, 296 ) -> Mapping[str, Any]: 297 if stream_slice: 298 if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition: 299 self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition)) 300 return self._partition_router.get_request_body_json( # type: ignore # this always returns a mapping 301 stream_state=stream_state, 302 stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}), 303 next_page_token=next_page_token, 304 ) | self._cursor_per_partition[ 305 self._to_partition_key(stream_slice.partition) 306 ].get_request_body_json( 307 stream_state=stream_state, 308 stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), 309 next_page_token=next_page_token, 310 ) 311 else: 312 raise ValueError("A partition needs to be provided in order to get request body json")
Specifies how to populate the body of the request with a JSON payload.
Note that only one of the 'request_body_data' and 'request_body_json' methods can be overridden.
314 def should_be_synced(self, record: Record) -> bool: 315 return self._get_cursor(record).should_be_synced( 316 self._convert_record_to_cursor_record(record) 317 )
Evaluates whether a record should be synced; this enables record filtering and acts as a stop condition for pagination.
319 def is_greater_than_or_equal(self, first: Record, second: Record) -> bool: 320 if not first.associated_slice or not second.associated_slice: 321 raise ValueError( 322 f"Both records should have an associated slice but got {first.associated_slice} and {second.associated_slice}" 323 ) 324 if first.associated_slice.partition != second.associated_slice.partition: 325 raise ValueError( 326 f"To compare records, partition should be the same but got {first.associated_slice.partition} and {second.associated_slice.partition}" 327 ) 328 329 return self._get_cursor(first).is_greater_than_or_equal( 330 self._convert_record_to_cursor_record(first), 331 self._convert_record_to_cursor_record(second), 332 )
Evaluates which record is greater in terms of its cursor value. This is used to avoid having to hold on to every record in order to close a slice.
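A minimal sketch of such a comparison, assuming a date-based cursor field named `updated_at` (illustrative, not the CDK implementation):

```python
from datetime import date

# Compare two records by their cursor value; the field name is hypothetical.
def is_greater_than_or_equal(first: dict, second: dict, cursor_field: str = "updated_at") -> bool:
    return date.fromisoformat(first[cursor_field]) >= date.fromisoformat(second[cursor_field])

assert is_greater_than_or_equal({"updated_at": "2021-02-14"}, {"updated_at": "2021-01-15"})
```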
21class PerPartitionWithGlobalCursor(DeclarativeCursor): 22 """ 23 Manages state for streams with multiple partitions, with an optional fallback to a global cursor when specific conditions are met. 24 25 This cursor handles partitioned streams by maintaining individual state per partition using `PerPartitionCursor`. If the number of partitions exceeds a defined limit, it switches to a global cursor (`GlobalSubstreamCursor`) to manage state more efficiently. 26 27 **Overview** 28 29 - **Partition-Based State**: Initially manages state per partition to ensure accurate processing of each partition's data. 30 - **Global Fallback**: Switches to a global cursor when the partition limit is exceeded to handle state management more effectively. 31 32 **Switching Logic** 33 34 - Monitors the number of partitions. 35 - If `PerPartitionCursor.limit_reached()` returns `True`, sets `_use_global_cursor` to `True`, activating the global cursor. 36 37 **Active Cursor Selection** 38 39 - Uses the `_get_active_cursor()` helper method to select the active cursor based on the `_use_global_cursor` flag. 40 - This simplifies the logic and ensures consistent cursor usage across methods. 41 42 **State Structure Example** 43 44 ```json 45 { 46 "states": [ 47 { 48 "partition": {"partition_key": "partition_1"}, 49 "cursor": {"cursor_field": "2021-01-15"} 50 }, 51 { 52 "partition": {"partition_key": "partition_2"}, 53 "cursor": {"cursor_field": "2021-02-14"} 54 } 55 ], 56 "state": { 57 "cursor_field": "2021-02-15" 58 }, 59 "use_global_cursor": false 60 } 61 ``` 62 63 In this example, the cursor is using partition-based state management (`"use_global_cursor": false`), maintaining separate cursor states for each partition. 64 65 **Usage Scenario** 66 67 Suitable for streams where the number of partitions may vary significantly, requiring dynamic switching between per-partition and global state management to ensure data consistency and efficient synchronization. 
68 """ 69 70 def __init__( 71 self, 72 cursor_factory: CursorFactory, 73 partition_router: PartitionRouter, 74 stream_cursor: DatetimeBasedCursor, 75 ): 76 self._partition_router = partition_router 77 self._per_partition_cursor = PerPartitionCursor(cursor_factory, partition_router) 78 self._global_cursor = GlobalSubstreamCursor(stream_cursor, partition_router) 79 self._use_global_cursor = False 80 self._current_partition: Optional[Mapping[str, Any]] = None 81 self._last_slice: bool = False 82 self._parent_state: Optional[Mapping[str, Any]] = None 83 84 def _get_active_cursor(self) -> Union[PerPartitionCursor, GlobalSubstreamCursor]: 85 return self._global_cursor if self._use_global_cursor else self._per_partition_cursor 86 87 def stream_slices(self) -> Iterable[StreamSlice]: 88 self._global_cursor.start_slices_generation() 89 90 # Iterate through partitions and process slices 91 for partition, is_last_partition, parent_state in iterate_with_last_flag_and_state( 92 self._partition_router.stream_slices(), self._partition_router.get_stream_state 93 ): 94 # Generate slices for the current cursor and handle the last slice using the flag 95 self._parent_state = parent_state 96 for slice, is_last_slice, _ in iterate_with_last_flag_and_state( 97 self._get_active_cursor().generate_slices_from_partition(partition=partition), 98 lambda: None, 99 ): 100 self._global_cursor.register_slice(is_last_slice and is_last_partition) 101 yield slice 102 self._parent_state = self._partition_router.get_stream_state() 103 104 def set_initial_state(self, stream_state: StreamState) -> None: 105 """ 106 Set the initial state for the cursors. 107 """ 108 self._use_global_cursor = stream_state.get("use_global_cursor", False) 109 110 self._parent_state = stream_state.get("parent_state", {}) 111 112 self._global_cursor.set_initial_state(stream_state) 113 if not self._use_global_cursor: 114 self._per_partition_cursor.set_initial_state(stream_state) 115 116 def observe(self, stream_slice: StreamSlice, record: Record) -> None: 117 if not self._use_global_cursor and self._per_partition_cursor.limit_reached(): 118 self._use_global_cursor = True 119 120 if not self._use_global_cursor: 121 self._per_partition_cursor.observe(stream_slice, record) 122 self._global_cursor.observe(stream_slice, record) 123 124 def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None: 125 if not self._use_global_cursor: 126 self._per_partition_cursor.close_slice(stream_slice, *args) 127 self._global_cursor.close_slice(stream_slice, *args) 128 129 def get_stream_state(self) -> StreamState: 130 final_state: MutableMapping[str, Any] = {"use_global_cursor": self._use_global_cursor} 131 132 final_state.update(self._global_cursor.get_stream_state()) 133 if not self._use_global_cursor: 134 final_state.update(self._per_partition_cursor.get_stream_state()) 135 136 final_state["parent_state"] = self._parent_state 137 if not final_state.get("parent_state"): 138 del final_state["parent_state"] 139 140 return final_state 141 142 def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]: 143 return self._get_active_cursor().select_state(stream_slice) 144 145 def get_request_params( 146 self, 147 *, 148 stream_state: Optional[StreamState] = None, 149 stream_slice: Optional[StreamSlice] = None, 150 next_page_token: Optional[Mapping[str, Any]] = None, 151 ) -> Mapping[str, Any]: 152 return self._get_active_cursor().get_request_params( 153 stream_state=stream_state, 154 stream_slice=stream_slice, 155 
next_page_token=next_page_token, 156 ) 157 158 def get_request_headers( 159 self, 160 *, 161 stream_state: Optional[StreamState] = None, 162 stream_slice: Optional[StreamSlice] = None, 163 next_page_token: Optional[Mapping[str, Any]] = None, 164 ) -> Mapping[str, Any]: 165 return self._get_active_cursor().get_request_headers( 166 stream_state=stream_state, 167 stream_slice=stream_slice, 168 next_page_token=next_page_token, 169 ) 170 171 def get_request_body_data( 172 self, 173 *, 174 stream_state: Optional[StreamState] = None, 175 stream_slice: Optional[StreamSlice] = None, 176 next_page_token: Optional[Mapping[str, Any]] = None, 177 ) -> Union[Mapping[str, Any], str]: 178 return self._get_active_cursor().get_request_body_data( 179 stream_state=stream_state, 180 stream_slice=stream_slice, 181 next_page_token=next_page_token, 182 ) 183 184 def get_request_body_json( 185 self, 186 *, 187 stream_state: Optional[StreamState] = None, 188 stream_slice: Optional[StreamSlice] = None, 189 next_page_token: Optional[Mapping[str, Any]] = None, 190 ) -> Mapping[str, Any]: 191 return self._get_active_cursor().get_request_body_json( 192 stream_state=stream_state, 193 stream_slice=stream_slice, 194 next_page_token=next_page_token, 195 ) 196 197 def should_be_synced(self, record: Record) -> bool: 198 return self._get_active_cursor().should_be_synced(record) 199 200 def is_greater_than_or_equal(self, first: Record, second: Record) -> bool: 201 return self._global_cursor.is_greater_than_or_equal(first, second)
Manages state for streams with multiple partitions, with an optional fallback to a global cursor when specific conditions are met.
This cursor handles partitioned streams by maintaining individual state per partition using `PerPartitionCursor`. If the number of partitions exceeds a defined limit, it switches to a global cursor (`GlobalSubstreamCursor`) to manage state more efficiently.
Overview
- Partition-Based State: Initially manages state per partition to ensure accurate processing of each partition's data.
- Global Fallback: Switches to a global cursor when the partition limit is exceeded to handle state management more effectively.
Switching Logic
- Monitors the number of partitions.
- If `PerPartitionCursor.limit_reached()` returns `True`, sets `_use_global_cursor` to `True`, activating the global cursor.
Active Cursor Selection
- Uses the `_get_active_cursor()` helper method to select the active cursor based on the `_use_global_cursor` flag.
- This simplifies the logic and ensures consistent cursor usage across methods.
State Structure Example
{
"states": [
{
"partition": {"partition_key": "partition_1"},
"cursor": {"cursor_field": "2021-01-15"}
},
{
"partition": {"partition_key": "partition_2"},
"cursor": {"cursor_field": "2021-02-14"}
}
],
"state": {
"cursor_field": "2021-02-15"
},
"use_global_cursor": false
}
In this example, the cursor is using partition-based state management (`"use_global_cursor": false`), maintaining separate cursor states for each partition.
Usage Scenario
Suitable for streams where the number of partitions may vary significantly, requiring dynamic switching between per-partition and global state management to ensure data consistency and efficient synchronization.
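A small hypothetical reader for the state structure shown above, picking the per-partition states unless the global fallback flag is set (plain dicts, no CDK types):

```python
# Uses the state document from the example above.
stream_state = {
    "states": [
        {"partition": {"partition_key": "partition_1"}, "cursor": {"cursor_field": "2021-01-15"}},
        {"partition": {"partition_key": "partition_2"}, "cursor": {"cursor_field": "2021-02-14"}},
    ],
    "state": {"cursor_field": "2021-02-15"},
    "use_global_cursor": False,
}

if stream_state["use_global_cursor"]:
    active = stream_state["state"]    # one global cursor value for every partition
else:
    active = stream_state["states"]   # one cursor value per tracked partition

assert len(active) == 2
```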
70 def __init__( 71 self, 72 cursor_factory: CursorFactory, 73 partition_router: PartitionRouter, 74 stream_cursor: DatetimeBasedCursor, 75 ): 76 self._partition_router = partition_router 77 self._per_partition_cursor = PerPartitionCursor(cursor_factory, partition_router) 78 self._global_cursor = GlobalSubstreamCursor(stream_cursor, partition_router) 79 self._use_global_cursor = False 80 self._current_partition: Optional[Mapping[str, Any]] = None 81 self._last_slice: bool = False 82 self._parent_state: Optional[Mapping[str, Any]] = None
87 def stream_slices(self) -> Iterable[StreamSlice]: 88 self._global_cursor.start_slices_generation() 89 90 # Iterate through partitions and process slices 91 for partition, is_last_partition, parent_state in iterate_with_last_flag_and_state( 92 self._partition_router.stream_slices(), self._partition_router.get_stream_state 93 ): 94 # Generate slices for the current cursor and handle the last slice using the flag 95 self._parent_state = parent_state 96 for slice, is_last_slice, _ in iterate_with_last_flag_and_state( 97 self._get_active_cursor().generate_slices_from_partition(partition=partition), 98 lambda: None, 99 ): 100 self._global_cursor.register_slice(is_last_slice and is_last_partition) 101 yield slice 102 self._parent_state = self._partition_router.get_stream_state()
Defines stream slices
Returns
An iterable of stream slices
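The slice generation above leans on a last-flag iterator. A minimal standalone sketch of that pattern, assuming only that the helper yields `(item, is_last, state)` triples as the loop in the source suggests (this is not the CDK's implementation):

```python
from typing import Any, Callable, Iterable, Iterator, Tuple

def iterate_with_last_flag(
    items: Iterable[Any], get_state: Callable[[], Any]
) -> Iterator[Tuple[Any, bool, Any]]:
    # Buffer one element so we can tell which item is the final one.
    iterator = iter(items)
    try:
        current = next(iterator)
    except StopIteration:
        return
    for upcoming in iterator:
        yield current, False, get_state()
        current = upcoming
    yield current, True, get_state()

assert list(iterate_with_last_flag(["a", "b"], lambda: None)) == [
    ("a", False, None),
    ("b", True, None),
]
```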
104 def set_initial_state(self, stream_state: StreamState) -> None: 105 """ 106 Set the initial state for the cursors. 107 """ 108 self._use_global_cursor = stream_state.get("use_global_cursor", False) 109 110 self._parent_state = stream_state.get("parent_state", {}) 111 112 self._global_cursor.set_initial_state(stream_state) 113 if not self._use_global_cursor: 114 self._per_partition_cursor.set_initial_state(stream_state)
Set the initial state for the cursors.
116 def observe(self, stream_slice: StreamSlice, record: Record) -> None: 117 if not self._use_global_cursor and self._per_partition_cursor.limit_reached(): 118 self._use_global_cursor = True 119 120 if not self._use_global_cursor: 121 self._per_partition_cursor.observe(stream_slice, record) 122 self._global_cursor.observe(stream_slice, record)
Register a record with the cursor; the cursor instance can then use it to manage the state of the in-progress stream read.
Parameters
- stream_slice: The current slice, which may or may not contain the most recently observed record
- record: the most recently read record, which the cursor can use to update the stream state. Outwardly-visible changes to the stream state may need to be deferred depending on whether the source reliably orders records by the cursor field (see the sketch below).
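A minimal sketch of that deferral, with an illustrative ordering flag and cursor field (not the CDK API):

```python
class ObservingCursor:
    """Illustrative only: advance visible state per record when the source is
    ordered by the cursor field; otherwise defer until the slice closes."""

    def __init__(self, source_is_ordered: bool) -> None:
        self._source_is_ordered = source_is_ordered
        self._most_recent: str = ""
        self._state: dict = {}

    def observe(self, record: dict) -> None:
        self._most_recent = max(self._most_recent, record["updated_at"])
        if self._source_is_ordered:
            # Nothing earlier can still arrive, so exposing state now is safe.
            self._state = {"updated_at": self._most_recent}

    def close_slice(self) -> None:
        if self._most_recent:
            self._state = {"updated_at": self._most_recent}

cursor = ObservingCursor(source_is_ordered=False)
cursor.observe({"updated_at": "2021-02-14"})
assert cursor._state == {}  # deferred: records may arrive out of order
cursor.close_slice()
assert cursor._state == {"updated_at": "2021-02-14"}
```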
124 def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None: 125 if not self._use_global_cursor: 126 self._per_partition_cursor.close_slice(stream_slice, *args) 127 self._global_cursor.close_slice(stream_slice, *args)
Update state based on the stream slice. Note that `stream_slice.cursor_slice` and `most_recent_record.associated_slice` are expected to be the same, but we make it explicit here that `stream_slice` should be leveraged to update the state. We do not pass in the latest record, since cursor instances should maintain the relevant internal state on their own.
Parameters
- stream_slice: slice to close
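The contract above, sketched with plain dicts standing in for `StreamSlice` (illustrative, not CDK code): the state after closing comes from the slice itself, never from the last record.

```python
class SketchCursor:
    def __init__(self) -> None:
        self._state: dict = {}

    def close_slice(self, stream_slice: dict) -> None:
        # Per the contract above: the slice's cursor portion becomes the state.
        self._state = stream_slice["cursor_slice"]

cursor = SketchCursor()
cursor.close_slice({"partition": {}, "cursor_slice": {"cursor_field": "2021-02-15"}})
assert cursor._state == {"cursor_field": "2021-02-15"}
```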
129 def get_stream_state(self) -> StreamState: 130 final_state: MutableMapping[str, Any] = {"use_global_cursor": self._use_global_cursor} 131 132 final_state.update(self._global_cursor.get_stream_state()) 133 if not self._use_global_cursor: 134 final_state.update(self._per_partition_cursor.get_stream_state()) 135 136 final_state["parent_state"] = self._parent_state 137 if not final_state.get("parent_state"): 138 del final_state["parent_state"] 139 140 return final_state
Returns the current stream state. We would like to restrict its usage since it exposes the internals of the state. As of 2023-06-14, it is used for three things:
- Interpolation of the requests
- Transformation of records
- Saving the state
For the first case, we are probably stuck with exposing the stream state. For the second, we can probably expose a method that allows for emitting the state to the platform.
142 def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]: 143 return self._get_active_cursor().select_state(stream_slice)
Get the state value of a specific stream_slice. For incremental or resumable full refresh cursors which only manage state in a single dimension this is the entire state object. For per-partition cursors used by substreams, this returns the state of a specific parent delineated by the incoming slice's partition object.
145 def get_request_params( 146 self, 147 *, 148 stream_state: Optional[StreamState] = None, 149 stream_slice: Optional[StreamSlice] = None, 150 next_page_token: Optional[Mapping[str, Any]] = None, 151 ) -> Mapping[str, Any]: 152 return self._get_active_cursor().get_request_params( 153 stream_state=stream_state, 154 stream_slice=stream_slice, 155 next_page_token=next_page_token, 156 )
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g., you might want to define query parameters for paging if next_page_token is not None.
158 def get_request_headers( 159 self, 160 *, 161 stream_state: Optional[StreamState] = None, 162 stream_slice: Optional[StreamSlice] = None, 163 next_page_token: Optional[Mapping[str, Any]] = None, 164 ) -> Mapping[str, Any]: 165 return self._get_active_cursor().get_request_headers( 166 stream_state=stream_state, 167 stream_slice=stream_slice, 168 next_page_token=next_page_token, 169 )
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
171 def get_request_body_data( 172 self, 173 *, 174 stream_state: Optional[StreamState] = None, 175 stream_slice: Optional[StreamSlice] = None, 176 next_page_token: Optional[Mapping[str, Any]] = None, 177 ) -> Union[Mapping[str, Any], str]: 178 return self._get_active_cursor().get_request_body_data( 179 stream_state=stream_state, 180 stream_slice=stream_slice, 181 next_page_token=next_page_token, 182 )
Specifies how to populate the body of the request with a non-JSON payload.
If this method returns a string, it is sent as-is. If it returns a dict, it is converted to a URL-encoded form, e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2".
Note that only one of the 'request_body_data' and 'request_body_json' methods can be overridden.
184 def get_request_body_json( 185 self, 186 *, 187 stream_state: Optional[StreamState] = None, 188 stream_slice: Optional[StreamSlice] = None, 189 next_page_token: Optional[Mapping[str, Any]] = None, 190 ) -> Mapping[str, Any]: 191 return self._get_active_cursor().get_request_body_json( 192 stream_state=stream_state, 193 stream_slice=stream_slice, 194 next_page_token=next_page_token, 195 )
Specifies how to populate the body of the request with a JSON payload.
Note that only one of the 'request_body_data' and 'request_body_json' methods can be overridden.
197 def should_be_synced(self, record: Record) -> bool: 198 return self._get_active_cursor().should_be_synced(record)
Evaluates whether a record should be synced; this enables record filtering and acts as a stop condition for pagination.
12@dataclass 13class ResumableFullRefreshCursor(DeclarativeCursor): 14 parameters: InitVar[Mapping[str, Any]] 15 16 def __post_init__(self, parameters: Mapping[str, Any]) -> None: 17 self._cursor: StreamState = {} 18 19 def get_stream_state(self) -> StreamState: 20 return self._cursor 21 22 def set_initial_state(self, stream_state: StreamState) -> None: 23 self._cursor = stream_state 24 25 def observe(self, stream_slice: StreamSlice, record: Record) -> None: 26 """ 27 Resumable full refresh manages state using a page number so it does not need to update state by observing incoming records. 28 """ 29 pass 30 31 def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None: 32 # The ResumableFullRefreshCursor doesn't support nested streams yet so receiving a partition is unexpected 33 if stream_slice.partition: 34 raise ValueError( 35 f"Stream slice {stream_slice} should not have a partition. Got {stream_slice.partition}." 36 ) 37 self._cursor = stream_slice.cursor_slice 38 39 def should_be_synced(self, record: Record) -> bool: 40 """ 41 Unlike date-based cursors which filter out records outside slice boundaries, resumable full refresh records exist within pages 42 that don't have filterable bounds. We should always return them. 43 """ 44 return True 45 46 def is_greater_than_or_equal(self, first: Record, second: Record) -> bool: 47 """ 48 RFR record don't have ordering to be compared between one another. 49 """ 50 return False 51 52 def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]: 53 # A top-level RFR cursor only manages the state of a single partition 54 return self._cursor 55 56 def stream_slices(self) -> Iterable[StreamSlice]: 57 """ 58 Resumable full refresh cursors only return a single slice and can't perform partitioning because iteration is done per-page 59 along an unbounded set. 60 """ 61 yield from [StreamSlice(cursor_slice=self._cursor, partition={})] 62 63 # This is an interesting pattern that might not seem obvious at first glance. This cursor itself has no functional need to 64 # inject any request values into the outbound response because the up-to-date pagination state is already loaded and 65 # maintained by the paginator component 66 def get_request_params( 67 self, 68 *, 69 stream_state: Optional[StreamState] = None, 70 stream_slice: Optional[StreamSlice] = None, 71 next_page_token: Optional[Mapping[str, Any]] = None, 72 ) -> Mapping[str, Any]: 73 return {} 74 75 def get_request_headers( 76 self, 77 *, 78 stream_state: Optional[StreamState] = None, 79 stream_slice: Optional[StreamSlice] = None, 80 next_page_token: Optional[Mapping[str, Any]] = None, 81 ) -> Mapping[str, Any]: 82 return {} 83 84 def get_request_body_data( 85 self, 86 *, 87 stream_state: Optional[StreamState] = None, 88 stream_slice: Optional[StreamSlice] = None, 89 next_page_token: Optional[Mapping[str, Any]] = None, 90 ) -> Mapping[str, Any]: 91 return {} 92 93 def get_request_body_json( 94 self, 95 *, 96 stream_state: Optional[StreamState] = None, 97 stream_slice: Optional[StreamSlice] = None, 98 next_page_token: Optional[Mapping[str, Any]] = None, 99 ) -> Mapping[str, Any]: 100 return {}
Returns the current stream state. We would like to restrict its usage since it exposes the internals of the state. As of 2023-06-14, it is used for three things:
- Interpolation of the requests
- Transformation of records
- Saving the state
For the first case, we are probably stuck with exposing the stream state. For the second, we can probably expose a method that allows for emitting the state to the platform.
Cursors are not initialized with their state. As state is needed in order to function properly, this method should be called before anything else.
Parameters
- stream_state: The state of the stream as returned by get_stream_state
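Hypothetical usage against the class shown above; the `next_page_token` key is illustrative, since RFR state is whatever page state the paginator persisted on a previous attempt:

```python
from airbyte_cdk.sources.declarative.incremental import ResumableFullRefreshCursor

# Inject persisted state before using the cursor, per the contract above.
cursor = ResumableFullRefreshCursor(parameters={})
cursor.set_initial_state({"next_page_token": 3})
assert cursor.get_stream_state() == {"next_page_token": 3}
```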
25 def observe(self, stream_slice: StreamSlice, record: Record) -> None: 26 """ 27 Resumable full refresh manages state using a page number so it does not need to update state by observing incoming records. 28 """ 29 pass
Resumable full refresh manages state using a page number so it does not need to update state by observing incoming records.
31 def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None: 32 # The ResumableFullRefreshCursor doesn't support nested streams yet so receiving a partition is unexpected 33 if stream_slice.partition: 34 raise ValueError( 35 f"Stream slice {stream_slice} should not have a partition. Got {stream_slice.partition}." 36 ) 37 self._cursor = stream_slice.cursor_slice
Update state based on the stream slice. Note that stream_slice.cursor_slice
and most_recent_record.associated_slice
are expected
to be the same but we make it explicit here that stream_slice
should be leveraged to update the state. We do not pass in the
latest record, since cursor instances should maintain the relevant internal state on their own.
Parameters
- stream_slice: slice to close
39 def should_be_synced(self, record: Record) -> bool: 40 """ 41 Unlike date-based cursors which filter out records outside slice boundaries, resumable full refresh records exist within pages 42 that don't have filterable bounds. We should always return them. 43 """ 44 return True
Unlike date-based cursors which filter out records outside slice boundaries, resumable full refresh records exist within pages that don't have filterable bounds. We should always return them.
46 def is_greater_than_or_equal(self, first: Record, second: Record) -> bool: 47 """ 48 RFR record don't have ordering to be compared between one another. 49 """ 50 return False
RFR records don't have an ordering that allows them to be compared to one another.
52 def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]: 53 # A top-level RFR cursor only manages the state of a single partition 54 return self._cursor
Get the state value of a specific stream_slice. For incremental or resumable full refresh cursors which only manage state in a single dimension this is the entire state object. For per-partition cursors used by substreams, this returns the state of a specific parent delineated by the incoming slice's partition object.
56 def stream_slices(self) -> Iterable[StreamSlice]: 57 """ 58 Resumable full refresh cursors only return a single slice and can't perform partitioning because iteration is done per-page 59 along an unbounded set. 60 """ 61 yield from [StreamSlice(cursor_slice=self._cursor, partition={})]
Resumable full refresh cursors only return a single slice and can't perform partitioning because iteration is done per-page along an unbounded set.
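Continuing the hypothetical usage sketched earlier: with no prior state, the single yielded slice carries an empty cursor and an empty partition.

```python
from airbyte_cdk.sources.declarative.incremental import ResumableFullRefreshCursor

cursor = ResumableFullRefreshCursor(parameters={})
slices = list(cursor.stream_slices())

# Exactly one slice, carrying the (currently empty) page state.
assert len(slices) == 1
assert slices[0].partition == {} and slices[0].cursor_slice == {}
```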
66 def get_request_params( 67 self, 68 *, 69 stream_state: Optional[StreamState] = None, 70 stream_slice: Optional[StreamSlice] = None, 71 next_page_token: Optional[Mapping[str, Any]] = None, 72 ) -> Mapping[str, Any]: 73 return {}
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g., you might want to define query parameters for paging if next_page_token is not None.
75 def get_request_headers( 76 self, 77 *, 78 stream_state: Optional[StreamState] = None, 79 stream_slice: Optional[StreamSlice] = None, 80 next_page_token: Optional[Mapping[str, Any]] = None, 81 ) -> Mapping[str, Any]: 82 return {}
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
84 def get_request_body_data( 85 self, 86 *, 87 stream_state: Optional[StreamState] = None, 88 stream_slice: Optional[StreamSlice] = None, 89 next_page_token: Optional[Mapping[str, Any]] = None, 90 ) -> Mapping[str, Any]: 91 return {}
Specifies how to populate the body of the request with a non-JSON payload.
If this method returns a string, it is sent as-is. If it returns a dict, it is converted to a URL-encoded form, e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2".
Note that only one of the 'request_body_data' and 'request_body_json' methods can be overridden.
93 def get_request_body_json( 94 self, 95 *, 96 stream_state: Optional[StreamState] = None, 97 stream_slice: Optional[StreamSlice] = None, 98 next_page_token: Optional[Mapping[str, Any]] = None, 99 ) -> Mapping[str, Any]: 100 return {}
Specifies how to populate the body of the request with a JSON payload.
Note that only one of the 'request_body_data' and 'request_body_json' methods can be overridden.
103@dataclass 104class ChildPartitionResumableFullRefreshCursor(ResumableFullRefreshCursor): 105 """ 106 The Sub-stream Resumable Cursor for Full-Refresh substreams. 107 Follows the parent type `ResumableFullRefreshCursor` with a small override, 108 to provide the ability to close the substream's slice once it has finished processing. 109 110 Check the `close_slice` method overide for more info about the actual behaviour of this cursor. 111 """ 112 113 def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None: 114 """ 115 Once the current slice has finished syncing: 116 - paginator returns None 117 - no more slices to process 118 119 we assume that the records are processed and emitted already, 120 thus we have to set the cursor to ` __ab_full_refresh_sync_complete: true `, 121 otherwise there is a risk of Inf. Loop processing the same slice. 122 """ 123 self._cursor = FULL_REFRESH_COMPLETE_STATE
The Sub-stream Resumable Cursor for Full-Refresh substreams. Follows the parent type `ResumableFullRefreshCursor` with a small override to provide the ability to close the substream's slice once it has finished processing.
Check the `close_slice` method override for more info about the actual behaviour of this cursor.
113 def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None: 114 """ 115 Once the current slice has finished syncing: 116 - paginator returns None 117 - no more slices to process 118 119 we assume that the records are processed and emitted already, 120 thus we have to set the cursor to ` __ab_full_refresh_sync_complete: true `, 121 otherwise there is a risk of Inf. Loop processing the same slice. 122 """ 123 self._cursor = FULL_REFRESH_COMPLETE_STATE
Once the current slice has finished syncing (the paginator returns None and there are no more slices to process), we assume that the records have already been processed and emitted. We therefore have to set the cursor to `__ab_full_refresh_sync_complete: true`; otherwise there is a risk of an infinite loop re-processing the same slice.
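For illustration, the sentinel described above presumably looks like the mapping below; the real constant lives in the CDK source.

```python
# Hedged sketch of the terminal state: after close_slice on the last slice,
# get_stream_state() returns this sentinel, so the finished slice is not
# re-run on the next attempt.
FULL_REFRESH_COMPLETE_STATE = {"__ab_full_refresh_sync_complete": True}
```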