airbyte_cdk.sources.streams.concurrent.cursor
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import functools
import logging
import threading
from abc import ABC, abstractmethod
from typing import (
    Any,
    Callable,
    Iterable,
    List,
    Mapping,
    MutableMapping,
    Optional,
    Tuple,
    Union,
)

from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.message import MessageRepository, NoopMessageRepository
from airbyte_cdk.sources.streams import NO_CURSOR_STATE_KEY
from airbyte_cdk.sources.streams.concurrent.clamping import ClampingStrategy, NoClamping
from airbyte_cdk.sources.streams.concurrent.cursor_types import CursorValueType, GapType
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.stream_slicer import StreamSlicer
from airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter import (
    AbstractStreamStateConverter,
)
from airbyte_cdk.sources.types import Record, StreamSlice

LOGGER = logging.getLogger("airbyte")


def _extract_value(mapping: Mapping[str, Any], path: List[str]) -> Any:
    """
    Follow ``path`` key by key into the nested ``mapping`` and return the value found there.

    Any lookup error raised by ``__getitem__`` (e.g. ``KeyError``) propagates to the caller.
    """
    return functools.reduce(lambda a, b: a[b], path, mapping)


class CursorField:
    """Names the top-level record field whose value is used as the cursor."""

    def __init__(self, cursor_field_key: str) -> None:
        self.cursor_field_key = cursor_field_key

    def extract_value(self, record: Record) -> Any:
        """
        Return the raw cursor value stored under ``cursor_field_key`` in ``record.data``.

        :raises ValueError: if the key is absent or its value is None
        """
        cursor_value = record.data.get(self.cursor_field_key)
        if cursor_value is None:
            raise ValueError(f"Could not find cursor field {self.cursor_field_key} in record")
        return cursor_value  # type: ignore # we assume that the cursor field value is comparable


