airbyte_cdk.sources.streams.concurrent.cursor
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import functools
import logging
from abc import ABC, abstractmethod
from typing import (
    Any,
    Callable,
    Iterable,
    List,
    Mapping,
    MutableMapping,
    Optional,
    Tuple,
    Union,
)

from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.streams import NO_CURSOR_STATE_KEY
from airbyte_cdk.sources.streams.concurrent.clamping import ClampingStrategy, NoClamping
from airbyte_cdk.sources.streams.concurrent.cursor_types import CursorValueType, GapType
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.stream_slicer import StreamSlicer
from airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter import (
    AbstractStreamStateConverter,
)
from airbyte_cdk.sources.types import Record, StreamSlice

LOGGER = logging.getLogger("airbyte")


def _extract_value(mapping: Mapping[str, Any], path: List[str]) -> Any:
    return functools.reduce(lambda a, b: a[b], path, mapping)


class CursorField:
    def __init__(self, cursor_field_key: str) -> None:
        self.cursor_field_key = cursor_field_key

    def extract_value(self, record: Record) -> CursorValueType:
        cursor_value = record.data.get(self.cursor_field_key)
        if cursor_value is None:
            raise ValueError(f"Could not find cursor field {self.cursor_field_key} in record")
        return cursor_value  # type: ignore  # we assume that the value the path points at is a comparable


class Cursor(StreamSlicer, ABC):
    @property
    @abstractmethod
    def state(self) -> MutableMapping[str, Any]: ...

    @abstractmethod
    def observe(self, record: Record) -> None:
        """
        Indicate to the cursor that the record has been emitted
        """
        raise NotImplementedError()

    @abstractmethod
    def close_partition(self, partition: Partition) -> None:
        """
        Indicate to the cursor that the partition has been successfully processed
        """
        raise NotImplementedError()

    @abstractmethod
    def ensure_at_least_one_state_emitted(self) -> None:
        """
        State messages are emitted when a partition is closed. However, the platform expects at least one state to be emitted per sync per
        stream. Hence, if no partitions are generated, this method needs to be called.
        """
        raise NotImplementedError()

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Default placeholder implementation of stream_slices.
        Subclasses can override this method to provide actual behavior.
        """
        yield StreamSlice(partition={}, cursor_slice={})


class FinalStateCursor(Cursor):
    """Cursor that is used to guarantee at least one state message is emitted for a concurrent stream."""

    def __init__(
        self,
        stream_name: str,
        stream_namespace: Optional[str],
        message_repository: MessageRepository,
    ) -> None:
        self._stream_name = stream_name
        self._stream_namespace = stream_namespace
        self._message_repository = message_repository
        # Normally the connector state manager operates at the source-level. However, we only need it to write the sentinel
        # state message rather than manage overall source state. This is also only temporary as we move to the resumable
        # full refresh world where every stream uses a FileBasedConcurrentCursor with incremental state.
        self._connector_state_manager = ConnectorStateManager()
        self._has_closed_at_least_one_slice = False

    @property
    def state(self) -> MutableMapping[str, Any]:
        return {NO_CURSOR_STATE_KEY: True}

    def observe(self, record: Record) -> None:
        pass

    def close_partition(self, partition: Partition) -> None:
        pass

    def ensure_at_least_one_state_emitted(self) -> None:
        """
        Used primarily for full refresh syncs that do not have a valid cursor value to emit at the end of a sync
        """

        self._connector_state_manager.update_state_for_stream(
            self._stream_name, self._stream_namespace, self.state
        )
        state_message = self._connector_state_manager.create_state_message(
            self._stream_name, self._stream_namespace
        )
        self._message_repository.emit_message(state_message)


class ConcurrentCursor(Cursor):
    _START_BOUNDARY = 0
    _END_BOUNDARY = 1

    def __init__(
        self,
        stream_name: str,
        stream_namespace: Optional[str],
        stream_state: Any,
        message_repository: MessageRepository,
        connector_state_manager: ConnectorStateManager,
        connector_state_converter: AbstractStreamStateConverter,
        cursor_field: CursorField,
        slice_boundary_fields: Optional[Tuple[str, str]],
        start: Optional[CursorValueType],
        end_provider: Callable[[], CursorValueType],
        lookback_window: Optional[GapType] = None,
        slice_range: Optional[GapType] = None,
        cursor_granularity: Optional[GapType] = None,
        clamping_strategy: ClampingStrategy = NoClamping(),
    ) -> None:
        self._stream_name = stream_name
        self._stream_namespace = stream_namespace
        self._message_repository = message_repository
        self._connector_state_converter = connector_state_converter
        self._connector_state_manager = connector_state_manager
        self._cursor_field = cursor_field
        # To see an example where the slice boundaries might not be defined, check https://github.com/airbytehq/airbyte/blob/1ce84d6396e446e1ac2377362446e3fb94509461/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py#L363-L379
        self._slice_boundary_fields = slice_boundary_fields
        self._start = start
        self._end_provider = end_provider
        self.start, self._concurrent_state = self._get_concurrent_state(stream_state)
        self._lookback_window = lookback_window
        self._slice_range = slice_range
        self._most_recent_cursor_value_per_partition: MutableMapping[
            Union[StreamSlice, Mapping[str, Any], None], Any
        ] = {}
        self._has_closed_at_least_one_slice = False
        self._cursor_granularity = cursor_granularity
        # Flag to track if the logger has been triggered (per stream)
        self._should_be_synced_logger_triggered = False
        self._clamping_strategy = clamping_strategy

    @property
    def state(self) -> MutableMapping[str, Any]:
        return self._connector_state_converter.convert_to_state_message(
            self.cursor_field, self._concurrent_state
        )

    @property
    def cursor_field(self) -> CursorField:
        return self._cursor_field

    @property
    def _slice_boundary_fields_wrapper(self) -> Tuple[str, str]:
        return (
            self._slice_boundary_fields
            if self._slice_boundary_fields
            else (
                self._connector_state_converter.START_KEY,
                self._connector_state_converter.END_KEY,
            )
        )

    def _get_concurrent_state(
        self, state: MutableMapping[str, Any]
    ) -> Tuple[CursorValueType, MutableMapping[str, Any]]:
        if self._connector_state_converter.is_state_message_compatible(state):
            return (
                self._start or self._connector_state_converter.zero_value,
                self._connector_state_converter.deserialize(state),
            )
        return self._connector_state_converter.convert_from_sequential_state(
            self._cursor_field, state, self._start
        )

    def observe(self, record: Record) -> None:
        most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
            record.associated_slice
        )
        try:
            cursor_value = self._extract_cursor_value(record)

            if most_recent_cursor_value is None or most_recent_cursor_value < cursor_value:
                self._most_recent_cursor_value_per_partition[record.associated_slice] = cursor_value
        except ValueError:
            self._log_for_record_without_cursor_value()

    def _extract_cursor_value(self, record: Record) -> Any:
        return self._connector_state_converter.parse_value(self._cursor_field.extract_value(record))

    def close_partition(self, partition: Partition) -> None:
        slice_count_before = len(self._concurrent_state.get("slices", []))
        self._add_slice_to_state(partition)
        if slice_count_before < len(
            self._concurrent_state["slices"]
        ):  # only emit if at least one slice has been processed
            self._merge_partitions()
            self._emit_state_message()
        self._has_closed_at_least_one_slice = True

    def _add_slice_to_state(self, partition: Partition) -> None:
        most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
            partition.to_slice()
        )

        if self._slice_boundary_fields:
            if "slices" not in self._concurrent_state:
                raise RuntimeError(
                    f"The state for stream {self._stream_name} should have at least one slice to delineate the sync start time, but no slices are present. This is unexpected. Please contact Support."
                )
            self._concurrent_state["slices"].append(
                {
                    self._connector_state_converter.START_KEY: self._extract_from_slice(
                        partition, self._slice_boundary_fields[self._START_BOUNDARY]
                    ),
                    self._connector_state_converter.END_KEY: self._extract_from_slice(
                        partition, self._slice_boundary_fields[self._END_BOUNDARY]
                    ),
                    self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value,
                }
            )
        elif most_recent_cursor_value:
            if self._has_closed_at_least_one_slice:
                # If we track the state value using the record's cursor field, we can only do that if there is one partition. This is because we save
                # the state every time we close a partition. We assume that if there are multiple slices, they need to provide
                # boundaries. There are cases where partitions might not have boundaries:
                # * The cursor should be per-partition
                # * The stream state is actually the parent stream state
                # There might be other cases not listed above. Those are not supported today, hence the stream should not use this cursor for
                # state management. For the specific user that was affected by this issue, we need to:
                # * Fix state tracking (which is currently broken)
                # * Make the new version available
                # * (Probably) ask the user to reset the stream to avoid data loss
                raise ValueError(
                    "Given that slice_boundary_fields is not defined and that per-partition state is not supported, only one slice is "
                    "expected. Please contact the Airbyte team."
                )

            self._concurrent_state["slices"].append(
                {
                    self._connector_state_converter.START_KEY: self.start,
                    self._connector_state_converter.END_KEY: most_recent_cursor_value,
                    self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value,
                }
            )

    def _emit_state_message(self) -> None:
        self._connector_state_manager.update_state_for_stream(
            self._stream_name,
            self._stream_namespace,
            self.state,
        )
        state_message = self._connector_state_manager.create_state_message(
            self._stream_name, self._stream_namespace
        )
        self._message_repository.emit_message(state_message)

    def _merge_partitions(self) -> None:
        self._concurrent_state["slices"] = self._connector_state_converter.merge_intervals(
            self._concurrent_state["slices"]
        )

    def _extract_from_slice(self, partition: Partition, key: str) -> CursorValueType:
        try:
            _slice = partition.to_slice()
            if not _slice:
                raise KeyError(f"Could not find key `{key}` in empty slice")
            return self._connector_state_converter.parse_value(_slice[key])  # type: ignore  # we expect the devs to specify a key that would return a CursorValueType
        except KeyError as exception:
            raise KeyError(
                f"Partition is expected to have key `{key}` but could not be found"
            ) from exception

    def ensure_at_least_one_state_emitted(self) -> None:
        """
        The platform expects to have at least one state message on successful syncs. Hence, whatever happens, we expect this method to be
        called.
        """
        self._emit_state_message()

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Generates slices based on a few parameters:
        * lookback_window: Buffer to remove from END_KEY of the highest slice
        * slice_range: Max difference between two slices. If the difference between two slices is greater, multiple slices will be created
        * start: `_split_per_slice_range` will clip any value to `self._start`, which means that:
          * if upper is less than self._start, no slices will be generated
          * if lower is less than self._start, self._start will be used as the lower boundary (lookback_window will not be considered in that case)

        Note that the slices will overlap at their boundaries. We therefore expect to have at least the lower or the upper boundary to be
        inclusive in the API that is queried.
        """
        self._merge_partitions()

        if self._start is not None and self._is_start_before_first_slice():
            yield from self._split_per_slice_range(
                self._start,
                self._concurrent_state["slices"][0][self._connector_state_converter.START_KEY],
                False,
            )

        if len(self._concurrent_state["slices"]) == 1:
            yield from self._split_per_slice_range(
                self._calculate_lower_boundary_of_last_slice(
                    self._concurrent_state["slices"][0][self._connector_state_converter.END_KEY]
                ),
                self._end_provider(),
                True,
            )
        elif len(self._concurrent_state["slices"]) > 1:
            for i in range(len(self._concurrent_state["slices"]) - 1):
                if self._cursor_granularity:
                    yield from self._split_per_slice_range(
                        self._concurrent_state["slices"][i][self._connector_state_converter.END_KEY]
                        + self._cursor_granularity,
                        self._concurrent_state["slices"][i + 1][
                            self._connector_state_converter.START_KEY
                        ],
                        False,
                    )
                else:
                    yield from self._split_per_slice_range(
                        self._concurrent_state["slices"][i][
                            self._connector_state_converter.END_KEY
                        ],
                        self._concurrent_state["slices"][i + 1][
                            self._connector_state_converter.START_KEY
                        ],
                        False,
                    )
            yield from self._split_per_slice_range(
                self._calculate_lower_boundary_of_last_slice(
                    self._concurrent_state["slices"][-1][self._connector_state_converter.END_KEY]
                ),
                self._end_provider(),
                True,
            )
        else:
            raise ValueError("Expected at least one slice")

    def _is_start_before_first_slice(self) -> bool:
        return (
            self._start is not None
            and self._start
            < self._concurrent_state["slices"][0][self._connector_state_converter.START_KEY]
        )

    def _calculate_lower_boundary_of_last_slice(
        self, lower_boundary: CursorValueType
    ) -> CursorValueType:
        if self._lookback_window:
            return lower_boundary - self._lookback_window
        return lower_boundary

    def _split_per_slice_range(
        self, lower: CursorValueType, upper: CursorValueType, upper_is_end: bool
    ) -> Iterable[StreamSlice]:
        if lower >= upper:
            return

        if self._start and upper < self._start:
            return

        lower = max(lower, self._start) if self._start else lower
        if not self._slice_range or self._evaluate_upper_safely(lower, self._slice_range) >= upper:
            clamped_lower = self._clamping_strategy.clamp(lower)
            clamped_upper = self._clamping_strategy.clamp(upper)
            start_value, end_value = (
                (clamped_lower, clamped_upper - self._cursor_granularity)
                if self._cursor_granularity and not upper_is_end
                else (clamped_lower, clamped_upper)
            )
            yield StreamSlice(
                partition={},
                cursor_slice={
                    self._slice_boundary_fields_wrapper[
                        self._START_BOUNDARY
                    ]: self._connector_state_converter.output_format(start_value),
                    self._slice_boundary_fields_wrapper[
                        self._END_BOUNDARY
                    ]: self._connector_state_converter.output_format(end_value),
                },
            )
        else:
            stop_processing = False
            current_lower_boundary = lower
            while not stop_processing:
                current_upper_boundary = min(
                    self._evaluate_upper_safely(current_lower_boundary, self._slice_range), upper
                )
                has_reached_upper_boundary = current_upper_boundary >= upper

                clamped_upper = (
                    self._clamping_strategy.clamp(current_upper_boundary)
                    if current_upper_boundary != upper
                    else current_upper_boundary
                )
                clamped_lower = self._clamping_strategy.clamp(current_lower_boundary)
                if clamped_lower >= clamped_upper:
                    # clamping collapsed both values which means that it is time to stop processing
                    # FIXME should this be replaced by a proper end_provider
                    break
                start_value, end_value = (
                    (clamped_lower, clamped_upper - self._cursor_granularity)
                    if self._cursor_granularity
                    and (not upper_is_end or not has_reached_upper_boundary)
                    else (clamped_lower, clamped_upper)
                )
                yield StreamSlice(
                    partition={},
                    cursor_slice={
                        self._slice_boundary_fields_wrapper[
                            self._START_BOUNDARY
                        ]: self._connector_state_converter.output_format(start_value),
                        self._slice_boundary_fields_wrapper[
                            self._END_BOUNDARY
                        ]: self._connector_state_converter.output_format(end_value),
                    },
                )
                current_lower_boundary = clamped_upper
                if current_upper_boundary >= upper:
                    stop_processing = True

    def _evaluate_upper_safely(self, lower: CursorValueType, step: GapType) -> CursorValueType:
        """
        Given that we set the default step at datetime.timedelta.max, we will generate an OverflowError when evaluating the next start_date.
        This method assumes that users would never enter a step that would generate an overflow. Given that would be the case, the code
        would have broken anyway.
        """
        try:
            return lower + step
        except OverflowError:
            return self._end_provider()

    def should_be_synced(self, record: Record) -> bool:
        """
        Determines if a record should be synced based on its cursor value.
        :param record: The record to evaluate

        :return: True if the record's cursor value falls within the sync boundaries
        """
        try:
            record_cursor_value: CursorValueType = self._extract_cursor_value(record)
        except ValueError:
            self._log_for_record_without_cursor_value()
            return True
        return self.start <= record_cursor_value <= self._end_provider()

    def _log_for_record_without_cursor_value(self) -> None:
        if not self._should_be_synced_logger_triggered:
            LOGGER.warning(
                f"Could not find cursor field `{self.cursor_field.cursor_field_key}` in record for stream {self._stream_name}. The incremental sync will assume it needs to be synced"
            )
            self._should_be_synced_logger_triggered = True
class CursorField:
    def __init__(self, cursor_field_key: str) -> None:
        self.cursor_field_key = cursor_field_key

    def extract_value(self, record: Record) -> CursorValueType:
        cursor_value = record.data.get(self.cursor_field_key)
        if cursor_value is None:
            raise ValueError(f"Could not find cursor field {self.cursor_field_key} in record")
        return cursor_value  # type: ignore  # we assume that the value the path points at is a comparable
    def extract_value(self, record: Record) -> CursorValueType:
        cursor_value = record.data.get(self.cursor_field_key)
        if cursor_value is None:
            raise ValueError(f"Could not find cursor field {self.cursor_field_key} in record")
        return cursor_value  # type: ignore  # we assume that the value the path points at is a comparable
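To make the contract concrete, a minimal usage sketch follows; the Record constructor arguments shown (data, stream_name) are assumptions that may differ between CDK versions:

from airbyte_cdk.sources.streams.concurrent.cursor import CursorField
from airbyte_cdk.sources.types import Record

cursor_field = CursorField("updated_at")

# Hypothetical record; the Record constructor signature is an assumption.
record = Record(data={"id": 1, "updated_at": "2023-06-01T00:00:00Z"}, stream_name="users")
print(cursor_field.extract_value(record))  # 2023-06-01T00:00:00Z

# A record that lacks the cursor field raises a ValueError.
try:
    cursor_field.extract_value(Record(data={"id": 2}, stream_name="users"))
except ValueError as error:
    print(error)  # Could not find cursor field updated_at in record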
class Cursor(StreamSlicer, ABC):
    @property
    @abstractmethod
    def state(self) -> MutableMapping[str, Any]: ...

    @abstractmethod
    def observe(self, record: Record) -> None:
        """
        Indicate to the cursor that the record has been emitted
        """
        raise NotImplementedError()

    @abstractmethod
    def close_partition(self, partition: Partition) -> None:
        """
        Indicate to the cursor that the partition has been successfully processed
        """
        raise NotImplementedError()

    @abstractmethod
    def ensure_at_least_one_state_emitted(self) -> None:
        """
        State messages are emitted when a partition is closed. However, the platform expects at least one state to be emitted per sync per
        stream. Hence, if no partitions are generated, this method needs to be called.
        """
        raise NotImplementedError()

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Default placeholder implementation of stream_slices.
        Subclasses can override this method to provide actual behavior.
        """
        yield StreamSlice(partition={}, cursor_slice={})
Slices the stream into chunks that can be fetched independently. Slices enable state checkpointing and data retrieval parallelization.
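For illustration, a hypothetical date-window slice is shown below; it assumes the mapping-style access that StreamSlice provides in recent CDK versions:

from airbyte_cdk.sources.types import StreamSlice

# A hypothetical date-window slice: `partition` identifies the parent object
# (empty here) and `cursor_slice` carries the window boundaries.
stream_slice = StreamSlice(
    partition={},
    cursor_slice={"start": "2023-01-01", "end": "2023-01-31"},
)
print(stream_slice["start"], stream_slice["end"])  # mapping-style access over both parts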
    @abstractmethod
    def observe(self, record: Record) -> None:
        """
        Indicate to the cursor that the record has been emitted
        """
        raise NotImplementedError()
Indicate to the cursor that the record has been emitted
    @abstractmethod
    def close_partition(self, partition: Partition) -> None:
        """
        Indicate to the cursor that the partition has been successfully processed
        """
        raise NotImplementedError()
Indicate to the cursor that the partition has been successfully processed
    @abstractmethod
    def ensure_at_least_one_state_emitted(self) -> None:
        """
        State messages are emitted when a partition is closed. However, the platform expects at least one state to be emitted per sync per
        stream. Hence, if no partitions are generated, this method needs to be called.
        """
        raise NotImplementedError()
State messages are emitted when a partition is closed. However, the platform expects at least one state to be emitted per sync per stream. Hence, if no partitions are generated, this method needs to be called.
    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Default placeholder implementation of stream_slices.
        Subclasses can override this method to provide actual behavior.
        """
        yield StreamSlice(partition={}, cursor_slice={})
Default placeholder implementation of stream_slices. Subclasses can override this method to provide actual behavior.
class FinalStateCursor(Cursor):
    """Cursor that is used to guarantee at least one state message is emitted for a concurrent stream."""

    def __init__(
        self,
        stream_name: str,
        stream_namespace: Optional[str],
        message_repository: MessageRepository,
    ) -> None:
        self._stream_name = stream_name
        self._stream_namespace = stream_namespace
        self._message_repository = message_repository
        # Normally the connector state manager operates at the source-level. However, we only need it to write the sentinel
        # state message rather than manage overall source state. This is also only temporary as we move to the resumable
        # full refresh world where every stream uses a FileBasedConcurrentCursor with incremental state.
        self._connector_state_manager = ConnectorStateManager()
        self._has_closed_at_least_one_slice = False

    @property
    def state(self) -> MutableMapping[str, Any]:
        return {NO_CURSOR_STATE_KEY: True}

    def observe(self, record: Record) -> None:
        pass

    def close_partition(self, partition: Partition) -> None:
        pass

    def ensure_at_least_one_state_emitted(self) -> None:
        """
        Used primarily for full refresh syncs that do not have a valid cursor value to emit at the end of a sync
        """

        self._connector_state_manager.update_state_for_stream(
            self._stream_name, self._stream_namespace, self.state
        )
        state_message = self._connector_state_manager.create_state_message(
            self._stream_name, self._stream_namespace
        )
        self._message_repository.emit_message(state_message)
Cursor that is used to guarantee at least one state message is emitted for a concurrent stream.
    def __init__(
        self,
        stream_name: str,
        stream_namespace: Optional[str],
        message_repository: MessageRepository,
    ) -> None:
        self._stream_name = stream_name
        self._stream_namespace = stream_namespace
        self._message_repository = message_repository
        # Normally the connector state manager operates at the source-level. However, we only need it to write the sentinel
        # state message rather than manage overall source state. This is also only temporary as we move to the resumable
        # full refresh world where every stream uses a FileBasedConcurrentCursor with incremental state.
        self._connector_state_manager = ConnectorStateManager()
        self._has_closed_at_least_one_slice = False
Indicate to the cursor that the partition has been successfully processed
    def ensure_at_least_one_state_emitted(self) -> None:
        """
        Used primarily for full refresh syncs that do not have a valid cursor value to emit at the end of a sync
        """

        self._connector_state_manager.update_state_for_stream(
            self._stream_name, self._stream_namespace, self.state
        )
        state_message = self._connector_state_manager.create_state_message(
            self._stream_name, self._stream_namespace
        )
        self._message_repository.emit_message(state_message)
Used primarily for full refresh syncs that do not have a valid cursor value to emit at the end of a sync
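A sketch of the sentinel-state flow for a sync that produced no partitions; it assumes InMemoryMessageRepository and its consume_queue() accessor from airbyte_cdk.sources.message, which exist in recent CDK versions:

from airbyte_cdk.sources.message import InMemoryMessageRepository
from airbyte_cdk.sources.streams.concurrent.cursor import FinalStateCursor

message_repository = InMemoryMessageRepository()
cursor = FinalStateCursor(
    stream_name="users",
    stream_namespace=None,
    message_repository=message_repository,
)

# No partitions were generated for this sync, so emit the sentinel state directly.
cursor.ensure_at_least_one_state_emitted()

# The repository now holds one AirbyteMessage of type STATE for the stream.
for message in message_repository.consume_queue():
    print(message.type)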
class ConcurrentCursor(Cursor):
    _START_BOUNDARY = 0
    _END_BOUNDARY = 1

    def __init__(
        self,
        stream_name: str,
        stream_namespace: Optional[str],
        stream_state: Any,
        message_repository: MessageRepository,
        connector_state_manager: ConnectorStateManager,
        connector_state_converter: AbstractStreamStateConverter,
        cursor_field: CursorField,
        slice_boundary_fields: Optional[Tuple[str, str]],
        start: Optional[CursorValueType],
        end_provider: Callable[[], CursorValueType],
        lookback_window: Optional[GapType] = None,
        slice_range: Optional[GapType] = None,
        cursor_granularity: Optional[GapType] = None,
        clamping_strategy: ClampingStrategy = NoClamping(),
    ) -> None:
        self._stream_name = stream_name
        self._stream_namespace = stream_namespace
        self._message_repository = message_repository
        self._connector_state_converter = connector_state_converter
        self._connector_state_manager = connector_state_manager
        self._cursor_field = cursor_field
        # To see an example where the slice boundaries might not be defined, check https://github.com/airbytehq/airbyte/blob/1ce84d6396e446e1ac2377362446e3fb94509461/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py#L363-L379
        self._slice_boundary_fields = slice_boundary_fields
        self._start = start
        self._end_provider = end_provider
        self.start, self._concurrent_state = self._get_concurrent_state(stream_state)
        self._lookback_window = lookback_window
        self._slice_range = slice_range
        self._most_recent_cursor_value_per_partition: MutableMapping[
            Union[StreamSlice, Mapping[str, Any], None], Any
        ] = {}
        self._has_closed_at_least_one_slice = False
        self._cursor_granularity = cursor_granularity
        # Flag to track if the logger has been triggered (per stream)
        self._should_be_synced_logger_triggered = False
        self._clamping_strategy = clamping_strategy

    @property
    def state(self) -> MutableMapping[str, Any]:
        return self._connector_state_converter.convert_to_state_message(
            self.cursor_field, self._concurrent_state
        )

    @property
    def cursor_field(self) -> CursorField:
        return self._cursor_field

    @property
    def _slice_boundary_fields_wrapper(self) -> Tuple[str, str]:
        return (
            self._slice_boundary_fields
            if self._slice_boundary_fields
            else (
                self._connector_state_converter.START_KEY,
                self._connector_state_converter.END_KEY,
            )
        )

    def _get_concurrent_state(
        self, state: MutableMapping[str, Any]
    ) -> Tuple[CursorValueType, MutableMapping[str, Any]]:
        if self._connector_state_converter.is_state_message_compatible(state):
            return (
                self._start or self._connector_state_converter.zero_value,
                self._connector_state_converter.deserialize(state),
            )
        return self._connector_state_converter.convert_from_sequential_state(
            self._cursor_field, state, self._start
        )

    def observe(self, record: Record) -> None:
        most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
            record.associated_slice
        )
        try:
            cursor_value = self._extract_cursor_value(record)

            if most_recent_cursor_value is None or most_recent_cursor_value < cursor_value:
                self._most_recent_cursor_value_per_partition[record.associated_slice] = cursor_value
        except ValueError:
            self._log_for_record_without_cursor_value()

    def _extract_cursor_value(self, record: Record) -> Any:
        return self._connector_state_converter.parse_value(self._cursor_field.extract_value(record))

    def close_partition(self, partition: Partition) -> None:
        slice_count_before = len(self._concurrent_state.get("slices", []))
        self._add_slice_to_state(partition)
        if slice_count_before < len(
            self._concurrent_state["slices"]
        ):  # only emit if at least one slice has been processed
            self._merge_partitions()
            self._emit_state_message()
        self._has_closed_at_least_one_slice = True

    def _add_slice_to_state(self, partition: Partition) -> None:
        most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
            partition.to_slice()
        )

        if self._slice_boundary_fields:
            if "slices" not in self._concurrent_state:
                raise RuntimeError(
                    f"The state for stream {self._stream_name} should have at least one slice to delineate the sync start time, but no slices are present. This is unexpected. Please contact Support."
                )
            self._concurrent_state["slices"].append(
                {
                    self._connector_state_converter.START_KEY: self._extract_from_slice(
                        partition, self._slice_boundary_fields[self._START_BOUNDARY]
                    ),
                    self._connector_state_converter.END_KEY: self._extract_from_slice(
                        partition, self._slice_boundary_fields[self._END_BOUNDARY]
                    ),
                    self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value,
                }
            )
        elif most_recent_cursor_value:
            if self._has_closed_at_least_one_slice:
                # If we track the state value using the record's cursor field, we can only do that if there is one partition. This is because we save
                # the state every time we close a partition. We assume that if there are multiple slices, they need to provide
                # boundaries. There are cases where partitions might not have boundaries:
                # * The cursor should be per-partition
                # * The stream state is actually the parent stream state
                # There might be other cases not listed above. Those are not supported today, hence the stream should not use this cursor for
                # state management. For the specific user that was affected by this issue, we need to:
                # * Fix state tracking (which is currently broken)
                # * Make the new version available
                # * (Probably) ask the user to reset the stream to avoid data loss
                raise ValueError(
                    "Given that slice_boundary_fields is not defined and that per-partition state is not supported, only one slice is "
                    "expected. Please contact the Airbyte team."
                )

            self._concurrent_state["slices"].append(
                {
                    self._connector_state_converter.START_KEY: self.start,
                    self._connector_state_converter.END_KEY: most_recent_cursor_value,
                    self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value,
                }
            )

    def _emit_state_message(self) -> None:
        self._connector_state_manager.update_state_for_stream(
            self._stream_name,
            self._stream_namespace,
            self.state,
        )
        state_message = self._connector_state_manager.create_state_message(
            self._stream_name, self._stream_namespace
        )
        self._message_repository.emit_message(state_message)

    def _merge_partitions(self) -> None:
        self._concurrent_state["slices"] = self._connector_state_converter.merge_intervals(
            self._concurrent_state["slices"]
        )

    def _extract_from_slice(self, partition: Partition, key: str) -> CursorValueType:
        try:
            _slice = partition.to_slice()
            if not _slice:
                raise KeyError(f"Could not find key `{key}` in empty slice")
            return self._connector_state_converter.parse_value(_slice[key])  # type: ignore  # we expect the devs to specify a key that would return a CursorValueType
        except KeyError as exception:
            raise KeyError(
                f"Partition is expected to have key `{key}` but could not be found"
            ) from exception

    def ensure_at_least_one_state_emitted(self) -> None:
        """
        The platform expects to have at least one state message on successful syncs. Hence, whatever happens, we expect this method to be
        called.
        """
        self._emit_state_message()

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Generates slices based on a few parameters:
        * lookback_window: Buffer to remove from END_KEY of the highest slice
        * slice_range: Max difference between two slices. If the difference between two slices is greater, multiple slices will be created
        * start: `_split_per_slice_range` will clip any value to `self._start`, which means that:
          * if upper is less than self._start, no slices will be generated
          * if lower is less than self._start, self._start will be used as the lower boundary (lookback_window will not be considered in that case)

        Note that the slices will overlap at their boundaries. We therefore expect to have at least the lower or the upper boundary to be
        inclusive in the API that is queried.
        """
        self._merge_partitions()

        if self._start is not None and self._is_start_before_first_slice():
            yield from self._split_per_slice_range(
                self._start,
                self._concurrent_state["slices"][0][self._connector_state_converter.START_KEY],
                False,
            )

        if len(self._concurrent_state["slices"]) == 1:
            yield from self._split_per_slice_range(
                self._calculate_lower_boundary_of_last_slice(
                    self._concurrent_state["slices"][0][self._connector_state_converter.END_KEY]
                ),
                self._end_provider(),
                True,
            )
        elif len(self._concurrent_state["slices"]) > 1:
            for i in range(len(self._concurrent_state["slices"]) - 1):
                if self._cursor_granularity:
                    yield from self._split_per_slice_range(
                        self._concurrent_state["slices"][i][self._connector_state_converter.END_KEY]
                        + self._cursor_granularity,
                        self._concurrent_state["slices"][i + 1][
                            self._connector_state_converter.START_KEY
                        ],
                        False,
                    )
                else:
                    yield from self._split_per_slice_range(
                        self._concurrent_state["slices"][i][
                            self._connector_state_converter.END_KEY
                        ],
                        self._concurrent_state["slices"][i + 1][
                            self._connector_state_converter.START_KEY
                        ],
                        False,
                    )
            yield from self._split_per_slice_range(
                self._calculate_lower_boundary_of_last_slice(
                    self._concurrent_state["slices"][-1][self._connector_state_converter.END_KEY]
                ),
                self._end_provider(),
                True,
            )
        else:
            raise ValueError("Expected at least one slice")

    def _is_start_before_first_slice(self) -> bool:
        return (
            self._start is not None
            and self._start
            < self._concurrent_state["slices"][0][self._connector_state_converter.START_KEY]
        )

    def _calculate_lower_boundary_of_last_slice(
        self, lower_boundary: CursorValueType
    ) -> CursorValueType:
        if self._lookback_window:
            return lower_boundary - self._lookback_window
        return lower_boundary

    def _split_per_slice_range(
        self, lower: CursorValueType, upper: CursorValueType, upper_is_end: bool
    ) -> Iterable[StreamSlice]:
        if lower >= upper:
            return

        if self._start and upper < self._start:
            return

        lower = max(lower, self._start) if self._start else lower
        if not self._slice_range or self._evaluate_upper_safely(lower, self._slice_range) >= upper:
            clamped_lower = self._clamping_strategy.clamp(lower)
            clamped_upper = self._clamping_strategy.clamp(upper)
            start_value, end_value = (
                (clamped_lower, clamped_upper - self._cursor_granularity)
                if self._cursor_granularity and not upper_is_end
                else (clamped_lower, clamped_upper)
            )
            yield StreamSlice(
                partition={},
                cursor_slice={
                    self._slice_boundary_fields_wrapper[
                        self._START_BOUNDARY
                    ]: self._connector_state_converter.output_format(start_value),
                    self._slice_boundary_fields_wrapper[
                        self._END_BOUNDARY
                    ]: self._connector_state_converter.output_format(end_value),
                },
            )
        else:
            stop_processing = False
            current_lower_boundary = lower
            while not stop_processing:
                current_upper_boundary = min(
                    self._evaluate_upper_safely(current_lower_boundary, self._slice_range), upper
                )
                has_reached_upper_boundary = current_upper_boundary >= upper

                clamped_upper = (
                    self._clamping_strategy.clamp(current_upper_boundary)
                    if current_upper_boundary != upper
                    else current_upper_boundary
                )
                clamped_lower = self._clamping_strategy.clamp(current_lower_boundary)
                if clamped_lower >= clamped_upper:
                    # clamping collapsed both values which means that it is time to stop processing
                    # FIXME should this be replaced by a proper end_provider
                    break
                start_value, end_value = (
                    (clamped_lower, clamped_upper - self._cursor_granularity)
                    if self._cursor_granularity
                    and (not upper_is_end or not has_reached_upper_boundary)
                    else (clamped_lower, clamped_upper)
                )
                yield StreamSlice(
                    partition={},
                    cursor_slice={
                        self._slice_boundary_fields_wrapper[
                            self._START_BOUNDARY
                        ]: self._connector_state_converter.output_format(start_value),
                        self._slice_boundary_fields_wrapper[
                            self._END_BOUNDARY
                        ]: self._connector_state_converter.output_format(end_value),
                    },
                )
                current_lower_boundary = clamped_upper
                if current_upper_boundary >= upper:
                    stop_processing = True

    def _evaluate_upper_safely(self, lower: CursorValueType, step: GapType) -> CursorValueType:
        """
        Given that we set the default step at datetime.timedelta.max, we will generate an OverflowError when evaluating the next start_date.
        This method assumes that users would never enter a step that would generate an overflow. Given that would be the case, the code
        would have broken anyway.
        """
        try:
            return lower + step
        except OverflowError:
            return self._end_provider()

    def should_be_synced(self, record: Record) -> bool:
        """
        Determines if a record should be synced based on its cursor value.
        :param record: The record to evaluate

        :return: True if the record's cursor value falls within the sync boundaries
        """
        try:
            record_cursor_value: CursorValueType = self._extract_cursor_value(record)
        except ValueError:
            self._log_for_record_without_cursor_value()
            return True
        return self.start <= record_cursor_value <= self._end_provider()

    def _log_for_record_without_cursor_value(self) -> None:
        if not self._should_be_synced_logger_triggered:
            LOGGER.warning(
                f"Could not find cursor field `{self.cursor_field.cursor_field_key}` in record for stream {self._stream_name}. The incremental sync will assume it needs to be synced"
            )
            self._should_be_synced_logger_triggered = True
Slices the stream into chunks that can be fetched independently. Slices enable state checkpointing and data retrieval parallelization.
    def __init__(
        self,
        stream_name: str,
        stream_namespace: Optional[str],
        stream_state: Any,
        message_repository: MessageRepository,
        connector_state_manager: ConnectorStateManager,
        connector_state_converter: AbstractStreamStateConverter,
        cursor_field: CursorField,
        slice_boundary_fields: Optional[Tuple[str, str]],
        start: Optional[CursorValueType],
        end_provider: Callable[[], CursorValueType],
        lookback_window: Optional[GapType] = None,
        slice_range: Optional[GapType] = None,
        cursor_granularity: Optional[GapType] = None,
        clamping_strategy: ClampingStrategy = NoClamping(),
    ) -> None:
        self._stream_name = stream_name
        self._stream_namespace = stream_namespace
        self._message_repository = message_repository
        self._connector_state_converter = connector_state_converter
        self._connector_state_manager = connector_state_manager
        self._cursor_field = cursor_field
        # To see an example where the slice boundaries might not be defined, check https://github.com/airbytehq/airbyte/blob/1ce84d6396e446e1ac2377362446e3fb94509461/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py#L363-L379
        self._slice_boundary_fields = slice_boundary_fields
        self._start = start
        self._end_provider = end_provider
        self.start, self._concurrent_state = self._get_concurrent_state(stream_state)
        self._lookback_window = lookback_window
        self._slice_range = slice_range
        self._most_recent_cursor_value_per_partition: MutableMapping[
            Union[StreamSlice, Mapping[str, Any], None], Any
        ] = {}
        self._has_closed_at_least_one_slice = False
        self._cursor_granularity = cursor_granularity
        # Flag to track if the logger has been triggered (per stream)
        self._should_be_synced_logger_triggered = False
        self._clamping_strategy = clamping_strategy
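For orientation, a hedged construction sketch follows. The EpochValueConcurrentStreamStateConverter import and its no-argument constructor are assumptions based on the CDK's state_converters package, and the slice boundary field names are hypothetical:

from datetime import datetime, timedelta, timezone

from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.message import InMemoryMessageRepository
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, CursorField
from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import (
    EpochValueConcurrentStreamStateConverter,
)

cursor = ConcurrentCursor(
    stream_name="users",
    stream_namespace=None,
    stream_state={},  # no prior state: the sync starts from `start`
    message_repository=InMemoryMessageRepository(),
    connector_state_manager=ConnectorStateManager(),
    connector_state_converter=EpochValueConcurrentStreamStateConverter(),
    cursor_field=CursorField("updated_at"),
    slice_boundary_fields=("start_ts", "end_ts"),  # hypothetical request parameter names
    start=datetime(2023, 1, 1, tzinfo=timezone.utc),
    end_provider=lambda: datetime.now(timezone.utc),
    lookback_window=timedelta(minutes=5),
    slice_range=timedelta(days=30),
)

# Each generated slice carries the window boundaries under the configured field names.
for stream_slice in cursor.stream_slices():
    print(stream_slice.cursor_slice)  # e.g. {"start_ts": 1672531200, "end_ts": ...}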
    def observe(self, record: Record) -> None:
        most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
            record.associated_slice
        )
        try:
            cursor_value = self._extract_cursor_value(record)

            if most_recent_cursor_value is None or most_recent_cursor_value < cursor_value:
                self._most_recent_cursor_value_per_partition[record.associated_slice] = cursor_value
        except ValueError:
            self._log_for_record_without_cursor_value()
Indicate to the cursor that the record has been emitted
    def close_partition(self, partition: Partition) -> None:
        slice_count_before = len(self._concurrent_state.get("slices", []))
        self._add_slice_to_state(partition)
        if slice_count_before < len(
            self._concurrent_state["slices"]
        ):  # only emit if at least one slice has been processed
            self._merge_partitions()
            self._emit_state_message()
        self._has_closed_at_least_one_slice = True
Indicate to the cursor that the partition has been successfully processed
    def ensure_at_least_one_state_emitted(self) -> None:
        """
        The platform expects to have at least one state message on successful syncs. Hence, whatever happens, we expect this method to be
        called.
        """
        self._emit_state_message()
The platform expects to have at least one state message on successful syncs. Hence, whatever happens, we expect this method to be called.
    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Generates slices based on a few parameters:
        * lookback_window: Buffer to remove from END_KEY of the highest slice
        * slice_range: Max difference between two slices. If the difference between two slices is greater, multiple slices will be created
        * start: `_split_per_slice_range` will clip any value to `self._start`, which means that:
          * if upper is less than self._start, no slices will be generated
          * if lower is less than self._start, self._start will be used as the lower boundary (lookback_window will not be considered in that case)

        Note that the slices will overlap at their boundaries. We therefore expect to have at least the lower or the upper boundary to be
        inclusive in the API that is queried.
        """
        self._merge_partitions()

        if self._start is not None and self._is_start_before_first_slice():
            yield from self._split_per_slice_range(
                self._start,
                self._concurrent_state["slices"][0][self._connector_state_converter.START_KEY],
                False,
            )

        if len(self._concurrent_state["slices"]) == 1:
            yield from self._split_per_slice_range(
                self._calculate_lower_boundary_of_last_slice(
                    self._concurrent_state["slices"][0][self._connector_state_converter.END_KEY]
                ),
                self._end_provider(),
                True,
            )
        elif len(self._concurrent_state["slices"]) > 1:
            for i in range(len(self._concurrent_state["slices"]) - 1):
                if self._cursor_granularity:
                    yield from self._split_per_slice_range(
                        self._concurrent_state["slices"][i][self._connector_state_converter.END_KEY]
                        + self._cursor_granularity,
                        self._concurrent_state["slices"][i + 1][
                            self._connector_state_converter.START_KEY
                        ],
                        False,
                    )
                else:
                    yield from self._split_per_slice_range(
                        self._concurrent_state["slices"][i][
                            self._connector_state_converter.END_KEY
                        ],
                        self._concurrent_state["slices"][i + 1][
                            self._connector_state_converter.START_KEY
                        ],
                        False,
                    )
            yield from self._split_per_slice_range(
                self._calculate_lower_boundary_of_last_slice(
                    self._concurrent_state["slices"][-1][self._connector_state_converter.END_KEY]
                ),
                self._end_provider(),
                True,
            )
        else:
            raise ValueError("Expected at least one slice")
Generates slices based on a few parameters:
- lookback_window: Buffer to remove from END_KEY of the highest slice
- slice_range: Max difference between two slices. If the difference between two slices is greater, multiple slices will be created
- start: `_split_per_slice_range` will clip any value to `self._start`, which means that:
  - if upper is less than self._start, no slices will be generated
  - if lower is less than self._start, self._start will be used as the lower boundary (lookback_window will not be considered in that case)
Note that the slices will overlap at their boundaries. We therefore expect to have at least the lower or the upper boundary to be inclusive in the API that is queried.
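To make these parameters concrete, here is a simplified worked example of the arithmetic (clamping and cursor_granularity are ignored, and the values are hypothetical; the real method reads the boundaries from the concurrent state):

from datetime import datetime, timedelta

# Concurrent state after merging: one closed slice whose END_KEY is Feb 1.
last_slice_end = datetime(2023, 2, 1)
end = datetime(2023, 3, 1)           # what end_provider() would return
lookback_window = timedelta(days=2)  # re-read a buffer before the last END_KEY
slice_range = timedelta(days=10)     # maximum width of an emitted slice

lower = last_slice_end - lookback_window  # 2023-01-30
slices = []
while lower < end:
    upper = min(lower + slice_range, end)
    slices.append((lower, upper))
    lower = upper

for window_start, window_end in slices:
    print(window_start.date(), "->", window_end.date())
# 2023-01-30 -> 2023-02-09
# 2023-02-09 -> 2023-02-19
# 2023-02-19 -> 2023-03-01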
    def should_be_synced(self, record: Record) -> bool:
        """
        Determines if a record should be synced based on its cursor value.
        :param record: The record to evaluate

        :return: True if the record's cursor value falls within the sync boundaries
        """
        try:
            record_cursor_value: CursorValueType = self._extract_cursor_value(record)
        except ValueError:
            self._log_for_record_without_cursor_value()
            return True
        return self.start <= record_cursor_value <= self._end_provider()
Determines if a record should be synced based on its cursor value.
Parameters
- record: The record to evaluate
Returns
True if the record's cursor value falls within the sync boundaries
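A sketch of the boundary check, reusing the hypothetical cursor and Record construction from the earlier sketches (start resolved to 2023-01-01 UTC, end_provider returning the current time):

# Assumed epoch cursor values: 1685577600 is 2023-06-01T00:00:00Z,
# 1609459200 is 2021-01-01T00:00:00Z (before the resolved start).
record_in_range = Record(data={"updated_at": 1685577600}, stream_name="users")
record_too_old = Record(data={"updated_at": 1609459200}, stream_name="users")

print(cursor.should_be_synced(record_in_range))  # True
print(cursor.should_be_synced(record_too_old))   # False

# A record missing the cursor field logs a warning once and defaults to True.
print(cursor.should_be_synced(Record(data={"id": 1}, stream_name="users")))  # True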