airbyte_cdk.sources.streams.concurrent.cursor
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import functools
import logging
from abc import ABC, abstractmethod
from typing import (
    Any,
    Callable,
    Iterable,
    List,
    Mapping,
    MutableMapping,
    Optional,
    Tuple,
    Union,
)

from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.streams import NO_CURSOR_STATE_KEY
from airbyte_cdk.sources.streams.concurrent.clamping import ClampingStrategy, NoClamping
from airbyte_cdk.sources.streams.concurrent.cursor_types import CursorValueType, GapType
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.stream_slicer import StreamSlicer
from airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter import (
    AbstractStreamStateConverter,
)
from airbyte_cdk.sources.types import Record, StreamSlice

LOGGER = logging.getLogger("airbyte")


def _extract_value(mapping: Mapping[str, Any], path: List[str]) -> Any:
    return functools.reduce(lambda a, b: a[b], path, mapping)


class CursorField:
    def __init__(self, cursor_field_key: str) -> None:
        self.cursor_field_key = cursor_field_key

    def extract_value(self, record: Record) -> CursorValueType:
        cursor_value = record.data.get(self.cursor_field_key)
        if cursor_value is None:
            raise ValueError(f"Could not find cursor field {self.cursor_field_key} in record")
        return cursor_value  # type: ignore  # we assume that the value the path points at is comparable


class Cursor(StreamSlicer, ABC):
    @property
    @abstractmethod
    def state(self) -> MutableMapping[str, Any]: ...

    @abstractmethod
    def observe(self, record: Record) -> None:
        """
        Indicate to the cursor that the record has been emitted
        """
        raise NotImplementedError()

    @abstractmethod
    def close_partition(self, partition: Partition) -> None:
        """
        Indicate to the cursor that the partition has been successfully processed
        """
        raise NotImplementedError()

    @abstractmethod
    def ensure_at_least_one_state_emitted(self) -> None:
        """
        State messages are emitted when a partition is closed. However, the platform expects at least one state to be
        emitted per sync per stream. Hence, if no partitions are generated, this method needs to be called.
        """
        raise NotImplementedError()

    @abstractmethod
    def should_be_synced(self, record: Record) -> bool:
        pass

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Default placeholder implementation of stream_slices.
        Subclasses can override this method to provide actual behavior.
        """
        yield StreamSlice(partition={}, cursor_slice={})


class FinalStateCursor(Cursor):
    """Cursor that is used to guarantee at least one state message is emitted for a concurrent stream."""

    def __init__(
        self,
        stream_name: str,
        stream_namespace: Optional[str],
        message_repository: MessageRepository,
    ) -> None:
        self._stream_name = stream_name
        self._stream_namespace = stream_namespace
        self._message_repository = message_repository
        # Normally the connector state manager operates at the source level. However, we only need it to write the
        # sentinel state message rather than manage overall source state. This is also only temporary as we move to
        # the resumable full refresh world where every stream uses a FileBasedConcurrentCursor with incremental state.
        self._connector_state_manager = ConnectorStateManager()
        self._has_closed_at_least_one_slice = False

    @property
    def state(self) -> MutableMapping[str, Any]:
        return {NO_CURSOR_STATE_KEY: True}

    def observe(self, record: Record) -> None:
        pass

    def close_partition(self, partition: Partition) -> None:
        pass

    def ensure_at_least_one_state_emitted(self) -> None:
        """
        Used primarily for full refresh syncs that do not have a valid cursor value to emit at the end of a sync
        """

        self._connector_state_manager.update_state_for_stream(
            self._stream_name, self._stream_namespace, self.state
        )
        state_message = self._connector_state_manager.create_state_message(
            self._stream_name, self._stream_namespace
        )
        self._message_repository.emit_message(state_message)

    def should_be_synced(self, record: Record) -> bool:
        return True


class ConcurrentCursor(Cursor):
    _START_BOUNDARY = 0
    _END_BOUNDARY = 1

    def __init__(
        self,
        stream_name: str,
        stream_namespace: Optional[str],
        stream_state: Any,
        message_repository: MessageRepository,
        connector_state_manager: ConnectorStateManager,
        connector_state_converter: AbstractStreamStateConverter,
        cursor_field: CursorField,
        slice_boundary_fields: Optional[Tuple[str, str]],
        start: Optional[CursorValueType],
        end_provider: Callable[[], CursorValueType],
        lookback_window: Optional[GapType] = None,
        slice_range: Optional[GapType] = None,
        cursor_granularity: Optional[GapType] = None,
        clamping_strategy: ClampingStrategy = NoClamping(),
    ) -> None:
        self._stream_name = stream_name
        self._stream_namespace = stream_namespace
        self._message_repository = message_repository
        self._connector_state_converter = connector_state_converter
        self._connector_state_manager = connector_state_manager
        self._cursor_field = cursor_field
        # To see some examples where the slice boundaries might not be defined, check https://github.com/airbytehq/airbyte/blob/1ce84d6396e446e1ac2377362446e3fb94509461/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py#L363-L379
        self._slice_boundary_fields = slice_boundary_fields
        self._start = start
        self._end_provider = end_provider
        self.start, self._concurrent_state = self._get_concurrent_state(stream_state)
        self._lookback_window = lookback_window
        self._slice_range = slice_range
        self._most_recent_cursor_value_per_partition: MutableMapping[
            Union[StreamSlice, Mapping[str, Any], None], Any
        ] = {}
        self._has_closed_at_least_one_slice = False
        self._cursor_granularity = cursor_granularity
        # Flag to track if the logger has been triggered (per stream)
        self._should_be_synced_logger_triggered = False
        self._clamping_strategy = clamping_strategy

    @property
    def state(self) -> MutableMapping[str, Any]:
        return self._connector_state_converter.convert_to_state_message(
            self.cursor_field, self._concurrent_state
        )

    @property
    def cursor_field(self) -> CursorField:
        return self._cursor_field

    @property
    def _slice_boundary_fields_wrapper(self) -> Tuple[str, str]:
        return (
            self._slice_boundary_fields
            if self._slice_boundary_fields
            else (
                self._connector_state_converter.START_KEY,
                self._connector_state_converter.END_KEY,
            )
        )

    def _get_concurrent_state(
        self, state: MutableMapping[str, Any]
    ) -> Tuple[CursorValueType, MutableMapping[str, Any]]:
        if self._connector_state_converter.is_state_message_compatible(state):
            partitioned_state = self._connector_state_converter.deserialize(state)
            slices_from_partitioned_state = partitioned_state.get("slices", [])

            value_from_partitioned_state = None
            if slices_from_partitioned_state:
                # We assume here that the slices have already been merged
                first_slice = slices_from_partitioned_state[0]
                value_from_partitioned_state = (
                    first_slice[self._connector_state_converter.MOST_RECENT_RECORD_KEY]
                    if self._connector_state_converter.MOST_RECENT_RECORD_KEY in first_slice
                    else first_slice[self._connector_state_converter.END_KEY]
                )
            return (
                value_from_partitioned_state
                or self._start
                or self._connector_state_converter.zero_value,
                partitioned_state,
            )
        return self._connector_state_converter.convert_from_sequential_state(
            self._cursor_field, state, self._start
        )

    def observe(self, record: Record) -> None:
        most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
            record.associated_slice
        )
        try:
            cursor_value = self._extract_cursor_value(record)

            if most_recent_cursor_value is None or most_recent_cursor_value < cursor_value:
                self._most_recent_cursor_value_per_partition[record.associated_slice] = cursor_value
        except ValueError:
            self._log_for_record_without_cursor_value()

    def _extract_cursor_value(self, record: Record) -> Any:
        return self._connector_state_converter.parse_value(self._cursor_field.extract_value(record))

    def close_partition(self, partition: Partition) -> None:
        slice_count_before = len(self._concurrent_state.get("slices", []))
        self._add_slice_to_state(partition)
        if slice_count_before < len(
            self._concurrent_state["slices"]
        ):  # only emit if at least one slice has been processed
            self._merge_partitions()
            self._emit_state_message()
        self._has_closed_at_least_one_slice = True

    def _add_slice_to_state(self, partition: Partition) -> None:
        most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
            partition.to_slice()
        )

        if self._slice_boundary_fields:
            if "slices" not in self._concurrent_state:
                raise RuntimeError(
                    f"The state for stream {self._stream_name} should have at least one slice to delineate the sync start time, but no slices are present. This is unexpected. Please contact Support."
                )
            self._concurrent_state["slices"].append(
                {
                    self._connector_state_converter.START_KEY: self._extract_from_slice(
                        partition, self._slice_boundary_fields[self._START_BOUNDARY]
                    ),
                    self._connector_state_converter.END_KEY: self._extract_from_slice(
                        partition, self._slice_boundary_fields[self._END_BOUNDARY]
                    ),
                    self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value,
                }
            )
        elif most_recent_cursor_value:
            if self._has_closed_at_least_one_slice:
                # If we track the state value using the record's cursor field, we can only do so if there is a single
                # partition. This is because we save the state every time we close a partition. We assume that if
                # there are multiple slices, they need to provide boundaries. There are cases where partitions do not
                # have boundaries:
                # * The cursor should be per-partition
                # * The stream state is actually the parent stream state
                # There might be other cases not listed above. Those are not supported today, hence such streams
                # should not use this cursor for state management. For the specific user that was affected by this
                # issue, we need to:
                # * Fix state tracking (which is currently broken)
                # * Make the new version available
                # * (Probably) ask the user to reset the stream to avoid data loss
                raise ValueError(
                    "Given that slice_boundary_fields is not defined and that per-partition state is not supported, only one slice is "
                    "expected. Please contact the Airbyte team."
                )

            self._concurrent_state["slices"].append(
                {
                    self._connector_state_converter.START_KEY: self.start,
                    self._connector_state_converter.END_KEY: most_recent_cursor_value,
                    self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value,
                }
            )

    def _emit_state_message(self) -> None:
        self._connector_state_manager.update_state_for_stream(
            self._stream_name,
            self._stream_namespace,
            self.state,
        )
        state_message = self._connector_state_manager.create_state_message(
            self._stream_name, self._stream_namespace
        )
        self._message_repository.emit_message(state_message)

    def _merge_partitions(self) -> None:
        self._concurrent_state["slices"] = self._connector_state_converter.merge_intervals(
            self._concurrent_state["slices"]
        )

    def _extract_from_slice(self, partition: Partition, key: str) -> CursorValueType:
        try:
            _slice = partition.to_slice()
            if not _slice:
                raise KeyError(f"Could not find key `{key}` in empty slice")
            return self._connector_state_converter.parse_value(_slice[key])  # type: ignore  # we expect the devs to specify a key that would return a CursorValueType
        except KeyError as exception:
            raise KeyError(
                f"Partition is expected to have key `{key}` but it could not be found"
            ) from exception

    def ensure_at_least_one_state_emitted(self) -> None:
        """
        The platform expects at least one state message on successful syncs. Hence, whatever happens, we expect this
        method to be called.
        """
        self._emit_state_message()

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Generates slices based on a few parameters:
        * lookback_window: Buffer to remove from END_KEY of the highest slice
        * slice_range: Max difference between two slices. If the difference between two slices is greater, multiple slices will be created
        * start: `_split_per_slice_range` will clip any value to `self._start`, which means that:
          * if upper is less than self._start, no slices will be generated
          * if lower is less than self._start, self._start will be used as the lower boundary (lookback_window will not be considered in that case)

        Note that the slices will overlap at their boundaries. We therefore expect at least the lower or the upper boundary to be
        inclusive in the API that is queried.
        """
        self._merge_partitions()

        if self._start is not None and self._is_start_before_first_slice():
            yield from self._split_per_slice_range(
                self._start,
                self._concurrent_state["slices"][0][self._connector_state_converter.START_KEY],
                False,
            )

        if len(self._concurrent_state["slices"]) == 1:
            yield from self._split_per_slice_range(
                self._calculate_lower_boundary_of_last_slice(
                    self._concurrent_state["slices"][0][self._connector_state_converter.END_KEY]
                ),
                self._end_provider(),
                True,
            )
        elif len(self._concurrent_state["slices"]) > 1:
            for i in range(len(self._concurrent_state["slices"]) - 1):
                if self._cursor_granularity:
                    yield from self._split_per_slice_range(
                        self._concurrent_state["slices"][i][self._connector_state_converter.END_KEY]
                        + self._cursor_granularity,
                        self._concurrent_state["slices"][i + 1][
                            self._connector_state_converter.START_KEY
                        ],
                        False,
                    )
                else:
                    yield from self._split_per_slice_range(
                        self._concurrent_state["slices"][i][
                            self._connector_state_converter.END_KEY
                        ],
                        self._concurrent_state["slices"][i + 1][
                            self._connector_state_converter.START_KEY
                        ],
                        False,
                    )
            yield from self._split_per_slice_range(
                self._calculate_lower_boundary_of_last_slice(
                    self._concurrent_state["slices"][-1][self._connector_state_converter.END_KEY]
                ),
                self._end_provider(),
                True,
            )
        else:
            raise ValueError("Expected at least one slice")

    def _is_start_before_first_slice(self) -> bool:
        return (
            self._start is not None
            and self._start
            < self._concurrent_state["slices"][0][self._connector_state_converter.START_KEY]
        )

    def _calculate_lower_boundary_of_last_slice(
        self, lower_boundary: CursorValueType
    ) -> CursorValueType:
        if self._lookback_window:
            return lower_boundary - self._lookback_window
        return lower_boundary

    def _split_per_slice_range(
        self, lower: CursorValueType, upper: CursorValueType, upper_is_end: bool
    ) -> Iterable[StreamSlice]:
        if lower >= upper:
            return

        if self._start and upper < self._start:
            return

        lower = max(lower, self._start) if self._start else lower
        if not self._slice_range or self._evaluate_upper_safely(lower, self._slice_range) >= upper:
            clamped_lower = self._clamping_strategy.clamp(lower)
            clamped_upper = self._clamping_strategy.clamp(upper)
            start_value, end_value = (
                (clamped_lower, clamped_upper - self._cursor_granularity)
                if self._cursor_granularity and not upper_is_end
                else (clamped_lower, clamped_upper)
            )
            yield StreamSlice(
                partition={},
                cursor_slice={
                    self._slice_boundary_fields_wrapper[
                        self._START_BOUNDARY
                    ]: self._connector_state_converter.output_format(start_value),
                    self._slice_boundary_fields_wrapper[
                        self._END_BOUNDARY
                    ]: self._connector_state_converter.output_format(end_value),
                },
            )
        else:
            stop_processing = False
            current_lower_boundary = lower
            while not stop_processing:
                current_upper_boundary = min(
                    self._evaluate_upper_safely(current_lower_boundary, self._slice_range), upper
                )
                has_reached_upper_boundary = current_upper_boundary >= upper

                clamped_upper = (
                    self._clamping_strategy.clamp(current_upper_boundary)
                    if current_upper_boundary != upper
                    else current_upper_boundary
                )
                clamped_lower = self._clamping_strategy.clamp(current_lower_boundary)
                if clamped_lower >= clamped_upper:
                    # clamping collapsed both values, which means that it is time to stop processing
                    # FIXME: should this be replaced by a proper end_provider?
                    break
                start_value, end_value = (
                    (clamped_lower, clamped_upper - self._cursor_granularity)
                    if self._cursor_granularity
                    and (not upper_is_end or not has_reached_upper_boundary)
                    else (clamped_lower, clamped_upper)
                )
                yield StreamSlice(
                    partition={},
                    cursor_slice={
                        self._slice_boundary_fields_wrapper[
                            self._START_BOUNDARY
                        ]: self._connector_state_converter.output_format(start_value),
                        self._slice_boundary_fields_wrapper[
                            self._END_BOUNDARY
                        ]: self._connector_state_converter.output_format(end_value),
                    },
                )
                current_lower_boundary = clamped_upper
                if current_upper_boundary >= upper:
                    stop_processing = True

    def _evaluate_upper_safely(self, lower: CursorValueType, step: GapType) -> CursorValueType:
        """
        Given that we set the default step to datetime.timedelta.max, evaluating the next start date can raise an
        OverflowError. This method assumes that users would never provide a step that overflows on its own; if that
        were the case, the code would have broken anyway.
        """
        try:
            return lower + step
        except OverflowError:
            return self._end_provider()

    def should_be_synced(self, record: Record) -> bool:
        """
        Determines if a record should be synced based on its cursor value.
        :param record: The record to evaluate

        :return: True if the record's cursor value falls within the sync boundaries
        """
        try:
            record_cursor_value: CursorValueType = self._extract_cursor_value(record)
        except ValueError:
            self._log_for_record_without_cursor_value()
            return True
        return self.start <= record_cursor_value <= self._end_provider()

    def _log_for_record_without_cursor_value(self) -> None:
        if not self._should_be_synced_logger_triggered:
            LOGGER.warning(
                f"Could not find cursor field `{self.cursor_field.cursor_field_key}` in record for stream {self._stream_name}. The incremental sync will assume it needs to be synced"
            )
            self._should_be_synced_logger_triggered = True
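As a quick orientation, here is a minimal sketch of the two extraction helpers defined at the top of the module, assuming the module above is in scope. The `record` stand-in built with SimpleNamespace is hypothetical; in a real sync, `Record` instances come from the partition readers.

from types import SimpleNamespace

# _extract_value walks a nested mapping via functools.reduce over the path:
assert _extract_value({"a": {"b": 42}}, ["a", "b"]) == 42

# CursorField reads a top-level key from record.data; SimpleNamespace is
# only a stand-in for a Record exposing a `data` attribute.
record = SimpleNamespace(data={"updated_at": "2024-01-01T00:00:00Z"})
assert CursorField("updated_at").extract_value(record) == "2024-01-01T00:00:00Z"

# A missing cursor field raises ValueError; callers such as
# ConcurrentCursor.observe catch this and log a warning instead.
try:
    CursorField("missing").extract_value(record)
except ValueError as error:
    print(error)  # Could not find cursor field missing in record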
class CursorField:
    def __init__(self, cursor_field_key: str) -> None:
        self.cursor_field_key = cursor_field_key

    def extract_value(self, record: Record) -> CursorValueType:
        cursor_value = record.data.get(self.cursor_field_key)
        if cursor_value is None:
            raise ValueError(f"Could not find cursor field {self.cursor_field_key} in record")
        return cursor_value  # type: ignore  # we assume that the value the path points at is comparable
def extract_value(self, record: Record) -> CursorValueType:
    cursor_value = record.data.get(self.cursor_field_key)
    if cursor_value is None:
        raise ValueError(f"Could not find cursor field {self.cursor_field_key} in record")
    return cursor_value  # type: ignore  # we assume that the value the path points at is comparable
class Cursor(StreamSlicer, ABC):
    @property
    @abstractmethod
    def state(self) -> MutableMapping[str, Any]: ...

    @abstractmethod
    def observe(self, record: Record) -> None:
        """
        Indicate to the cursor that the record has been emitted
        """
        raise NotImplementedError()

    @abstractmethod
    def close_partition(self, partition: Partition) -> None:
        """
        Indicate to the cursor that the partition has been successfully processed
        """
        raise NotImplementedError()

    @abstractmethod
    def ensure_at_least_one_state_emitted(self) -> None:
        """
        State messages are emitted when a partition is closed. However, the platform expects at least one state to be
        emitted per sync per stream. Hence, if no partitions are generated, this method needs to be called.
        """
        raise NotImplementedError()

    @abstractmethod
    def should_be_synced(self, record: Record) -> bool:
        pass

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Default placeholder implementation of stream_slices.
        Subclasses can override this method to provide actual behavior.
        """
        yield StreamSlice(partition={}, cursor_slice={})
Slices the stream into chunks that can be fetched independently. Slices enable state checkpointing and data retrieval parallelization.
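For context, a `StreamSlice` pairs a partition component with a cursor window. A minimal illustrative instance, grounded in how this module constructs slices (the cursor-slice keys shown are illustrative; the real keys come from the state converter or `slice_boundary_fields`):

from airbyte_cdk.sources.types import StreamSlice

slice_ = StreamSlice(
    partition={},  # the concurrent cursors in this module leave the partition component empty
    cursor_slice={"start": "2024-01-01", "end": "2024-01-31"},  # illustrative boundary keys
)
print(slice_.cursor_slice)  # {'start': '2024-01-01', 'end': '2024-01-31'}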
@abstractmethod
def observe(self, record: Record) -> None:
    """
    Indicate to the cursor that the record has been emitted
    """
    raise NotImplementedError()
Indicate to the cursor that the record has been emitted
@abstractmethod
def close_partition(self, partition: Partition) -> None:
    """
    Indicate to the cursor that the partition has been successfully processed
    """
    raise NotImplementedError()
Indicate to the cursor that the partition has been successfully processed
@abstractmethod
def ensure_at_least_one_state_emitted(self) -> None:
    """
    State messages are emitted when a partition is closed. However, the platform expects at least one state to be
    emitted per sync per stream. Hence, if no partitions are generated, this method needs to be called.
    """
    raise NotImplementedError()
State messages are emitted when a partition is closed. However, the platform expects at least one state to be emitted per sync per stream. Hence, if no partitions are generated, this method needs to be called.
def stream_slices(self) -> Iterable[StreamSlice]:
    """
    Default placeholder implementation of stream_slices.
    Subclasses can override this method to provide actual behavior.
    """
    yield StreamSlice(partition={}, cursor_slice={})
Default placeholder implementation of stream_slices. Subclasses can override this method to provide actual behavior.
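To see the default in action, here is a hypothetical stub subclass (relying on the `Cursor` class above) that implements the abstract members trivially; the default yields exactly one slice with empty partition and cursor components:

class _NoopCursor(Cursor):
    """Hypothetical stub that relies on the default stream_slices()."""

    @property
    def state(self):
        return {}

    def observe(self, record):
        pass

    def close_partition(self, partition):
        pass

    def ensure_at_least_one_state_emitted(self):
        pass

    def should_be_synced(self, record):
        return True

slices = list(_NoopCursor().stream_slices())
assert len(slices) == 1
assert slices[0].partition == {} and slices[0].cursor_slice == {}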
class FinalStateCursor(Cursor):
    """Cursor that is used to guarantee at least one state message is emitted for a concurrent stream."""

    def __init__(
        self,
        stream_name: str,
        stream_namespace: Optional[str],
        message_repository: MessageRepository,
    ) -> None:
        self._stream_name = stream_name
        self._stream_namespace = stream_namespace
        self._message_repository = message_repository
        # Normally the connector state manager operates at the source level. However, we only need it to write the
        # sentinel state message rather than manage overall source state. This is also only temporary as we move to
        # the resumable full refresh world where every stream uses a FileBasedConcurrentCursor with incremental state.
        self._connector_state_manager = ConnectorStateManager()
        self._has_closed_at_least_one_slice = False

    @property
    def state(self) -> MutableMapping[str, Any]:
        return {NO_CURSOR_STATE_KEY: True}

    def observe(self, record: Record) -> None:
        pass

    def close_partition(self, partition: Partition) -> None:
        pass

    def ensure_at_least_one_state_emitted(self) -> None:
        """
        Used primarily for full refresh syncs that do not have a valid cursor value to emit at the end of a sync
        """

        self._connector_state_manager.update_state_for_stream(
            self._stream_name, self._stream_namespace, self.state
        )
        state_message = self._connector_state_manager.create_state_message(
            self._stream_name, self._stream_namespace
        )
        self._message_repository.emit_message(state_message)

    def should_be_synced(self, record: Record) -> bool:
        return True
Cursor that is used to guarantee at least one state message is emitted for a concurrent stream.
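A minimal usage sketch of the sentinel-state flow, assuming the CDK's `InMemoryMessageRepository` as the concrete `MessageRepository` and its `consume_queue()` accessor; the stream name is illustrative:

from airbyte_cdk.sources.message import InMemoryMessageRepository

repository = InMemoryMessageRepository()
cursor = FinalStateCursor(
    stream_name="users",  # illustrative stream name
    stream_namespace=None,
    message_repository=repository,
)

# Even if the sync produced no partitions, this guarantees the platform
# receives one state message for the stream:
cursor.ensure_at_least_one_state_emitted()
for message in repository.consume_queue():
    print(message.type)  # Type.STATE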
def __init__(
    self,
    stream_name: str,
    stream_namespace: Optional[str],
    message_repository: MessageRepository,
) -> None:
    self._stream_name = stream_name
    self._stream_namespace = stream_namespace
    self._message_repository = message_repository
    # Normally the connector state manager operates at the source level. However, we only need it to write the
    # sentinel state message rather than manage overall source state. This is also only temporary as we move to
    # the resumable full refresh world where every stream uses a FileBasedConcurrentCursor with incremental state.
    self._connector_state_manager = ConnectorStateManager()
    self._has_closed_at_least_one_slice = False
Indicate to the cursor that the partition has been successfully processed
def ensure_at_least_one_state_emitted(self) -> None:
    """
    Used primarily for full refresh syncs that do not have a valid cursor value to emit at the end of a sync
    """

    self._connector_state_manager.update_state_for_stream(
        self._stream_name, self._stream_namespace, self.state
    )
    state_message = self._connector_state_manager.create_state_message(
        self._stream_name, self._stream_namespace
    )
    self._message_repository.emit_message(state_message)
Used primarily for full refresh syncs that do not have a valid cursor value to emit at the end of a sync
class ConcurrentCursor(Cursor):
    _START_BOUNDARY = 0
    _END_BOUNDARY = 1

    def __init__(
        self,
        stream_name: str,
        stream_namespace: Optional[str],
        stream_state: Any,
        message_repository: MessageRepository,
        connector_state_manager: ConnectorStateManager,
        connector_state_converter: AbstractStreamStateConverter,
        cursor_field: CursorField,
        slice_boundary_fields: Optional[Tuple[str, str]],
        start: Optional[CursorValueType],
        end_provider: Callable[[], CursorValueType],
        lookback_window: Optional[GapType] = None,
        slice_range: Optional[GapType] = None,
        cursor_granularity: Optional[GapType] = None,
        clamping_strategy: ClampingStrategy = NoClamping(),
    ) -> None:
        self._stream_name = stream_name
        self._stream_namespace = stream_namespace
        self._message_repository = message_repository
        self._connector_state_converter = connector_state_converter
        self._connector_state_manager = connector_state_manager
        self._cursor_field = cursor_field
        # To see some examples where the slice boundaries might not be defined, check https://github.com/airbytehq/airbyte/blob/1ce84d6396e446e1ac2377362446e3fb94509461/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py#L363-L379
        self._slice_boundary_fields = slice_boundary_fields
        self._start = start
        self._end_provider = end_provider
        self.start, self._concurrent_state = self._get_concurrent_state(stream_state)
        self._lookback_window = lookback_window
        self._slice_range = slice_range
        self._most_recent_cursor_value_per_partition: MutableMapping[
            Union[StreamSlice, Mapping[str, Any], None], Any
        ] = {}
        self._has_closed_at_least_one_slice = False
        self._cursor_granularity = cursor_granularity
        # Flag to track if the logger has been triggered (per stream)
        self._should_be_synced_logger_triggered = False
        self._clamping_strategy = clamping_strategy

    @property
    def state(self) -> MutableMapping[str, Any]:
        return self._connector_state_converter.convert_to_state_message(
            self.cursor_field, self._concurrent_state
        )

    @property
    def cursor_field(self) -> CursorField:
        return self._cursor_field

    @property
    def _slice_boundary_fields_wrapper(self) -> Tuple[str, str]:
        return (
            self._slice_boundary_fields
            if self._slice_boundary_fields
            else (
                self._connector_state_converter.START_KEY,
                self._connector_state_converter.END_KEY,
            )
        )

    def _get_concurrent_state(
        self, state: MutableMapping[str, Any]
    ) -> Tuple[CursorValueType, MutableMapping[str, Any]]:
        if self._connector_state_converter.is_state_message_compatible(state):
            partitioned_state = self._connector_state_converter.deserialize(state)
            slices_from_partitioned_state = partitioned_state.get("slices", [])

            value_from_partitioned_state = None
            if slices_from_partitioned_state:
                # We assume here that the slices have already been merged
                first_slice = slices_from_partitioned_state[0]
                value_from_partitioned_state = (
                    first_slice[self._connector_state_converter.MOST_RECENT_RECORD_KEY]
                    if self._connector_state_converter.MOST_RECENT_RECORD_KEY in first_slice
                    else first_slice[self._connector_state_converter.END_KEY]
                )
            return (
                value_from_partitioned_state
                or self._start
                or self._connector_state_converter.zero_value,
                partitioned_state,
            )
        return self._connector_state_converter.convert_from_sequential_state(
            self._cursor_field, state, self._start
        )

    def observe(self, record: Record) -> None:
        most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
            record.associated_slice
        )
        try:
            cursor_value = self._extract_cursor_value(record)

            if most_recent_cursor_value is None or most_recent_cursor_value < cursor_value:
                self._most_recent_cursor_value_per_partition[record.associated_slice] = cursor_value
        except ValueError:
            self._log_for_record_without_cursor_value()

    def _extract_cursor_value(self, record: Record) -> Any:
        return self._connector_state_converter.parse_value(self._cursor_field.extract_value(record))

    def close_partition(self, partition: Partition) -> None:
        slice_count_before = len(self._concurrent_state.get("slices", []))
        self._add_slice_to_state(partition)
        if slice_count_before < len(
            self._concurrent_state["slices"]
        ):  # only emit if at least one slice has been processed
            self._merge_partitions()
            self._emit_state_message()
        self._has_closed_at_least_one_slice = True

    def _add_slice_to_state(self, partition: Partition) -> None:
        most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
            partition.to_slice()
        )

        if self._slice_boundary_fields:
            if "slices" not in self._concurrent_state:
                raise RuntimeError(
                    f"The state for stream {self._stream_name} should have at least one slice to delineate the sync start time, but no slices are present. This is unexpected. Please contact Support."
                )
            self._concurrent_state["slices"].append(
                {
                    self._connector_state_converter.START_KEY: self._extract_from_slice(
                        partition, self._slice_boundary_fields[self._START_BOUNDARY]
                    ),
                    self._connector_state_converter.END_KEY: self._extract_from_slice(
                        partition, self._slice_boundary_fields[self._END_BOUNDARY]
                    ),
                    self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value,
                }
            )
        elif most_recent_cursor_value:
            if self._has_closed_at_least_one_slice:
                # If we track the state value using the record's cursor field, we can only do so if there is a single
                # partition. This is because we save the state every time we close a partition. We assume that if
                # there are multiple slices, they need to provide boundaries. There are cases where partitions do not
                # have boundaries:
                # * The cursor should be per-partition
                # * The stream state is actually the parent stream state
                # There might be other cases not listed above. Those are not supported today, hence such streams
                # should not use this cursor for state management. For the specific user that was affected by this
                # issue, we need to:
                # * Fix state tracking (which is currently broken)
                # * Make the new version available
                # * (Probably) ask the user to reset the stream to avoid data loss
                raise ValueError(
                    "Given that slice_boundary_fields is not defined and that per-partition state is not supported, only one slice is "
                    "expected. Please contact the Airbyte team."
                )

            self._concurrent_state["slices"].append(
                {
                    self._connector_state_converter.START_KEY: self.start,
                    self._connector_state_converter.END_KEY: most_recent_cursor_value,
                    self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value,
                }
            )

    def _emit_state_message(self) -> None:
        self._connector_state_manager.update_state_for_stream(
            self._stream_name,
            self._stream_namespace,
            self.state,
        )
        state_message = self._connector_state_manager.create_state_message(
            self._stream_name, self._stream_namespace
        )
        self._message_repository.emit_message(state_message)

    def _merge_partitions(self) -> None:
        self._concurrent_state["slices"] = self._connector_state_converter.merge_intervals(
            self._concurrent_state["slices"]
        )

    def _extract_from_slice(self, partition: Partition, key: str) -> CursorValueType:
        try:
            _slice = partition.to_slice()
            if not _slice:
                raise KeyError(f"Could not find key `{key}` in empty slice")
            return self._connector_state_converter.parse_value(_slice[key])  # type: ignore  # we expect the devs to specify a key that would return a CursorValueType
        except KeyError as exception:
            raise KeyError(
                f"Partition is expected to have key `{key}` but it could not be found"
            ) from exception

    def ensure_at_least_one_state_emitted(self) -> None:
        """
        The platform expects at least one state message on successful syncs. Hence, whatever happens, we expect this
        method to be called.
        """
        self._emit_state_message()

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Generates slices based on a few parameters:
        * lookback_window: Buffer to remove from END_KEY of the highest slice
        * slice_range: Max difference between two slices. If the difference between two slices is greater, multiple slices will be created
        * start: `_split_per_slice_range` will clip any value to `self._start`, which means that:
          * if upper is less than self._start, no slices will be generated
          * if lower is less than self._start, self._start will be used as the lower boundary (lookback_window will not be considered in that case)

        Note that the slices will overlap at their boundaries. We therefore expect at least the lower or the upper boundary to be
        inclusive in the API that is queried.
        """
        self._merge_partitions()

        if self._start is not None and self._is_start_before_first_slice():
            yield from self._split_per_slice_range(
                self._start,
                self._concurrent_state["slices"][0][self._connector_state_converter.START_KEY],
                False,
            )

        if len(self._concurrent_state["slices"]) == 1:
            yield from self._split_per_slice_range(
                self._calculate_lower_boundary_of_last_slice(
                    self._concurrent_state["slices"][0][self._connector_state_converter.END_KEY]
                ),
                self._end_provider(),
                True,
            )
        elif len(self._concurrent_state["slices"]) > 1:
            for i in range(len(self._concurrent_state["slices"]) - 1):
                if self._cursor_granularity:
                    yield from self._split_per_slice_range(
                        self._concurrent_state["slices"][i][self._connector_state_converter.END_KEY]
                        + self._cursor_granularity,
                        self._concurrent_state["slices"][i + 1][
                            self._connector_state_converter.START_KEY
                        ],
                        False,
                    )
                else:
                    yield from self._split_per_slice_range(
                        self._concurrent_state["slices"][i][
                            self._connector_state_converter.END_KEY
                        ],
                        self._concurrent_state["slices"][i + 1][
                            self._connector_state_converter.START_KEY
                        ],
                        False,
                    )
            yield from self._split_per_slice_range(
                self._calculate_lower_boundary_of_last_slice(
                    self._concurrent_state["slices"][-1][self._connector_state_converter.END_KEY]
                ),
                self._end_provider(),
                True,
            )
        else:
            raise ValueError("Expected at least one slice")

    def _is_start_before_first_slice(self) -> bool:
        return (
            self._start is not None
            and self._start
            < self._concurrent_state["slices"][0][self._connector_state_converter.START_KEY]
        )

    def _calculate_lower_boundary_of_last_slice(
        self, lower_boundary: CursorValueType
    ) -> CursorValueType:
        if self._lookback_window:
            return lower_boundary - self._lookback_window
        return lower_boundary

    def _split_per_slice_range(
        self, lower: CursorValueType, upper: CursorValueType, upper_is_end: bool
    ) -> Iterable[StreamSlice]:
        if lower >= upper:
            return

        if self._start and upper < self._start:
            return

        lower = max(lower, self._start) if self._start else lower
        if not self._slice_range or self._evaluate_upper_safely(lower, self._slice_range) >= upper:
            clamped_lower = self._clamping_strategy.clamp(lower)
            clamped_upper = self._clamping_strategy.clamp(upper)
            start_value, end_value = (
                (clamped_lower, clamped_upper - self._cursor_granularity)
                if self._cursor_granularity and not upper_is_end
                else (clamped_lower, clamped_upper)
            )
            yield StreamSlice(
                partition={},
                cursor_slice={
                    self._slice_boundary_fields_wrapper[
                        self._START_BOUNDARY
                    ]: self._connector_state_converter.output_format(start_value),
                    self._slice_boundary_fields_wrapper[
                        self._END_BOUNDARY
                    ]: self._connector_state_converter.output_format(end_value),
                },
            )
        else:
            stop_processing = False
            current_lower_boundary = lower
            while not stop_processing:
                current_upper_boundary = min(
                    self._evaluate_upper_safely(current_lower_boundary, self._slice_range), upper
                )
                has_reached_upper_boundary = current_upper_boundary >= upper

                clamped_upper = (
                    self._clamping_strategy.clamp(current_upper_boundary)
                    if current_upper_boundary != upper
                    else current_upper_boundary
                )
                clamped_lower = self._clamping_strategy.clamp(current_lower_boundary)
                if clamped_lower >= clamped_upper:
                    # clamping collapsed both values, which means that it is time to stop processing
                    # FIXME: should this be replaced by a proper end_provider?
                    break
                start_value, end_value = (
                    (clamped_lower, clamped_upper - self._cursor_granularity)
                    if self._cursor_granularity
                    and (not upper_is_end or not has_reached_upper_boundary)
                    else (clamped_lower, clamped_upper)
                )
                yield StreamSlice(
                    partition={},
                    cursor_slice={
                        self._slice_boundary_fields_wrapper[
                            self._START_BOUNDARY
                        ]: self._connector_state_converter.output_format(start_value),
                        self._slice_boundary_fields_wrapper[
                            self._END_BOUNDARY
                        ]: self._connector_state_converter.output_format(end_value),
                    },
                )
                current_lower_boundary = clamped_upper
                if current_upper_boundary >= upper:
                    stop_processing = True

    def _evaluate_upper_safely(self, lower: CursorValueType, step: GapType) -> CursorValueType:
        """
        Given that we set the default step to datetime.timedelta.max, evaluating the next start date can raise an
        OverflowError. This method assumes that users would never provide a step that overflows on its own; if that
        were the case, the code would have broken anyway.
        """
        try:
            return lower + step
        except OverflowError:
            return self._end_provider()

    def should_be_synced(self, record: Record) -> bool:
        """
        Determines if a record should be synced based on its cursor value.
        :param record: The record to evaluate

        :return: True if the record's cursor value falls within the sync boundaries
        """
        try:
            record_cursor_value: CursorValueType = self._extract_cursor_value(record)
        except ValueError:
            self._log_for_record_without_cursor_value()
            return True
        return self.start <= record_cursor_value <= self._end_provider()

    def _log_for_record_without_cursor_value(self) -> None:
        if not self._should_be_synced_logger_triggered:
            LOGGER.warning(
                f"Could not find cursor field `{self.cursor_field.cursor_field_key}` in record for stream {self._stream_name}. The incremental sync will assume it needs to be synced"
            )
            self._should_be_synced_logger_triggered = True
Slices the stream into chunks that can be fetched independently. Slices enable state checkpointing and data retrieval parallelization.
def __init__(
    self,
    stream_name: str,
    stream_namespace: Optional[str],
    stream_state: Any,
    message_repository: MessageRepository,
    connector_state_manager: ConnectorStateManager,
    connector_state_converter: AbstractStreamStateConverter,
    cursor_field: CursorField,
    slice_boundary_fields: Optional[Tuple[str, str]],
    start: Optional[CursorValueType],
    end_provider: Callable[[], CursorValueType],
    lookback_window: Optional[GapType] = None,
    slice_range: Optional[GapType] = None,
    cursor_granularity: Optional[GapType] = None,
    clamping_strategy: ClampingStrategy = NoClamping(),
) -> None:
    self._stream_name = stream_name
    self._stream_namespace = stream_namespace
    self._message_repository = message_repository
    self._connector_state_converter = connector_state_converter
    self._connector_state_manager = connector_state_manager
    self._cursor_field = cursor_field
    # To see some examples where the slice boundaries might not be defined, check https://github.com/airbytehq/airbyte/blob/1ce84d6396e446e1ac2377362446e3fb94509461/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py#L363-L379
    self._slice_boundary_fields = slice_boundary_fields
    self._start = start
    self._end_provider = end_provider
    self.start, self._concurrent_state = self._get_concurrent_state(stream_state)
    self._lookback_window = lookback_window
    self._slice_range = slice_range
    self._most_recent_cursor_value_per_partition: MutableMapping[
        Union[StreamSlice, Mapping[str, Any], None], Any
    ] = {}
    self._has_closed_at_least_one_slice = False
    self._cursor_granularity = cursor_granularity
    # Flag to track if the logger has been triggered (per stream)
    self._should_be_synced_logger_triggered = False
    self._clamping_strategy = clamping_strategy
def observe(self, record: Record) -> None:
    most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
        record.associated_slice
    )
    try:
        cursor_value = self._extract_cursor_value(record)

        if most_recent_cursor_value is None or most_recent_cursor_value < cursor_value:
            self._most_recent_cursor_value_per_partition[record.associated_slice] = cursor_value
    except ValueError:
        self._log_for_record_without_cursor_value()
Indicate to the cursor that the record has been emitted
def close_partition(self, partition: Partition) -> None:
    slice_count_before = len(self._concurrent_state.get("slices", []))
    self._add_slice_to_state(partition)
    if slice_count_before < len(
        self._concurrent_state["slices"]
    ):  # only emit if at least one slice has been processed
        self._merge_partitions()
        self._emit_state_message()
    self._has_closed_at_least_one_slice = True
Indicate to the cursor that the partition has been successfully processed
def ensure_at_least_one_state_emitted(self) -> None:
    """
    The platform expects at least one state message on successful syncs. Hence, whatever happens, we expect this
    method to be called.
    """
    self._emit_state_message()
The platform expects at least one state message on successful syncs. Hence, whatever happens, we expect this method to be called.
def stream_slices(self) -> Iterable[StreamSlice]:
    """
    Generates slices based on a few parameters:
    * lookback_window: Buffer to remove from END_KEY of the highest slice
    * slice_range: Max difference between two slices. If the difference between two slices is greater, multiple slices will be created
    * start: `_split_per_slice_range` will clip any value to `self._start`, which means that:
      * if upper is less than self._start, no slices will be generated
      * if lower is less than self._start, self._start will be used as the lower boundary (lookback_window will not be considered in that case)

    Note that the slices will overlap at their boundaries. We therefore expect at least the lower or the upper boundary to be
    inclusive in the API that is queried.
    """
    self._merge_partitions()

    if self._start is not None and self._is_start_before_first_slice():
        yield from self._split_per_slice_range(
            self._start,
            self._concurrent_state["slices"][0][self._connector_state_converter.START_KEY],
            False,
        )

    if len(self._concurrent_state["slices"]) == 1:
        yield from self._split_per_slice_range(
            self._calculate_lower_boundary_of_last_slice(
                self._concurrent_state["slices"][0][self._connector_state_converter.END_KEY]
            ),
            self._end_provider(),
            True,
        )
    elif len(self._concurrent_state["slices"]) > 1:
        for i in range(len(self._concurrent_state["slices"]) - 1):
            if self._cursor_granularity:
                yield from self._split_per_slice_range(
                    self._concurrent_state["slices"][i][self._connector_state_converter.END_KEY]
                    + self._cursor_granularity,
                    self._concurrent_state["slices"][i + 1][
                        self._connector_state_converter.START_KEY
                    ],
                    False,
                )
            else:
                yield from self._split_per_slice_range(
                    self._concurrent_state["slices"][i][
                        self._connector_state_converter.END_KEY
                    ],
                    self._concurrent_state["slices"][i + 1][
                        self._connector_state_converter.START_KEY
                    ],
                    False,
                )
        yield from self._split_per_slice_range(
            self._calculate_lower_boundary_of_last_slice(
                self._concurrent_state["slices"][-1][self._connector_state_converter.END_KEY]
            ),
            self._end_provider(),
            True,
        )
    else:
        raise ValueError("Expected at least one slice")
Generates slices based on a few parameters:
- lookback_window: Buffer to remove from END_KEY of the highest slice
- slice_range: Max difference between two slices. If the difference between two slices is greater, multiple slices will be created
- start: `_split_per_slice_range` will clip any value to `self._start`, which means that:
  - if upper is less than self._start, no slices will be generated
  - if lower is less than self._start, self._start will be used as the lower boundary (lookback_window will not be considered in that case)
Note that the slices will overlap at their boundaries. We therefore expect to have at least the lower or the upper boundary to be inclusive in the API that is queried.
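To make the slice-splitting arithmetic concrete, here is a standalone sketch that mirrors `_split_per_slice_range` for the common case, ignoring clamping, cursor granularity, and the overflow guard; the dates are illustrative:

from datetime import datetime, timedelta
from typing import Iterable, Tuple

def split_range(
    lower: datetime, upper: datetime, step: timedelta
) -> Iterable[Tuple[datetime, datetime]]:
    # Yield chunks of [lower, upper] no wider than `step`; the last chunk
    # is truncated at `upper`, just like min(evaluated_upper, upper) above.
    current = lower
    while current < upper:
        chunk_end = min(current + step, upper)
        yield (current, chunk_end)
        current = chunk_end

# Adjacent chunks share a boundary, which is why the docstring requires
# at least one of the two boundaries to be inclusive in the queried API.
for start, end in split_range(
    datetime(2024, 1, 1), datetime(2024, 1, 10), timedelta(days=4)
):
    print(start.date(), "->", end.date())
# 2024-01-01 -> 2024-01-05
# 2024-01-05 -> 2024-01-09
# 2024-01-09 -> 2024-01-10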
def should_be_synced(self, record: Record) -> bool:
    """
    Determines if a record should be synced based on its cursor value.
    :param record: The record to evaluate

    :return: True if the record's cursor value falls within the sync boundaries
    """
    try:
        record_cursor_value: CursorValueType = self._extract_cursor_value(record)
    except ValueError:
        self._log_for_record_without_cursor_value()
        return True
    return self.start <= record_cursor_value <= self._end_provider()
Determines if a record should be synced based on its cursor value.
Parameters
- record: The record to evaluate
Returns
True if the record's cursor value falls within the sync boundaries
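The boundary check itself is a simple inclusive range test. A standalone sketch with illustrative datetimes:

from datetime import datetime, timezone

start = datetime(2024, 1, 1, tzinfo=timezone.utc)  # the cursor's computed start
end = datetime(2024, 6, 1, tzinfo=timezone.utc)    # what end_provider() returned

def should_be_synced(record_cursor_value: datetime) -> bool:
    # Mirrors the return statement above: both boundaries are inclusive.
    return start <= record_cursor_value <= end

assert should_be_synced(start)  # the lower boundary itself is included
assert should_be_synced(datetime(2024, 3, 15, tzinfo=timezone.utc))
assert not should_be_synced(datetime(2023, 12, 31, tzinfo=timezone.utc))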