class Cursor(StreamSlicer, ABC):
    """Interface of cursors used by concurrent streams to track progress and emit state."""

    @property
    @abstractmethod
    def state(self) -> MutableMapping[str, Any]: ...

    @abstractmethod
    def observe(self, record: Record) -> None:
        """
        Indicate to the cursor that the record has been emitted
        """
        raise NotImplementedError()

    @abstractmethod
    def close_partition(self, partition: Partition) -> None:
        """
        Indicate to the cursor that the partition has been successfully processed
        """
        raise NotImplementedError()

    @abstractmethod
    def ensure_at_least_one_state_emitted(self) -> None:
        """
        State messages are emitted when a partition is closed. However, the platform expects at least one state to be emitted per sync per
        stream. Hence, if no partitions are generated, this method needs to be called.
        """
        raise NotImplementedError()

    @abstractmethod
    def should_be_synced(self, record: Record) -> bool:
        """Return True if ``record`` falls within what this cursor considers part of the sync."""
        pass

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Default placeholder implementation of `stream_slices`: yields a single slice with an empty
        partition and an empty cursor slice. Subclasses can override this method to provide actual behavior.
        """
        yield StreamSlice(partition={}, cursor_slice={})


class FinalStateCursor(Cursor):
    """Cursor that is used to guarantee at least one state message is emitted for a concurrent stream."""

    def __init__(
        self,
        stream_name: str,
        stream_namespace: Optional[str],
        message_repository: MessageRepository,
    ) -> None:
        self._stream_name = stream_name
        self._stream_namespace = stream_namespace
        self._message_repository = message_repository
        # Normally the connector state manager operates at the source-level. However, we only need it to write the sentinel
        # state message rather than manage overall source state. This is also only temporary as we move to the resumable
        # full refresh world where every stream uses a FileBasedConcurrentCursor with incremental state.
        self._connector_state_manager = ConnectorStateManager()
        self._has_closed_at_least_one_slice = False

    @property
    def state(self) -> MutableMapping[str, Any]:
        return {NO_CURSOR_STATE_KEY: True}

    def observe(self, record: Record) -> None:
        pass

    def close_partition(self, partition: Partition) -> None:
        pass

    def ensure_at_least_one_state_emitted(self) -> None:
        """
        Used primarily for full refresh syncs that do not have a valid cursor value to emit at the end of a sync
        """

        self._connector_state_manager.update_state_for_stream(
            self._stream_name, self._stream_namespace, self.state
        )
        state_message = self._connector_state_manager.create_state_message(
            self._stream_name, self._stream_namespace
        )
        self._message_repository.emit_message(state_message)

    def should_be_synced(self, record: Record) -> bool:
        return True


class ConcurrentCursor(Cursor):
    """
    Cursor that keeps the synced cursor ranges as a list of intervals ("slices" in the concurrent state),
    records the most recent cursor value observed per partition, and emits a state message each time a
    partition closes and adds a slice.
    """

    _START_BOUNDARY = 0
    _END_BOUNDARY = 1

    def copy_without_state(self) -> "ConcurrentCursor":
        """Return a cursor with the same configuration, empty state, and no-op message emission."""
        return self.__class__(
            stream_name=self._stream_name,
            stream_namespace=self._stream_namespace,
            stream_state={},
            message_repository=NoopMessageRepository(),
            connector_state_manager=ConnectorStateManager(),
            connector_state_converter=self._connector_state_converter,
            cursor_field=self._cursor_field,
            slice_boundary_fields=self._slice_boundary_fields,
            start=self._start,
            end_provider=self._end_provider,
            lookback_window=self._lookback_window,
            slice_range=self._slice_range,
            cursor_granularity=self._cursor_granularity,
            clamping_strategy=self._clamping_strategy,
        )

    def __init__(
        self,
        stream_name: str,
        stream_namespace: Optional[str],
        stream_state: Any,
        message_repository: MessageRepository,
        connector_state_manager: ConnectorStateManager,
        connector_state_converter: AbstractStreamStateConverter,
        cursor_field: CursorField,
        slice_boundary_fields: Optional[Tuple[str, str]],
        start: Optional[CursorValueType],
        end_provider: Callable[[], CursorValueType],
        lookback_window: Optional[GapType] = None,
        slice_range: Optional[GapType] = None,
        cursor_granularity: Optional[GapType] = None,
        clamping_strategy: ClampingStrategy = NoClamping(),
    ) -> None:
        self._stream_name = stream_name
        self._stream_namespace = stream_namespace
        self._message_repository = message_repository
        self._connector_state_converter = connector_state_converter
        self._connector_state_manager = connector_state_manager
        self._cursor_field = cursor_field
        # To see some example where the slice boundaries might not be defined, check https://github.com/airbytehq/airbyte/blob/1ce84d6396e446e1ac2377362446e3fb94509461/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py#L363-L379
        self._slice_boundary_fields = slice_boundary_fields
        self._start = start
        self._end_provider = end_provider
        self.start, self._concurrent_state = self._get_concurrent_state(stream_state)
        self._lookback_window = lookback_window
        self._slice_range = slice_range
        self._most_recent_cursor_value_per_partition: MutableMapping[
            Union[StreamSlice, Mapping[str, Any], None], Any
        ] = {}
        self._has_closed_at_least_one_slice = False
        self._cursor_granularity = cursor_granularity
        # Flag to track if the logger has been triggered (per stream)
        self._should_be_synced_logger_triggered = False
        self._clamping_strategy = clamping_strategy
        self._is_ascending_order = True

        # A lock is required when closing a partition because updating the cursor's concurrent_state is
        # not thread safe. When multiple partitions are being closed by the cursor at the same time, it is
        # possible for one partition to update concurrent_state after a second partition has already read
        # the previous state. This can lead to the second partition overwriting the previous one's state.
        self._lock = threading.Lock()

    @property
    def state(self) -> MutableMapping[str, Any]:
        return self._connector_state_converter.convert_to_state_message(
            self.cursor_field, self._concurrent_state
        )

    @property
    def cursor_field(self) -> CursorField:
        return self._cursor_field

    @property
    def _slice_boundary_fields_wrapper(self) -> Tuple[str, str]:
        # Fall back to the converter's own START/END keys when no boundary fields were configured.
        return (
            self._slice_boundary_fields
            if self._slice_boundary_fields
            else (
                self._connector_state_converter.START_KEY,
                self._connector_state_converter.END_KEY,
            )
        )

    def _get_concurrent_state(
        self, state: MutableMapping[str, Any]
    ) -> Tuple[CursorValueType, MutableMapping[str, Any]]:
        """
        Convert the incoming stream state into ``(start, concurrent_state)``.

        Compatible (already concurrent) state is deserialized; its first slice's most-recent-record
        value, or else its END, becomes the start. Anything else goes through the converter's
        sequential-state conversion.
        """
        if self._connector_state_converter.is_state_message_compatible(state):
            partitioned_state = self._connector_state_converter.deserialize(state)
            slices_from_partitioned_state = partitioned_state.get("slices", [])

            value_from_partitioned_state = None
            if slices_from_partitioned_state:
                # We assume here that the slices have been already merged
                first_slice = slices_from_partitioned_state[0]
                value_from_partitioned_state = (
                    first_slice[self._connector_state_converter.MOST_RECENT_RECORD_KEY]
                    if self._connector_state_converter.MOST_RECENT_RECORD_KEY in first_slice
                    else first_slice[self._connector_state_converter.END_KEY]
                )
            return (
                value_from_partitioned_state
                or self._start
                or self._connector_state_converter.zero_value,
                partitioned_state,
            )
        return self._connector_state_converter.convert_from_sequential_state(
            self._cursor_field, state, self._start
        )

    def observe(self, record: Record) -> None:
        # Because observe writes to the most_recent_cursor_value_per_partition mapping,
        # it is not thread-safe. However, this shouldn't lead to concurrency issues because
        # observe() is only invoked by PartitionReader.process_partition(). Since the map is
        # broken down according to partition, concurrent threads processing only read/write
        # from different keys which avoids any conflicts.
        #
        # If we were to add thread safety, we should implement a lock per-partition
        # which is instantiated during stream_slices()
        most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
            record.associated_slice
        )
        try:
            cursor_value = self._extract_cursor_value(record)

            if most_recent_cursor_value is None or most_recent_cursor_value < cursor_value:
                self._most_recent_cursor_value_per_partition[record.associated_slice] = cursor_value
            elif most_recent_cursor_value > cursor_value:
                self._is_ascending_order = False
        except ValueError:
            self._log_for_record_without_cursor_value()

    def _extract_cursor_value(self, record: Record) -> Any:
        return self._connector_state_converter.parse_value(self._cursor_field.extract_value(record))

    def close_partition(self, partition: Partition) -> None:
        """Record ``partition`` in the state and emit a state message if a slice was added."""
        with self._lock:
            slice_count_before = len(self._concurrent_state.get("slices", []))
            self._add_slice_to_state(partition)
            if slice_count_before < len(
                self._concurrent_state["slices"]
            ):  # only emit if at least one slice has been processed
                self._merge_partitions()
                self._emit_state_message()
        self._has_closed_at_least_one_slice = True

    def _add_slice_to_state(self, partition: Partition) -> None:
        most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
            partition.to_slice()
        )

        if self._slice_boundary_fields:
            if "slices" not in self._concurrent_state:
                raise RuntimeError(
                    f"The state for stream {self._stream_name} should have at least one slice to delineate the sync start time, but no slices are present. This is unexpected. Please contact Support."
                )
            self._concurrent_state["slices"].append(
                {
                    self._connector_state_converter.START_KEY: self._extract_from_slice(
                        partition, self._slice_boundary_fields[self._START_BOUNDARY]
                    ),
                    self._connector_state_converter.END_KEY: self._extract_from_slice(
                        partition, self._slice_boundary_fields[self._END_BOUNDARY]
                    ),
                    self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value,
                }
            )
        # `is not None` rather than truthiness: a legitimate cursor value such as 0 must still be saved.
        elif most_recent_cursor_value is not None:
            if self._has_closed_at_least_one_slice:
                # If we track state value using records cursor field, we can only do that if there is one partition. This is because we save
                # the state every time we close a partition. We assume that if there are multiple slices, they need to be providing
                # boundaries. There are cases where partitions could not have boundaries:
                # * The cursor should be per-partition
                # * The stream state is actually the parent stream state
                # There might be other cases not listed above. Those are not supported today hence the stream should not use this cursor for
                # state management. For the specific user that was affected with this issue, we need to:
                # * Fix state tracking (which is currently broken)
                # * Make the new version available
                # * (Probably) ask the user to reset the stream to avoid data loss
                raise ValueError(
                    "Given that slice_boundary_fields is not defined and that per-partition state is not supported, only one slice is "
                    "expected. Please contact the Airbyte team."
                )

            self._concurrent_state["slices"].append(
                {
                    self._connector_state_converter.START_KEY: self.start,
                    self._connector_state_converter.END_KEY: most_recent_cursor_value,
                    self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value,
                }
            )

    def _emit_state_message(self) -> None:
        self._connector_state_manager.update_state_for_stream(
            self._stream_name,
            self._stream_namespace,
            self.state,
        )
        state_message = self._connector_state_manager.create_state_message(
            self._stream_name, self._stream_namespace
        )
        self._message_repository.emit_message(state_message)

    def _merge_partitions(self) -> None:
        self._concurrent_state["slices"] = self._connector_state_converter.merge_intervals(
            self._concurrent_state["slices"]
        )

    def _extract_from_slice(self, partition: Partition, key: str) -> CursorValueType:
        try:
            _slice = partition.to_slice()
            if not _slice:
                raise KeyError(f"Could not find key `{key}` in empty slice")
            return self._connector_state_converter.parse_value(_slice[key])  # type: ignore # we expect the devs to specify a key that would return a CursorValueType
        except KeyError as exception:
            raise KeyError(
                f"Partition is expected to have key `{key}` but could not be found"
            ) from exception

    def ensure_at_least_one_state_emitted(self) -> None:
        """
        The platform expect to have at least one state message on successful syncs. Hence, whatever happens, we expect this method to be
        called.
        """
        self._emit_state_message()

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Generating slices based on a few parameters:
        * lookback_window: Buffer to remove from END_KEY of the highest slice
        * slice_range: Max difference between two slices. If the difference between two slices is greater, multiple slices will be created
        * start: `_split_per_slice_range` will clip any value to `self._start which means that:
            * if upper is less than self._start, no slices will be generated
            * if lower is less than self._start, self._start will be used as the lower boundary (lookback_window will not be considered in that case)

        Note that the slices will overlap at their boundaries. We therefore expect to have at least the lower or the upper boundary to be
        inclusive in the API that is queried.

        :raises ValueError: if the state holds no slice at all
        """
        self._merge_partitions()
        # Iterate over a snapshot: partitions closed while this generator is suspended append to and
        # re-merge self._concurrent_state["slices"], which would otherwise shift the indices used below.
        slices = list(self._concurrent_state["slices"])
        if not slices:
            raise ValueError("Expected at least one slice")

        start_key = self._connector_state_converter.START_KEY
        end_key = self._connector_state_converter.END_KEY

        # Range between the configured start and the earliest synced interval.
        if self._start is not None and self._start < slices[0][start_key]:
            yield from self._split_per_slice_range(self._start, slices[0][start_key], False)

        # Gaps between consecutive synced intervals.
        for previous_slice, next_slice in zip(slices, slices[1:]):
            gap_lower = previous_slice[end_key]
            if self._cursor_granularity:
                # Slice ends are inclusive (granularity is subtracted when slices are generated), so the
                # gap starts one granularity step after the previous end.
                gap_lower = gap_lower + self._cursor_granularity
            yield from self._split_per_slice_range(gap_lower, next_slice[start_key], False)

        # From the latest synced interval (minus lookback) up to the current end.
        yield from self._split_per_slice_range(
            self._calculate_lower_boundary_of_last_slice(slices[-1][end_key]),
            self._end_provider(),
            True,
        )

    def _is_start_before_first_slice(self) -> bool:
        return (
            self._start is not None
            and self._start
            < self._concurrent_state["slices"][0][self._connector_state_converter.START_KEY]
        )

    def _calculate_lower_boundary_of_last_slice(
        self, lower_boundary: CursorValueType
    ) -> CursorValueType:
        if self._lookback_window:
            return lower_boundary - self._lookback_window
        return lower_boundary

    def _build_cursor_slice(
        self, start_value: CursorValueType, end_value: CursorValueType
    ) -> StreamSlice:
        """Create a partition-less StreamSlice whose cursor slice spans ``start_value``..``end_value``."""
        return StreamSlice(
            partition={},
            cursor_slice={
                self._slice_boundary_fields_wrapper[
                    self._START_BOUNDARY
                ]: self._connector_state_converter.output_format(start_value),
                self._slice_boundary_fields_wrapper[
                    self._END_BOUNDARY
                ]: self._connector_state_converter.output_format(end_value),
            },
        )

    def _split_per_slice_range(
        self, lower: CursorValueType, upper: CursorValueType, upper_is_end: bool
    ) -> Iterable[StreamSlice]:
        """
        Yield slices covering ``lower``..``upper``, at most ``slice_range`` wide, clipped to ``self._start``.

        When ``cursor_granularity`` is set, the end of every slice except the very last one of the sync
        (``upper_is_end``) is reduced by one granularity step so consecutive slices do not overlap.
        """
        if lower >= upper:
            return

        # `is not None` rather than truthiness so that a start of 0 (integer cursors) still clips.
        if self._start is not None and upper < self._start:
            return

        lower = max(lower, self._start) if self._start is not None else lower
        if not self._slice_range or self._evaluate_upper_safely(lower, self._slice_range) >= upper:
            clamped_lower = self._clamping_strategy.clamp(lower)
            clamped_upper = self._clamping_strategy.clamp(upper)
            start_value, end_value = (
                (clamped_lower, clamped_upper - self._cursor_granularity)
                if self._cursor_granularity and not upper_is_end
                else (clamped_lower, clamped_upper)
            )
            yield self._build_cursor_slice(start_value, end_value)
            return

        current_lower_boundary = lower
        while True:
            current_upper_boundary = min(
                self._evaluate_upper_safely(current_lower_boundary, self._slice_range), upper
            )
            has_reached_upper_boundary = current_upper_boundary >= upper

            clamped_upper = (
                self._clamping_strategy.clamp(current_upper_boundary)
                if current_upper_boundary != upper
                else current_upper_boundary
            )
            clamped_lower = self._clamping_strategy.clamp(current_lower_boundary)
            if clamped_lower >= clamped_upper:
                # clamping collapsed both values which means that it is time to stop processing
                # FIXME should this be replace by proper end_provider
                break
            start_value, end_value = (
                (clamped_lower, clamped_upper - self._cursor_granularity)
                if self._cursor_granularity
                and (not upper_is_end or not has_reached_upper_boundary)
                else (clamped_lower, clamped_upper)
            )
            yield self._build_cursor_slice(start_value, end_value)
            current_lower_boundary = clamped_upper
            if has_reached_upper_boundary:
                break

    def _evaluate_upper_safely(self, lower: CursorValueType, step: GapType) -> CursorValueType:
        """
        Given that we set the default step at datetime.timedelta.max, we will generate an OverflowError when evaluating the next start_date
        This method assumes that users would never enter a step that would generate an overflow. Given that would be the case, the code
        would have broken anyway.
        """
        try:
            return lower + step
        except OverflowError:
            return self._end_provider()

    def should_be_synced(self, record: Record) -> bool:
        """
        Determines if a record should be synced based on its cursor value.
        :param record: The record to evaluate

        :return: True if the record's cursor value falls within the sync boundaries
        """
        try:
            record_cursor_value: CursorValueType = self._extract_cursor_value(record)
        except ValueError:
            self._log_for_record_without_cursor_value()
            return True
        return self.start <= record_cursor_value <= self._end_provider()

    def _log_for_record_without_cursor_value(self) -> None:
        # Warn only once per cursor instance to avoid flooding the logs.
        if not self._should_be_synced_logger_triggered:
            LOGGER.warning(
                f"Could not find cursor field `{self.cursor_field.cursor_field_key}` in record for stream {self._stream_name}. The incremental sync will assume it needs to be synced"
            )
            self._should_be_synced_logger_triggered = True

    def reduce_slice_range(self, stream_slice: StreamSlice) -> StreamSlice:
        """
        Return ``stream_slice`` with its start moved up to the most recent cursor value observed for it,
        or ``stream_slice`` unchanged if no record has been observed for it.
        """
        # In theory, we might be more flexible here meaning that it doesn't need to be in ascending order but it just
        # needs to be ordered. For now though, we will only support ascending order.
        if not self._is_ascending_order:
            LOGGER.warning(
                "Attempting to reduce slice while records are not returned in incremental order might lead to missing records"
            )

        if stream_slice in self._most_recent_cursor_value_per_partition:
            return StreamSlice(
                partition=stream_slice.partition,
                cursor_slice={
                    self._slice_boundary_fields_wrapper[
                        self._START_BOUNDARY
                    ]: self._connector_state_converter.output_format(
                        self._most_recent_cursor_value_per_partition[stream_slice]
                    ),
                    self._slice_boundary_fields_wrapper[
                        self._END_BOUNDARY
                    ]: stream_slice.cursor_slice[
                        self._slice_boundary_fields_wrapper[self._END_BOUNDARY]
                    ],
                },
                extra_fields=stream_slice.extra_fields,
            )
        else:
            return stream_slice
class CursorField:
    """Names the top-level record field whose value acts as the cursor."""

    def __init__(self, cursor_field_key: str) -> None:
        # Key looked up directly in ``record.data``.
        self.cursor_field_key = cursor_field_key

    def extract_value(self, record: Record) -> Any:
        """
        Return the raw cursor value of ``record``.

        :raises ValueError: if ``cursor_field_key`` is missing from ``record.data`` or maps to None
        """
        value = record.data.get(self.cursor_field_key)
        if value is not None:
            return value  # type: ignore # the cursor field value is assumed to be comparable
        raise ValueError(f"Could not find cursor field {self.cursor_field_key} in record")
def extract_value(self, record: Record) -> Any:
    """
    Return the raw value stored under ``self.cursor_field_key`` in ``record.data``.

    :raises ValueError: if the key is absent or its value is None
    """
    key = self.cursor_field_key
    value = record.data.get(key)
    if value is not None:
        return value  # type: ignore # the cursor field value is assumed to be comparable
    raise ValueError(f"Could not find cursor field {key} in record")
class Cursor(StreamSlicer, ABC):
    """
    Interface of cursors for concurrent streams: they are told about emitted records and closed
    partitions, expose their state, and decide whether a record belongs to the sync.
    """

    @property
    @abstractmethod
    def state(self) -> MutableMapping[str, Any]:
        """Current state of the cursor."""

    @abstractmethod
    def observe(self, record: Record) -> None:
        """
        Notify the cursor that ``record`` has been emitted.
        """
        raise NotImplementedError

    @abstractmethod
    def close_partition(self, partition: Partition) -> None:
        """
        Notify the cursor that ``partition`` has been processed successfully.
        """
        raise NotImplementedError

    @abstractmethod
    def ensure_at_least_one_state_emitted(self) -> None:
        """
        Emit a state message even if no partition was ever closed.

        State messages are normally emitted when a partition is closed, but the platform expects at least
        one state message per stream per sync; call this when no partitions were generated.
        """
        raise NotImplementedError

    @abstractmethod
    def should_be_synced(self, record: Record) -> bool:
        """Return whether ``record`` should be synced; concrete cursors must implement this."""

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Default placeholder implementation of ``stream_slices``: yields one slice with an empty partition
        and an empty cursor slice. Subclasses can override this method to provide actual behavior.
        """
        only_slice = StreamSlice(partition={}, cursor_slice={})
        yield only_slice
Slices the stream into chunks that can be fetched independently. Slices enable state checkpointing and data retrieval parallelization.
@abstractmethod
def observe(self, record: Record) -> None:
    """
    Notify the cursor that ``record`` has been emitted.

    Abstract: implementations must override this; the base version always raises NotImplementedError.
    """
    raise NotImplementedError
Indicate to the cursor that the record has been emitted
@abstractmethod
def close_partition(self, partition: Partition) -> None:
    """
    Notify the cursor that ``partition`` has been processed successfully.

    Abstract: implementations must override this; the base version always raises NotImplementedError.
    """
    raise NotImplementedError
Indicate to the cursor that the partition has been successfully processed
@abstractmethod
def ensure_at_least_one_state_emitted(self) -> None:
    """
    Emit a state message even when no partition was ever closed.

    Partitions closing is what normally triggers state messages, yet the platform expects at least one
    state message per stream per sync; callers use this method to cover the no-partition case.
    Abstract: the base version always raises NotImplementedError.
    """
    raise NotImplementedError
State messages are emitted when a partition is closed. However, the platform expects at least one state to be emitted per sync per stream. Hence, if no partitions are generated, this method needs to be called.
def stream_slices(self) -> Iterable[StreamSlice]:
    """
    Default placeholder implementation of ``stream_slices``: yields exactly one slice whose partition and
    cursor slice are both empty. Subclasses can override this method to provide actual behavior.
    """
    only_slice = StreamSlice(partition={}, cursor_slice={})
    yield only_slice
Default placeholder implementation of stream_slices: yields a single slice with an empty partition and an empty cursor slice. Subclasses can override this method to provide actual behavior.
class FinalStateCursor(Cursor):
    """Cursor that is used to guarantee at least one state message is emitted for a concurrent stream."""

    def __init__(
        self,
        stream_name: str,
        stream_namespace: Optional[str],
        message_repository: MessageRepository,
    ) -> None:
        self._stream_name = stream_name
        self._stream_namespace = stream_namespace
        self._message_repository = message_repository
        # A private state manager is enough here: this cursor only writes the sentinel "no cursor" state
        # for its own stream instead of managing source-level state. Meant to be temporary until every
        # stream uses a cursor with incremental state (resumable full refresh).
        self._connector_state_manager = ConnectorStateManager()
        self._has_closed_at_least_one_slice = False

    @property
    def state(self) -> MutableMapping[str, Any]:
        """Sentinel state marking that this stream has no cursor."""
        sentinel_state: MutableMapping[str, Any] = {NO_CURSOR_STATE_KEY: True}
        return sentinel_state

    def observe(self, record: Record) -> None:
        """No-op: records do not influence the sentinel state."""

    def close_partition(self, partition: Partition) -> None:
        """No-op: closing a partition emits nothing for this cursor."""

    def ensure_at_least_one_state_emitted(self) -> None:
        """
        Emit the sentinel state message; used primarily for full refresh syncs that have no valid cursor
        value to emit at the end of a sync.
        """
        manager = self._connector_state_manager
        name, namespace = self._stream_name, self._stream_namespace
        manager.update_state_for_stream(name, namespace, self.state)
        self._message_repository.emit_message(manager.create_state_message(name, namespace))

    def should_be_synced(self, record: Record) -> bool:
        """Every record is synced."""
        return True
Cursor that is used to guarantee at least one state message is emitted for a concurrent stream.
def __init__(
    self,
    stream_name: str,
    stream_namespace: Optional[str],
    message_repository: MessageRepository,
) -> None:
    """Store the stream identity and repository, and create a private state manager."""
    self._stream_name, self._stream_namespace = stream_name, stream_namespace
    self._message_repository = message_repository
    # A private state manager is enough here: it only has to write the sentinel state message for this
    # stream rather than manage overall source state. Meant to be temporary until every stream uses a
    # cursor with incremental state (resumable full refresh).
    self._connector_state_manager = ConnectorStateManager()
    self._has_closed_at_least_one_slice = False
Indicate to the cursor that the partition has been successfully processed
def ensure_at_least_one_state_emitted(self) -> None:
    """
    Push ``self.state`` into the state manager, build the stream's state message, and emit it.

    Used primarily for full refresh syncs that have no valid cursor value to emit at the end of a sync.
    """
    manager = self._connector_state_manager
    name, namespace = self._stream_name, self._stream_namespace
    manager.update_state_for_stream(name, namespace, self.state)
    self._message_repository.emit_message(manager.create_state_message(name, namespace))
Used primarily for full refresh syncs that do not have a valid cursor value to emit at the end of a sync
Inherited Members
class ConcurrentCursor(Cursor):
    """
    Cursor for concurrent incremental syncs that tracks which cursor-value intervals ("slices") have been synced.

    Each time a partition is closed, the interval it covered is added to the concurrent state, the intervals are merged
    and a state message is emitted. `stream_slices` then generates what still needs to be synced:
    * the gap before the first known interval (when the configured start is earlier),
    * the gaps between known intervals,
    * the range from the end of the last interval (minus the lookback window) up to `end_provider()`.
    """

    # Indexes into `_slice_boundary_fields` / `_slice_boundary_fields_wrapper`.
    _START_BOUNDARY = 0
    _END_BOUNDARY = 1

    def copy_without_state(self) -> "ConcurrentCursor":
        """
        Return a new cursor with the same configuration as this one but an empty stream state.

        The copy uses a `NoopMessageRepository` instead of this cursor's message repository and a fresh
        `ConnectorStateManager`, so it does not share those collaborators with this cursor.
        """
        return self.__class__(
            stream_name=self._stream_name,
            stream_namespace=self._stream_namespace,
            stream_state={},
            message_repository=NoopMessageRepository(),
            connector_state_manager=ConnectorStateManager(),
            connector_state_converter=self._connector_state_converter,
            cursor_field=self._cursor_field,
            slice_boundary_fields=self._slice_boundary_fields,
            start=self._start,
            end_provider=self._end_provider,
            lookback_window=self._lookback_window,
            slice_range=self._slice_range,
            cursor_granularity=self._cursor_granularity,
            clamping_strategy=self._clamping_strategy,
        )

    def __init__(
        self,
        stream_name: str,
        stream_namespace: Optional[str],
        stream_state: Any,
        message_repository: MessageRepository,
        connector_state_manager: ConnectorStateManager,
        connector_state_converter: AbstractStreamStateConverter,
        cursor_field: CursorField,
        slice_boundary_fields: Optional[Tuple[str, str]],
        start: Optional[CursorValueType],
        end_provider: Callable[[], CursorValueType],
        lookback_window: Optional[GapType] = None,
        slice_range: Optional[GapType] = None,
        cursor_granularity: Optional[GapType] = None,
        clamping_strategy: ClampingStrategy = NoClamping(),
    ) -> None:
        """
        :param stream_name: Name of the stream, used when updating and emitting state.
        :param stream_namespace: Namespace of the stream, used when updating and emitting state.
        :param stream_state: Incoming stream state; converted into the concurrent format by `_get_concurrent_state`.
        :param message_repository: Repository state messages are emitted to.
        :param connector_state_manager: Manager updated with this stream's state before each state message is created.
        :param connector_state_converter: Converts between the stream state and the concurrent state, and parses/formats
            cursor values.
        :param cursor_field: Field holding the cursor value in records.
        :param slice_boundary_fields: Keys of the partition slice holding the (start, end) boundaries. When None, closed
            partitions are tracked through the most recent cursor value observed, and generated slices use the
            converter's START_KEY/END_KEY.
        :param start: Configured lower bound of the sync.
        :param end_provider: Callable returning the upper bound of the sync.
        :param lookback_window: Amount subtracted from the end of the last synced slice when generating the final range.
        :param slice_range: Maximum width of a generated slice.
        :param cursor_granularity: Smallest cursor step. It is subtracted from the end of non-final generated slices and
            added to the end of a slice when filling the gap that follows it.
        :param clamping_strategy: Strategy used to clamp generated slice boundaries.
        """
        self._stream_name = stream_name
        self._stream_namespace = stream_namespace
        self._message_repository = message_repository
        self._connector_state_converter = connector_state_converter
        self._connector_state_manager = connector_state_manager
        self._cursor_field = cursor_field
        # To see some example where the slice boundaries might not be defined, check https://github.com/airbytehq/airbyte/blob/1ce84d6396e446e1ac2377362446e3fb94509461/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py#L363-L379
        self._slice_boundary_fields = slice_boundary_fields
        self._start = start
        self._end_provider = end_provider
        # `self.start` is the effective lower bound derived from the incoming state; `self._start` is the configured one.
        self.start, self._concurrent_state = self._get_concurrent_state(stream_state)
        self._lookback_window = lookback_window
        self._slice_range = slice_range
        self._most_recent_cursor_value_per_partition: MutableMapping[
            Union[StreamSlice, Mapping[str, Any], None], Any
        ] = {}
        self._has_closed_at_least_one_slice = False
        self._cursor_granularity = cursor_granularity
        # Flag to track if the logger has been triggered (per stream)
        self._should_be_synced_logger_triggered = False
        self._clamping_strategy = clamping_strategy
        self._is_ascending_order = True

        # A lock is required when closing a partition because updating the cursor's concurrent_state is
        # not thread safe. When multiple partitions are being closed by the cursor at the same time, it is
        # possible for one partition to update concurrent_state after a second partition has already read
        # the previous state. This can lead to the second partition overwriting the previous one's state.
        self._lock = threading.Lock()

    @property
    def state(self) -> MutableMapping[str, Any]:
        """Current concurrent state converted to a state message by the connector state converter."""
        return self._connector_state_converter.convert_to_state_message(
            self.cursor_field, self._concurrent_state
        )

    @property
    def cursor_field(self) -> CursorField:
        return self._cursor_field

    @property
    def _slice_boundary_fields_wrapper(self) -> Tuple[str, str]:
        """The configured slice boundary fields, or the converter's (START_KEY, END_KEY) when none are configured."""
        return (
            self._slice_boundary_fields
            if self._slice_boundary_fields
            else (
                self._connector_state_converter.START_KEY,
                self._connector_state_converter.END_KEY,
            )
        )

    def _get_concurrent_state(
        self, state: MutableMapping[str, Any]
    ) -> Tuple[CursorValueType, MutableMapping[str, Any]]:
        """
        Convert the incoming stream state into `(effective start, concurrent state)`.

        If the state is already in the concurrent format, the effective start is the most recent record value of the
        first slice (or that slice's end when no most recent record value is stored). It falls back to the configured
        start, then to the converter's zero value. Any other state is converted by the converter's
        `convert_from_sequential_state`.
        """
        if self._connector_state_converter.is_state_message_compatible(state):
            partitioned_state = self._connector_state_converter.deserialize(state)
            slices_from_partitioned_state = partitioned_state.get("slices", [])

            value_from_partitioned_state = None
            if slices_from_partitioned_state:
                # We assume here that the slices have been already merged
                first_slice = slices_from_partitioned_state[0]
                value_from_partitioned_state = (
                    first_slice[self._connector_state_converter.MOST_RECENT_RECORD_KEY]
                    if self._connector_state_converter.MOST_RECENT_RECORD_KEY in first_slice
                    else first_slice[self._connector_state_converter.END_KEY]
                )
            return (
                value_from_partitioned_state
                or self._start
                or self._connector_state_converter.zero_value,
                partitioned_state,
            )
        return self._connector_state_converter.convert_from_sequential_state(
            self._cursor_field, state, self._start
        )

    def observe(self, record: Record) -> None:
        """
        Track the highest cursor value seen for the slice the record belongs to.

        Flags the cursor as not ascending when a record's cursor value is lower than the highest one already seen for
        its slice. Records whose cursor value cannot be extracted (ValueError) are only logged.
        """
        # Because observe writes to the most_recent_cursor_value_per_partition mapping,
        # it is not thread-safe. However, this shouldn't lead to concurrency issues because
        # observe() is only invoked by PartitionReader.process_partition(). Since the map is
        # broken down according to partition, concurrent threads processing only read/write
        # from different keys which avoids any conflicts.
        #
        # If we were to add thread safety, we should implement a lock per-partition
        # which is instantiated during stream_slices()
        most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
            record.associated_slice
        )
        try:
            cursor_value = self._extract_cursor_value(record)

            if most_recent_cursor_value is None or most_recent_cursor_value < cursor_value:
                self._most_recent_cursor_value_per_partition[record.associated_slice] = cursor_value
            elif most_recent_cursor_value > cursor_value:
                self._is_ascending_order = False
        except ValueError:
            self._log_for_record_without_cursor_value()

    def _extract_cursor_value(self, record: Record) -> Any:
        """Extract the record's cursor field and parse it with the connector state converter."""
        return self._connector_state_converter.parse_value(self._cursor_field.extract_value(record))

    def close_partition(self, partition: Partition) -> None:
        """Add the partition's interval to the state and, if a slice was added, merge slices and emit state."""
        with self._lock:
            slice_count_before = len(self._concurrent_state.get("slices", []))
            self._add_slice_to_state(partition)
            if slice_count_before < len(
                self._concurrent_state["slices"]
            ):  # only emit if at least one slice has been processed
                self._merge_partitions()
                self._emit_state_message()
            self._has_closed_at_least_one_slice = True

    def _add_slice_to_state(self, partition: Partition) -> None:
        """
        Append the interval covered by `partition` to the concurrent state.

        With slice boundary fields, the interval bounds come from the partition's slice. Without them, the interval spans
        from `self.start` to the most recent cursor value observed for the partition. It is only appended when such a
        value exists, and only once per cursor.

        :raises RuntimeError: if slice boundary fields are defined but the state has no "slices" entry.
        :raises ValueError: if no slice boundary fields are defined and a slice was already closed before.
        """
        most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
            partition.to_slice()
        )

        if self._slice_boundary_fields:
            if "slices" not in self._concurrent_state:
                raise RuntimeError(
                    f"The state for stream {self._stream_name} should have at least one slice to delineate the sync start time, but no slices are present. This is unexpected. Please contact Support."
                )
            self._concurrent_state["slices"].append(
                {
                    self._connector_state_converter.START_KEY: self._extract_from_slice(
                        partition, self._slice_boundary_fields[self._START_BOUNDARY]
                    ),
                    self._connector_state_converter.END_KEY: self._extract_from_slice(
                        partition, self._slice_boundary_fields[self._END_BOUNDARY]
                    ),
                    self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value,
                }
            )
        elif most_recent_cursor_value:
            if self._has_closed_at_least_one_slice:
                # If we track state value using records cursor field, we can only do that if there is one partition. This is because we save
                # the state every time we close a partition. We assume that if there are multiple slices, they need to be providing
                # boundaries. There are cases where partitions could not have boundaries:
                # * The cursor should be per-partition
                # * The stream state is actually the parent stream state
                # There might be other cases not listed above. Those are not supported today hence the stream should not use this cursor for
                # state management. For the specific user that was affected with this issue, we need to:
                # * Fix state tracking (which is currently broken)
                # * Make the new version available
                # * (Probably) ask the user to reset the stream to avoid data loss
                raise ValueError(
                    "Given that slice_boundary_fields is not defined and that per-partition state is not supported, only one slice is "
                    "expected. Please contact the Airbyte team."
                )

            self._concurrent_state["slices"].append(
                {
                    self._connector_state_converter.START_KEY: self.start,
                    self._connector_state_converter.END_KEY: most_recent_cursor_value,
                    self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value,
                }
            )

    def _emit_state_message(self) -> None:
        """Push the current state into the connector state manager and emit the resulting state message."""
        self._connector_state_manager.update_state_for_stream(
            self._stream_name,
            self._stream_namespace,
            self.state,
        )
        state_message = self._connector_state_manager.create_state_message(
            self._stream_name, self._stream_namespace
        )
        self._message_repository.emit_message(state_message)

    def _merge_partitions(self) -> None:
        """Replace the state's slices with the result of the converter's `merge_intervals`."""
        self._concurrent_state["slices"] = self._connector_state_converter.merge_intervals(
            self._concurrent_state["slices"]
        )

    def _extract_from_slice(self, partition: Partition, key: str) -> CursorValueType:
        """
        Read `key` from the partition's slice and parse it as a cursor value.

        :raises KeyError: if the slice is empty or does not contain `key`.
        """
        try:
            _slice = partition.to_slice()
            if not _slice:
                raise KeyError(f"Could not find key `{key}` in empty slice")
            return self._connector_state_converter.parse_value(_slice[key])  # type: ignore # we expect the devs to specify a key that would return a CursorValueType
        except KeyError as exception:
            raise KeyError(
                f"Partition is expected to have key `{key}` but could not be found"
            ) from exception

    def ensure_at_least_one_state_emitted(self) -> None:
        """
        The platform expects at least one state message on successful syncs. Hence, whatever happens, we expect this
        method to be called.
        """
        self._emit_state_message()

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Generate slices based on a few parameters:
        * lookback_window: Buffer to remove from END_KEY of the highest slice
        * slice_range: Maximum width of a generated slice. If an interval is wider, multiple slices will be created
        * start: `_split_per_slice_range` will clip any value to `self._start`, which means that:
          * if upper is less than self._start, no slices will be generated
          * if lower is less than self._start, self._start will be used as the lower boundary (lookback_window will not be
            considered in that case)

        Note that the slices will overlap at their boundaries. We therefore expect to have at least the lower or the upper
        boundary to be inclusive in the API that is queried.

        :raises ValueError: if the concurrent state has no slices after merging.
        """
        self._merge_partitions()
        # Work on a copy so the boundaries used below stay consistent even if `close_partition` updates the state
        # while this generator is still being consumed.
        state_slices = list(self._concurrent_state["slices"])
        # Check emptiness first: `_is_start_before_first_slice` indexes the first slice and would otherwise raise an
        # IndexError that hides this explicit error.
        if not state_slices:
            raise ValueError("Expected at least one slice")

        start_key = self._connector_state_converter.START_KEY
        end_key = self._connector_state_converter.END_KEY

        if self._is_start_before_first_slice():
            yield from self._split_per_slice_range(self._start, state_slices[0][start_key], False)

        # Fill the gaps between consecutive (already merged) slices.
        for previous_slice, next_slice in zip(state_slices, state_slices[1:]):
            gap_lower_boundary = previous_slice[end_key]
            if self._cursor_granularity:
                gap_lower_boundary = gap_lower_boundary + self._cursor_granularity
            yield from self._split_per_slice_range(gap_lower_boundary, next_slice[start_key], False)

        # Sync from the end of the last slice (minus the lookback window) up to the current end.
        yield from self._split_per_slice_range(
            self._calculate_lower_boundary_of_last_slice(state_slices[-1][end_key]),
            self._end_provider(),
            True,
        )

    def _is_start_before_first_slice(self) -> bool:
        """True if a start is configured and it is strictly before the first slice's START_KEY."""
        return (
            self._start is not None
            and self._start
            < self._concurrent_state["slices"][0][self._connector_state_converter.START_KEY]
        )

    def _calculate_lower_boundary_of_last_slice(
        self, lower_boundary: CursorValueType
    ) -> CursorValueType:
        """Subtract the lookback window (if any) from `lower_boundary`."""
        if self._lookback_window:
            return lower_boundary - self._lookback_window
        return lower_boundary

    def _split_per_slice_range(
        self, lower: CursorValueType, upper: CursorValueType, upper_is_end: bool
    ) -> Iterable[StreamSlice]:
        """
        Yield slices covering `lower` to `upper`, raised to `self._start` when a start is configured.

        When a slice range is set, the interval is chunked into slices of at most that width. When a cursor granularity
        is set, it is subtracted from the end of every slice except the one reaching `upper` when `upper_is_end` is True.
        Nothing is yielded when `lower >= upper` or when `upper` is before the configured start.
        """
        if lower >= upper:
            return

        if self._start and upper < self._start:
            return

        lower = max(lower, self._start) if self._start else lower
        if not self._slice_range or self._evaluate_upper_safely(lower, self._slice_range) >= upper:
            clamped_lower = self._clamping_strategy.clamp(lower)
            clamped_upper = self._clamping_strategy.clamp(upper)
            start_value, end_value = (
                (clamped_lower, clamped_upper - self._cursor_granularity)
                if self._cursor_granularity and not upper_is_end
                else (clamped_lower, clamped_upper)
            )
            yield StreamSlice(
                partition={},
                cursor_slice={
                    self._slice_boundary_fields_wrapper[
                        self._START_BOUNDARY
                    ]: self._connector_state_converter.output_format(start_value),
                    self._slice_boundary_fields_wrapper[
                        self._END_BOUNDARY
                    ]: self._connector_state_converter.output_format(end_value),
                },
            )
        else:
            stop_processing = False
            current_lower_boundary = lower
            while not stop_processing:
                current_upper_boundary = min(
                    self._evaluate_upper_safely(current_lower_boundary, self._slice_range), upper
                )
                has_reached_upper_boundary = current_upper_boundary >= upper

                clamped_upper = (
                    self._clamping_strategy.clamp(current_upper_boundary)
                    if current_upper_boundary != upper
                    else current_upper_boundary
                )
                clamped_lower = self._clamping_strategy.clamp(current_lower_boundary)
                if clamped_lower >= clamped_upper:
                    # clamping collapsed both values which means that it is time to stop processing
                    # FIXME should this be replaced by proper end_provider
                    break
                start_value, end_value = (
                    (clamped_lower, clamped_upper - self._cursor_granularity)
                    if self._cursor_granularity
                    and (not upper_is_end or not has_reached_upper_boundary)
                    else (clamped_lower, clamped_upper)
                )
                yield StreamSlice(
                    partition={},
                    cursor_slice={
                        self._slice_boundary_fields_wrapper[
                            self._START_BOUNDARY
                        ]: self._connector_state_converter.output_format(start_value),
                        self._slice_boundary_fields_wrapper[
                            self._END_BOUNDARY
                        ]: self._connector_state_converter.output_format(end_value),
                    },
                )
                current_lower_boundary = clamped_upper
                if current_upper_boundary >= upper:
                    stop_processing = True

    def _evaluate_upper_safely(self, lower: CursorValueType, step: GapType) -> CursorValueType:
        """
        Given that we set the default step at datetime.timedelta.max, we will generate an OverflowError when evaluating the next start_date
        This method assumes that users would never enter a step that would generate an overflow. Given that would be the case, the code
        would have broken anyway.
        """
        try:
            return lower + step
        except OverflowError:
            return self._end_provider()

    def should_be_synced(self, record: Record) -> bool:
        """
        Determines if a record should be synced based on its cursor value.
        :param record: The record to evaluate

        :return: True if the record's cursor value falls within the sync boundaries (`self.start` to `end_provider()`,
            both inclusive), or if the cursor value could not be extracted
        """
        try:
            record_cursor_value: CursorValueType = self._extract_cursor_value(record)
        except ValueError:
            self._log_for_record_without_cursor_value()
            return True
        return self.start <= record_cursor_value <= self._end_provider()

    def _log_for_record_without_cursor_value(self) -> None:
        """Log, at most once per cursor, that a record had no cursor value."""
        if not self._should_be_synced_logger_triggered:
            LOGGER.warning(
                f"Could not find cursor field `{self.cursor_field.cursor_field_key}` in record for stream {self._stream_name}. The incremental sync will assume it needs to be synced"
            )
            self._should_be_synced_logger_triggered = True

    def reduce_slice_range(self, stream_slice: StreamSlice) -> StreamSlice:
        """Return `stream_slice` starting at the most recent cursor value observed for it, or unchanged if none."""
        # In theory, we might be more flexible here meaning that it doesn't need to be in ascending order but it just
        # needs to be ordered. For now though, we will only support ascending order.
        if not self._is_ascending_order:
            LOGGER.warning(
                "Attempting to reduce slice while records are not returned in incremental order might lead to missing records"
            )

        if stream_slice in self._most_recent_cursor_value_per_partition:
            return StreamSlice(
                partition=stream_slice.partition,
                cursor_slice={
                    self._slice_boundary_fields_wrapper[
                        self._START_BOUNDARY
                    ]: self._connector_state_converter.output_format(
                        self._most_recent_cursor_value_per_partition[stream_slice]
                    ),
                    self._slice_boundary_fields_wrapper[
                        self._END_BOUNDARY
                    ]: stream_slice.cursor_slice[
                        self._slice_boundary_fields_wrapper[self._END_BOUNDARY]
                    ],
                },
                extra_fields=stream_slice.extra_fields,
            )
        else:
            return stream_slice
Slices the stream into chunks that can be fetched independently. Slices enable state checkpointing and data retrieval parallelization.
def __init__(
    self,
    stream_name: str,
    stream_namespace: Optional[str],
    stream_state: Any,
    message_repository: MessageRepository,
    connector_state_manager: ConnectorStateManager,
    connector_state_converter: AbstractStreamStateConverter,
    cursor_field: CursorField,
    slice_boundary_fields: Optional[Tuple[str, str]],
    start: Optional[CursorValueType],
    end_provider: Callable[[], CursorValueType],
    lookback_window: Optional[GapType] = None,
    slice_range: Optional[GapType] = None,
    cursor_granularity: Optional[GapType] = None,
    clamping_strategy: ClampingStrategy = NoClamping(),
) -> None:
    """
    Store the cursor configuration and derive the initial concurrent state from `stream_state`.

    `self.start` becomes the effective lower bound computed from the incoming state, while `self._start` keeps the
    configured start. Slice boundary fields may be None (no boundaries in the partitions), in which case the
    converter's START_KEY/END_KEY name the boundaries of generated slices.
    """
    # Stream identity and collaborators.
    self._stream_name = stream_name
    self._stream_namespace = stream_namespace
    self._message_repository = message_repository
    self._connector_state_manager = connector_state_manager
    self._connector_state_converter = connector_state_converter

    # `_get_concurrent_state` reads the converter, the cursor field and the configured start,
    # so these must already be assigned when it is called below.
    self._cursor_field = cursor_field
    # Example of a stream whose slice boundaries are not defined:
    # https://github.com/airbytehq/airbyte/blob/1ce84d6396e446e1ac2377362446e3fb94509461/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py#L363-L379
    self._slice_boundary_fields = slice_boundary_fields
    self._start = start
    self._end_provider = end_provider

    self.start, self._concurrent_state = self._get_concurrent_state(stream_state)

    # Settings used when generating slices.
    self._lookback_window = lookback_window
    self._slice_range = slice_range
    self._cursor_granularity = cursor_granularity
    self._clamping_strategy = clamping_strategy

    # Bookkeeping updated while the sync runs.
    self._most_recent_cursor_value_per_partition: MutableMapping[
        Union[StreamSlice, Mapping[str, Any], None], Any
    ] = {}
    self._has_closed_at_least_one_slice = False
    # Ensures the "missing cursor field" warning is only logged once for this stream.
    self._should_be_synced_logger_triggered = False
    self._is_ascending_order = True

    # Closing partitions mutates the shared concurrent state; without this lock, two partitions closed at
    # the same time could each read the old state and one would overwrite the other's update.
    self._lock = threading.Lock()
def copy_without_state(self) -> "ConcurrentCursor":
    """
    Create a cursor configured like this one but starting from an empty stream state.

    The copy emits through a `NoopMessageRepository` and records state in a brand-new `ConnectorStateManager`,
    so it does not share those collaborators with this cursor.
    """
    # Everything that defines how slices are computed is carried over unchanged.
    shared_configuration = {
        "connector_state_converter": self._connector_state_converter,
        "cursor_field": self._cursor_field,
        "slice_boundary_fields": self._slice_boundary_fields,
        "start": self._start,
        "end_provider": self._end_provider,
        "lookback_window": self._lookback_window,
        "slice_range": self._slice_range,
        "cursor_granularity": self._cursor_granularity,
        "clamping_strategy": self._clamping_strategy,
    }
    cursor_class = self.__class__
    return cursor_class(
        stream_name=self._stream_name,
        stream_namespace=self._stream_namespace,
        stream_state={},
        message_repository=NoopMessageRepository(),
        connector_state_manager=ConnectorStateManager(),
        **shared_configuration,
    )
def observe(self, record: Record) -> None:
    """
    Track the highest cursor value seen for the slice an emitted record belongs to.

    The cursor is flagged as not ascending when a record's cursor value is lower than the highest one already seen
    for its slice. Records whose cursor value cannot be extracted (ValueError) are only logged.
    """
    # The per-slice mapping is written without a lock. This relies on observe() being called only from
    # PartitionReader.process_partition(): each thread handles its own partition and therefore its own key.
    # Should that change, a per-partition lock created in stream_slices() would be the way to make this safe.
    slice_key = record.associated_slice
    best_so_far = self._most_recent_cursor_value_per_partition.get(slice_key)
    try:
        candidate = self._extract_cursor_value(record)
        is_new_maximum = best_so_far is None or best_so_far < candidate
        if is_new_maximum:
            self._most_recent_cursor_value_per_partition[slice_key] = candidate
        elif best_so_far > candidate:
            self._is_ascending_order = False
    except ValueError:
        self._log_for_record_without_cursor_value()
Indicate to the cursor that the record has been emitted
def close_partition(self, partition: Partition) -> None:
    """
    Mark `partition` as fully processed. If this added a slice to the state, merge slices and emit a state message.

    The whole update runs under `self._lock` because concurrent closes would otherwise read and overwrite each
    other's view of the concurrent state.
    """
    with self._lock:
        slices_before = len(self._concurrent_state.get("slices", []))
        self._add_slice_to_state(partition)
        slices_after = len(self._concurrent_state["slices"])
        # Only emit when at least one slice has been processed.
        if slices_after > slices_before:
            self._merge_partitions()
            self._emit_state_message()
        self._has_closed_at_least_one_slice = True
Indicate to the cursor that the partition has been successfully processed
def ensure_at_least_one_state_emitted(self) -> None:
    """
    Emit the current state unconditionally.

    The platform expects at least one state message on successful syncs. Closing a partition only emits when a
    slice is added, so this is meant to be called whatever happens during the sync.
    """
    emit_state = self._emit_state_message
    emit_state()
The platform expects at least one state message on successful syncs. Hence, whatever happens, we expect this method to be called.
def stream_slices(self) -> Iterable[StreamSlice]:
    """
    Generate slices based on a few parameters:
    * lookback_window: Buffer to remove from END_KEY of the highest slice
    * slice_range: Maximum width of a generated slice. If an interval is wider, multiple slices will be created
    * start: `_split_per_slice_range` will clip any value to `self._start`, which means that:
      * if upper is less than self._start, no slices will be generated
      * if lower is less than self._start, self._start will be used as the lower boundary (lookback_window will not be
        considered in that case)

    Note that the slices will overlap at their boundaries. We therefore expect to have at least the lower or the upper
    boundary to be inclusive in the API that is queried.

    :raises ValueError: if the concurrent state has no slices after merging.
    """
    self._merge_partitions()
    # Work on a copy so the boundaries used below stay consistent even if `close_partition` updates the state
    # while this generator is still being consumed.
    state_slices = list(self._concurrent_state["slices"])
    # Check emptiness first: `_is_start_before_first_slice` indexes the first slice and would otherwise raise an
    # IndexError that hides this explicit error.
    if not state_slices:
        raise ValueError("Expected at least one slice")

    start_key = self._connector_state_converter.START_KEY
    end_key = self._connector_state_converter.END_KEY

    if self._is_start_before_first_slice():
        yield from self._split_per_slice_range(self._start, state_slices[0][start_key], False)

    # Fill the gaps between consecutive (already merged) slices.
    for previous_slice, next_slice in zip(state_slices, state_slices[1:]):
        gap_lower_boundary = previous_slice[end_key]
        if self._cursor_granularity:
            gap_lower_boundary = gap_lower_boundary + self._cursor_granularity
        yield from self._split_per_slice_range(gap_lower_boundary, next_slice[start_key], False)

    # Sync from the end of the last slice (minus the lookback window) up to the current end.
    yield from self._split_per_slice_range(
        self._calculate_lower_boundary_of_last_slice(state_slices[-1][end_key]),
        self._end_provider(),
        True,
    )
Generating slices based on a few parameters:
- lookback_window: Buffer to remove from END_KEY of the highest slice
- slice_range: Maximum width of a single generated slice. If an interval is wider than this, multiple slices will be created
- start: `_split_per_slice_range` will clip any value to `self._start`, which means that:
  - if upper is less than self._start, no slices will be generated
- if lower is less than self._start, self._start will be used as the lower boundary (lookback_window will not be considered in that case)
Note that the slices will overlap at their boundaries. We therefore expect to have at least the lower or the upper boundary to be inclusive in the API that is queried.
def should_be_synced(self, record: Record) -> bool:
    """
    Tell whether `record` falls within the boundaries of this sync.

    :param record: The record to evaluate
    :return: True if the record's cursor value lies between `self.start` and `self._end_provider()` (both inclusive),
        or if the cursor value could not be extracted from the record
    """
    try:
        record_cursor_value: CursorValueType = self._extract_cursor_value(record)
    except ValueError:
        # Without a cursor value the record cannot be filtered out, so it is kept (the warning is logged once).
        self._log_for_record_without_cursor_value()
        return True
    after_start = self.start <= record_cursor_value
    if not after_start:
        return after_start
    # As in a chained comparison, the end boundary is only computed when the start check passed.
    return record_cursor_value <= self._end_provider()
Determines if a record should be synced based on its cursor value.
Parameters
- record: The record to evaluate
Returns
True if the record's cursor value falls within the sync boundaries
def reduce_slice_range(self, stream_slice: StreamSlice) -> StreamSlice:
    """
    Narrow `stream_slice` so that it starts at the most recent cursor value observed for it.

    The slice is returned unchanged when no cursor value was observed for it. Only ascending record order is
    supported. When records were observed out of order, a warning is logged because narrowing might then skip
    records.
    """
    if not self._is_ascending_order:
        # Any consistent ordering could work in theory, but only ascending order is supported for now.
        LOGGER.warning(
            "Attempting to reduce slice while records are not returned in incremental order might lead to missing records"
        )

    observed_values = self._most_recent_cursor_value_per_partition
    if stream_slice not in observed_values:
        return stream_slice

    boundary_fields = self._slice_boundary_fields_wrapper
    start_field = boundary_fields[self._START_BOUNDARY]
    end_field = boundary_fields[self._END_BOUNDARY]
    narrowed_start = self._connector_state_converter.output_format(observed_values[stream_slice])
    unchanged_end = stream_slice.cursor_slice[end_field]
    return StreamSlice(
        partition=stream_slice.partition,
        cursor_slice={start_field: narrowed_start, end_field: unchanged_end},
        extra_fields=stream_slice.extra_fields,
    )