airbyte_cdk.sources.streams.concurrent.cursor
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import functools
import logging
import threading
from abc import ABC, abstractmethod
from typing import (
    Any,
    Callable,
    Iterable,
    List,
    Mapping,
    MutableMapping,
    Optional,
    Tuple,
    Union,
)

from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.message import MessageRepository, NoopMessageRepository
from airbyte_cdk.sources.streams import NO_CURSOR_STATE_KEY
from airbyte_cdk.sources.streams.concurrent.clamping import ClampingStrategy, NoClamping
from airbyte_cdk.sources.streams.concurrent.cursor_types import CursorValueType, GapType
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.stream_slicer import StreamSlicer
from airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter import (
    AbstractStreamStateConverter,
)
from airbyte_cdk.sources.types import Record, StreamSlice

LOGGER = logging.getLogger("airbyte")


def _extract_value(mapping: Mapping[str, Any], path: List[str]) -> Any:
    """
    Follow `path` through nested mappings, indexing one key at a time, and return the value reached.

    An empty `path` returns `mapping` itself. Indexing errors raised along the way are not caught here.
    """
    return functools.reduce(lambda a, b: a[b], path, mapping)


class CursorField:
    """
    Holds the key of the record field used as cursor and knows how to read that field from a record.
    """

    def __init__(
        self, cursor_field_key: str, supports_catalog_defined_cursor_field: bool = False
    ) -> None:
        """
        :param cursor_field_key: key looked up in `record.data` to obtain the cursor value
        :param supports_catalog_defined_cursor_field: stored as-is; not read anywhere in this module
        """
        self.cursor_field_key = cursor_field_key
        self.supports_catalog_defined_cursor_field = supports_catalog_defined_cursor_field

    def extract_value(self, record: Record) -> Any:
        """
        Return the value stored under `cursor_field_key` in `record.data`.

        :param record: the record to read the cursor value from
        :return: the raw (unparsed) cursor value
        :raises ValueError: if the key is absent or its value is None (both cases are indistinguishable here)
        """
        cursor_value = record.data.get(self.cursor_field_key)
        if cursor_value is None:
            raise ValueError(f"Could not find cursor field {self.cursor_field_key} in record")
        return cursor_value  # type: ignore  # we assume that the value the path points at is a comparable


class Cursor(StreamSlicer, ABC):
    """
    Abstract interface of a cursor: exposes the current state, observes emitted records, is notified when
    partitions are closed, and provides the slices to process.
    """

    # Current state of the cursor as a mutable mapping; concrete cursors must provide it.
    @property
    @abstractmethod
    def state(self) -> MutableMapping[str, Any]: ...

    @abstractmethod
    def observe(self, record: Record) -> None:
        """
        Indicate to the cursor that the record has been emitted
        """
        raise NotImplementedError()

    @abstractmethod
    def close_partition(self, partition: Partition) -> None:
        """
        Indicate to the cursor that the partition has been successfully processed
        """
        raise NotImplementedError()

    @abstractmethod
    def ensure_at_least_one_state_emitted(self) -> None:
        """
        State messages are emitted when a partition is closed. However, the platform expects at least one state to be emitted per sync per
        stream. Hence, if no partitions are generated, this method needs to be called.
        """
        raise NotImplementedError()

    @abstractmethod
    def should_be_synced(self, record: Record) -> bool:
        """
        Tell whether the given record should be synced. Concrete cursors must override this method.
        """
        pass

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Default placeholder implementation of stream_slices: yields a single slice with an empty partition and an empty cursor slice.
        Subclasses can override this method to provide actual behavior.
        """
        yield StreamSlice(partition={}, cursor_slice={})


class FinalStateCursor(Cursor):
    """Cursor that is used to guarantee at least one state message is emitted for a concurrent stream."""

    def __init__(
        self,
        stream_name: str,
        stream_namespace: Optional[str],
        message_repository: MessageRepository,
    ) -> None:
        """
        :param stream_name: name of the stream the state message is created for
        :param stream_namespace: namespace of the stream, if any
        :param message_repository: repository the state message is emitted to
        """
        self._stream_name = stream_name
        self._stream_namespace = stream_namespace
        self._message_repository = message_repository
        # Normally the connector state manager operates at the source-level. However, we only need it to write the sentinel
        # state message rather than manage overall source state. This is also only temporary as we move to the resumable
        # full refresh world where every stream uses a FileBasedConcurrentCursor with incremental state.
        self._connector_state_manager = ConnectorStateManager()
        # NOTE(review): this flag is never read or updated by this class in this module
        self._has_closed_at_least_one_slice = False

    @property
    def state(self) -> MutableMapping[str, Any]:
        """Return the sentinel state `{NO_CURSOR_STATE_KEY: True}`; a new dict is built on every access."""
        return {NO_CURSOR_STATE_KEY: True}

    def observe(self, record: Record) -> None:
        """No-op: this cursor does not track record values."""
        pass

    def close_partition(self, partition: Partition) -> None:
        """No-op: no state is emitted when a partition is closed."""
        pass

    def ensure_at_least_one_state_emitted(self) -> None:
        """
        Used primarily for full refresh syncs that do not have a valid cursor value to emit at the end of a sync
        """

        # Store the sentinel state in the local state manager, then emit the resulting state message
        self._connector_state_manager.update_state_for_stream(
            self._stream_name, self._stream_namespace, self.state
        )
        state_message = self._connector_state_manager.create_state_message(
            self._stream_name, self._stream_namespace
        )
        self._message_repository.emit_message(state_message)

    def should_be_synced(self, record: Record) -> bool:
        """Always return True: every record is synced."""
        return True


class ConcurrentCursor(Cursor):
    """
    Cursor that keeps the processed ranges as a list of slices under `_concurrent_state["slices"]`, emits a
    state message when a partition is closed, and generates the slices that remain to be synced.
    """

    # Indexes of the start and end keys within the `slice_boundary_fields` tuple
    _START_BOUNDARY = 0
    _END_BOUNDARY = 1

    def copy_without_state(self) -> "ConcurrentCursor":
        """
        Build a new cursor of the same class with the same configuration but an empty stream state, a
        `NoopMessageRepository` and a fresh `ConnectorStateManager`.
        """
        return self.__class__(
            stream_name=self._stream_name,
            stream_namespace=self._stream_namespace,
            stream_state={},
            message_repository=NoopMessageRepository(),
            connector_state_manager=ConnectorStateManager(),
            connector_state_converter=self._connector_state_converter,
            cursor_field=self._cursor_field,
            slice_boundary_fields=self._slice_boundary_fields,
            start=self._start,
            end_provider=self._end_provider,
            lookback_window=self._lookback_window,
            slice_range=self._slice_range,
            cursor_granularity=self._cursor_granularity,
            clamping_strategy=self._clamping_strategy,
        )

    def __init__(
        self,
        stream_name: str,
        stream_namespace: Optional[str],
        stream_state: Any,
        message_repository: MessageRepository,
        connector_state_manager: ConnectorStateManager,
        connector_state_converter: AbstractStreamStateConverter,
        cursor_field: CursorField,
        slice_boundary_fields: Optional[Tuple[str, str]],
        start: Optional[CursorValueType],
        end_provider: Callable[[], CursorValueType],
        lookback_window: Optional[GapType] = None,
        slice_range: Optional[GapType] = None,
        cursor_granularity: Optional[GapType] = None,
        clamping_strategy: ClampingStrategy = NoClamping(),
    ) -> None:
        """
        :param stream_name: name of the stream; used for state messages, logs and error messages
        :param stream_namespace: namespace of the stream, if any
        :param stream_state: incoming state, converted into the internal representation by `_get_concurrent_state`
        :param message_repository: repository the state messages are emitted to
        :param connector_state_manager: manager updated with the stream state and used to create state messages
        :param connector_state_converter: converter used to parse, format, merge and (de)serialize cursor values and state
        :param cursor_field: field the cursor value is extracted from
        :param slice_boundary_fields: optional (start key, end key) pair naming the boundary keys of a slice; when not
            provided, the converter's START_KEY and END_KEY name the boundaries of generated slices
        :param start: optional lower bound; generated slices are clipped to it
        :param end_provider: callable returning the upper bound of the sync
        :param lookback_window: optional gap subtracted from the end of the last slice in state when generating slices
        :param slice_range: optional maximum span of a generated slice
        :param cursor_granularity: optional gap used to avoid overlapping boundaries between generated slices
        :param clamping_strategy: strategy applied to slice boundaries through its `clamp` method
            (NOTE(review): the default instance is created once at definition time and shared by every cursor
            relying on the default — presumably stateless, confirm)
        """
        self._stream_name = stream_name
        self._stream_namespace = stream_namespace
        self._message_repository = message_repository
        self._connector_state_converter = connector_state_converter
        self._connector_state_manager = connector_state_manager
        self._cursor_field = cursor_field
        # To see some example where the slice boundaries might not be defined, check https://github.com/airbytehq/airbyte/blob/1ce84d6396e446e1ac2377362446e3fb94509461/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py#L363-L379
        self._slice_boundary_fields = slice_boundary_fields
        self._start = start
        self._end_provider = end_provider
        # `_get_concurrent_state` reads the converter, `_start` and `_cursor_field`, hence it is called after those are set.
        # `self.start` is derived from the incoming state whereas `self._start` is the configured start.
        self.start, self._concurrent_state = self._get_concurrent_state(stream_state)
        self._lookback_window = lookback_window
        self._slice_range = slice_range
        # Highest cursor value observed so far, keyed by the slice the records are associated with
        self._most_recent_cursor_value_per_partition: MutableMapping[
            Union[StreamSlice, Mapping[str, Any], None], Any
        ] = {}
        self._has_closed_at_least_one_slice = False
        self._cursor_granularity = cursor_granularity
        # Flag to track if the logger has been triggered (per stream)
        self._should_be_synced_logger_triggered = False
        self._clamping_strategy = clamping_strategy
        # Turned to False by `observe` as soon as a record has a lower cursor value than the most recent one of its slice
        self._is_ascending_order = True

        # A lock is required when closing a partition because updating the cursor's concurrent_state is
        # not thread safe. When multiple partitions are being closed by the cursor at the same time, it is
        # possible for one partition to update concurrent_state after a second partition has already read
        # the previous state. This can lead to the second partition overwriting the previous one's state.
        self._lock = threading.Lock()

    @property
    def state(self) -> MutableMapping[str, Any]:
        """Return the internal state converted to a state message by the state converter."""
        return self._connector_state_converter.convert_to_state_message(
            self.cursor_field, self._concurrent_state
        )

    @property
    def cursor_field(self) -> CursorField:
        """Return the cursor field this cursor was configured with."""
        return self._cursor_field

    @property
    def _slice_boundary_fields_wrapper(self) -> Tuple[str, str]:
        """Return the configured slice boundary fields or, when not configured, the converter's (START_KEY, END_KEY)."""
        return (
            self._slice_boundary_fields
            if self._slice_boundary_fields
            else (
                self._connector_state_converter.START_KEY,
                self._connector_state_converter.END_KEY,
            )
        )

    def _get_concurrent_state(
        self, state: MutableMapping[str, Any]
    ) -> Tuple[CursorValueType, MutableMapping[str, Any]]:
        """
        Convert the incoming state into a (start value, internal state) pair.

        When the converter deems the state compatible, it is deserialized and the start value is taken from the
        first slice (most recent record value if present, else the slice end). Otherwise the converter builds
        both values from the sequential state.

        :param state: the incoming stream state
        :return: the start value and the internal state
        """
        if self._connector_state_converter.is_state_message_compatible(state):
            partitioned_state = self._connector_state_converter.deserialize(state)
            slices_from_partitioned_state = partitioned_state.get("slices", [])

            value_from_partitioned_state = None
            if slices_from_partitioned_state:
                # We assume here that the slices have been already merged
                first_slice = slices_from_partitioned_state[0]
                value_from_partitioned_state = (
                    first_slice[self._connector_state_converter.MOST_RECENT_RECORD_KEY]
                    if self._connector_state_converter.MOST_RECENT_RECORD_KEY in first_slice
                    else first_slice[self._connector_state_converter.END_KEY]
                )
            # NOTE(review): the fallback chain relies on truthiness, so a falsy value from state (e.g. 0 or None)
            # falls through to `self._start` and then to the converter's zero value — confirm this is intended
            return (
                value_from_partitioned_state
                or self._start
                or self._connector_state_converter.zero_value,
                partitioned_state,
            )
        return self._connector_state_converter.convert_from_sequential_state(
            self._cursor_field, state, self._start
        )

    def observe(self, record: Record) -> None:
        """
        Track the highest cursor value seen for the slice the record is associated with and detect records
        that are not in ascending order. Records without a cursor value only trigger a (one-time) warning.

        :param record: the record that has been emitted
        """
        # Because observe writes to the most_recent_cursor_value_per_partition mapping,
        # it is not thread-safe. However, this shouldn't lead to concurrency issues because
        # observe() is only invoked by PartitionReader.process_partition(). Since the map is
        # broken down according to partition, concurrent threads processing only read/write
        # from different keys which avoids any conflicts.
        #
        # If we were to add thread safety, we should implement a lock per-partition
        # which is instantiated during stream_slices()
        most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
            record.associated_slice
        )
        try:
            cursor_value = self._extract_cursor_value(record)

            if most_recent_cursor_value is None or most_recent_cursor_value < cursor_value:
                self._most_recent_cursor_value_per_partition[record.associated_slice] = cursor_value
            elif most_recent_cursor_value > cursor_value:
                self._is_ascending_order = False
        except ValueError:
            self._log_for_record_without_cursor_value()

    def _extract_cursor_value(self, record: Record) -> Any:
        """Extract the cursor value from the record and parse it with the state converter."""
        return self._connector_state_converter.parse_value(self._cursor_field.extract_value(record))

    def close_partition(self, partition: Partition) -> None:
        """
        Record the partition as processed and, if that added a slice to the state, merge the slices and emit a
        state message. The whole operation runs under the cursor lock.

        :param partition: the partition that has been successfully processed
        """
        with self._lock:
            slice_count_before = len(self._concurrent_state.get("slices", []))
            self._add_slice_to_state(partition)
            # NOTE(review): "slices" is indexed directly here while it was read with a default above;
            # presumably the state always contains "slices" at this point — confirm
            if slice_count_before < len(
                self._concurrent_state["slices"]
            ):  # only emit if at least one slice has been processed
                self._merge_partitions()
                self._emit_state_message()
        self._has_closed_at_least_one_slice = True

    def _add_slice_to_state(self, partition: Partition) -> None:
        """
        Append a slice describing the processed partition to `_concurrent_state["slices"]`.

        With slice boundary fields, the boundaries are read from the partition's slice. Without them, a slice
        going from `self.start` to the most recent cursor value is appended, which is only allowed once.

        :param partition: the partition that has been processed
        :raises RuntimeError: if boundary fields are defined but the state has no "slices" entry
        :raises ValueError: if boundary fields are not defined and a partition has already been closed
        """
        most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
            partition.to_slice()
        )

        if self._slice_boundary_fields:
            if "slices" not in self._concurrent_state:
                raise RuntimeError(
                    f"The state for stream {self._stream_name} should have at least one slice to delineate the sync start time, but no slices are present. This is unexpected. Please contact Support."
                )
            self._concurrent_state["slices"].append(
                {
                    self._connector_state_converter.START_KEY: self._extract_from_slice(
                        partition, self._slice_boundary_fields[self._START_BOUNDARY]
                    ),
                    self._connector_state_converter.END_KEY: self._extract_from_slice(
                        partition, self._slice_boundary_fields[self._END_BOUNDARY]
                    ),
                    self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value,
                }
            )
        # NOTE(review): truthiness check — a falsy most recent cursor value (e.g. 0) is treated like "no value"; confirm
        elif most_recent_cursor_value:
            if self._has_closed_at_least_one_slice:
                # If we track state value using records cursor field, we can only do that if there is one partition. This is because we save
                # the state every time we close a partition. We assume that if there are multiple slices, they need to be providing
                # boundaries. There are cases where partitions could not have boundaries:
                # * The cursor should be per-partition
                # * The stream state is actually the parent stream state
                # There might be other cases not listed above. Those are not supported today hence the stream should not use this cursor for
                # state management. For the specific user that was affected with this issue, we need to:
                # * Fix state tracking (which is currently broken)
                # * Make the new version available
                # * (Probably) ask the user to reset the stream to avoid data loss
                raise ValueError(
                    "Given that slice_boundary_fields is not defined and that per-partition state is not supported, only one slice is "
                    "expected. Please contact the Airbyte team."
                )

            self._concurrent_state["slices"].append(
                {
                    self._connector_state_converter.START_KEY: self.start,
                    self._connector_state_converter.END_KEY: most_recent_cursor_value,
                    self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value,
                }
            )

    def _emit_state_message(self) -> None:
        """Push the current state to the state manager and emit the resulting state message."""
        self._connector_state_manager.update_state_for_stream(
            self._stream_name,
            self._stream_namespace,
            self.state,
        )
        state_message = self._connector_state_manager.create_state_message(
            self._stream_name, self._stream_namespace
        )
        self._message_repository.emit_message(state_message)

    def _merge_partitions(self) -> None:
        """Replace the slices in state by the result of the converter's `merge_intervals`."""
        self._concurrent_state["slices"] = self._connector_state_converter.merge_intervals(
            self._concurrent_state["slices"]
        )

    def _extract_from_slice(self, partition: Partition, key: str) -> CursorValueType:
        """
        Read `key` from the partition's slice and parse it with the state converter.

        :param partition: the partition whose slice is read
        :param key: the key expected in the slice
        :raises KeyError: if the slice is empty or does not contain `key`
        """
        try:
            _slice = partition.to_slice()
            if not _slice:
                raise KeyError(f"Could not find key `{key}` in empty slice")
            return self._connector_state_converter.parse_value(_slice[key])  # type: ignore  # we expect the devs to specify a key that would return a CursorValueType
        except KeyError as exception:
            # Also wraps the "empty slice" KeyError raised above so callers get a single message
            raise KeyError(
                f"Partition is expected to have key `{key}` but could not be found"
            ) from exception

    def ensure_at_least_one_state_emitted(self) -> None:
        """
        The platform expects to have at least one state message on successful syncs. Hence, whatever happens, we expect this method to be
        called.
        """
        self._emit_state_message()

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Generating slices based on a few parameters:
        * lookback_window: Buffer to remove from END_KEY of the highest slice
        * slice_range: Max difference between two slices. If the difference between two slices is greater, multiple slices will be created
        * start: `_split_per_slice_range` will clip any value to `self._start`, which means that:
          * if upper is less than self._start, no slices will be generated
          * if lower is less than self._start, self._start will be used as the lower boundary (lookback_window will not be considered in that case)

        Note that the slices will overlap at their boundaries. We therefore expect to have at least the lower or the upper boundary to be
        inclusive in the API that is queried.

        :raises ValueError: if the state contains no slice
        """
        self._merge_partitions()

        # Range between the configured start and the first slice in state, if any
        if self._start is not None and self._is_start_before_first_slice():
            yield from self._split_per_slice_range(
                self._start,
                self._concurrent_state["slices"][0][self._connector_state_converter.START_KEY],
                False,
            )

        if len(self._concurrent_state["slices"]) == 1:
            # Single slice in state: only the range from its end (minus lookback) up to the end provider remains
            yield from self._split_per_slice_range(
                self._calculate_lower_boundary_of_last_slice(
                    self._concurrent_state["slices"][0][self._connector_state_converter.END_KEY]
                ),
                self._end_provider(),
                True,
            )
        elif len(self._concurrent_state["slices"]) > 1:
            # Gaps between consecutive slices in state
            for i in range(len(self._concurrent_state["slices"]) - 1):
                if self._cursor_granularity:
                    yield from self._split_per_slice_range(
                        self._concurrent_state["slices"][i][self._connector_state_converter.END_KEY]
                        + self._cursor_granularity,
                        self._concurrent_state["slices"][i + 1][
                            self._connector_state_converter.START_KEY
                        ],
                        False,
                    )
                else:
                    yield from self._split_per_slice_range(
                        self._concurrent_state["slices"][i][
                            self._connector_state_converter.END_KEY
                        ],
                        self._concurrent_state["slices"][i + 1][
                            self._connector_state_converter.START_KEY
                        ],
                        False,
                    )
            # Range from the end of the last slice (minus lookback) up to the end provider
            yield from self._split_per_slice_range(
                self._calculate_lower_boundary_of_last_slice(
                    self._concurrent_state["slices"][-1][self._connector_state_converter.END_KEY]
                ),
                self._end_provider(),
                True,
            )
        else:
            raise ValueError("Expected at least one slice")

    def _is_start_before_first_slice(self) -> bool:
        """Return True when a start is configured and is lower than the start of the first slice in state."""
        return (
            self._start is not None
            and self._start
            < self._concurrent_state["slices"][0][self._connector_state_converter.START_KEY]
        )

    def _calculate_lower_boundary_of_last_slice(
        self, lower_boundary: CursorValueType
    ) -> CursorValueType:
        """Return `lower_boundary` minus the lookback window, or `lower_boundary` unchanged when no lookback window is set."""
        if self._lookback_window:
            return lower_boundary - self._lookback_window
        return lower_boundary

    def _split_per_slice_range(
        self, lower: CursorValueType, upper: CursorValueType, upper_is_end: bool
    ) -> Iterable[StreamSlice]:
        """
        Yield the slices covering the range from `lower` to `upper`, split by `slice_range` when it is set.

        :param lower: lower boundary of the range; raised to `self._start` when lower than it
        :param upper: upper boundary of the range
        :param upper_is_end: whether `upper` is the end of the sync; when True, the cursor granularity is not
            subtracted from the slice that reaches `upper`
        """
        if lower >= upper:
            return

        # NOTE(review): `self._start` is tested by truthiness here and below, so a falsy start (e.g. 0) is ignored; confirm
        if self._start and upper < self._start:
            return

        lower = max(lower, self._start) if self._start else lower
        if not self._slice_range or self._evaluate_upper_safely(lower, self._slice_range) >= upper:
            # The whole range fits in a single slice
            clamped_lower = self._clamping_strategy.clamp(lower)
            clamped_upper = self._clamping_strategy.clamp(upper)
            start_value, end_value = (
                (clamped_lower, clamped_upper - self._cursor_granularity)
                if self._cursor_granularity and not upper_is_end
                else (clamped_lower, clamped_upper)
            )
            yield StreamSlice(
                partition={},
                cursor_slice={
                    self._slice_boundary_fields_wrapper[
                        self._START_BOUNDARY
                    ]: self._connector_state_converter.output_format(start_value),
                    self._slice_boundary_fields_wrapper[
                        self._END_BOUNDARY
                    ]: self._connector_state_converter.output_format(end_value),
                },
            )
        else:
            stop_processing = False
            current_lower_boundary = lower
            while not stop_processing:
                current_upper_boundary = min(
                    self._evaluate_upper_safely(current_lower_boundary, self._slice_range), upper
                )
                has_reached_upper_boundary = current_upper_boundary >= upper

                # The final upper boundary is kept as provided; only intermediate boundaries are clamped
                clamped_upper = (
                    self._clamping_strategy.clamp(current_upper_boundary)
                    if current_upper_boundary != upper
                    else current_upper_boundary
                )
                clamped_lower = self._clamping_strategy.clamp(current_lower_boundary)
                if clamped_lower >= clamped_upper:
                    # clamping collapsed both values which means that it is time to stop processing
                    # FIXME should this be replaced by a proper end_provider
                    break
                start_value, end_value = (
                    (clamped_lower, clamped_upper - self._cursor_granularity)
                    if self._cursor_granularity
                    and (not upper_is_end or not has_reached_upper_boundary)
                    else (clamped_lower, clamped_upper)
                )
                yield StreamSlice(
                    partition={},
                    cursor_slice={
                        self._slice_boundary_fields_wrapper[
                            self._START_BOUNDARY
                        ]: self._connector_state_converter.output_format(start_value),
                        self._slice_boundary_fields_wrapper[
                            self._END_BOUNDARY
                        ]: self._connector_state_converter.output_format(end_value),
                    },
                )
                # The next slice starts where this one (clamped) ended
                current_lower_boundary = clamped_upper
                if current_upper_boundary >= upper:
                    stop_processing = True

    def _evaluate_upper_safely(self, lower: CursorValueType, step: GapType) -> CursorValueType:
        """
        Given that we set the default step at datetime.timedelta.max, we will generate an OverflowError when evaluating the next start_date
        This method assumes that users would never enter a step that would generate an overflow. Given that would be the case, the code
        would have broken anyway.

        :return: `lower + step`, or the value of the end provider when the addition overflows
        """
        try:
            return lower + step
        except OverflowError:
            return self._end_provider()

    def should_be_synced(self, record: Record) -> bool:
        """
        Determines if a record should be synced based on its cursor value.
        :param record: The record to evaluate

        :return: True if the record's cursor value falls within the sync boundaries, or if the record has no cursor value
        """
        try:
            record_cursor_value: CursorValueType = self._extract_cursor_value(record)
        except ValueError:
            self._log_for_record_without_cursor_value()
            return True
        # Both boundaries are inclusive; the end provider is evaluated on every call
        return self.start <= record_cursor_value <= self._end_provider()

    def _log_for_record_without_cursor_value(self) -> None:
        """Log a warning about a record without cursor value, at most once per cursor instance."""
        if not self._should_be_synced_logger_triggered:
            LOGGER.warning(
                f"Could not find cursor field `{self.cursor_field.cursor_field_key}` in record for stream {self._stream_name}. The incremental sync will assume it needs to be synced"
            )
            self._should_be_synced_logger_triggered = True

    def reduce_slice_range(self, stream_slice: StreamSlice) -> StreamSlice:
        """
        Return a slice whose start boundary is moved to the most recent cursor value observed for
        `stream_slice`; the end boundary, partition and extra fields are kept. When no value has been observed
        for the slice, it is returned unchanged.

        :param stream_slice: the slice to reduce
        :return: the reduced slice, or `stream_slice` itself
        """
        # In theory, we might be more flexible here meaning that it doesn't need to be in ascending order but it just
        # needs to be ordered. For now though, we will only support ascending order.
        if not self._is_ascending_order:
            LOGGER.warning(
                "Attempting to reduce slice while records are not returned in incremental order might lead to missing records"
            )

        if stream_slice in self._most_recent_cursor_value_per_partition:
            return StreamSlice(
                partition=stream_slice.partition,
                cursor_slice={
                    self._slice_boundary_fields_wrapper[
                        self._START_BOUNDARY
                    ]: self._connector_state_converter.output_format(
                        self._most_recent_cursor_value_per_partition[stream_slice]
                    ),
                    self._slice_boundary_fields_wrapper[
                        self._END_BOUNDARY
                    ]: stream_slice.cursor_slice[
                        self._slice_boundary_fields_wrapper[self._END_BOUNDARY]
                    ],
                },
                extra_fields=stream_slice.extra_fields,
            )
        else:
            return stream_slice
class CursorField:
    """
    Names the record field that carries the cursor value and reads that field from records.
    """

    def __init__(
        self, cursor_field_key: str, supports_catalog_defined_cursor_field: bool = False
    ) -> None:
        """
        :param cursor_field_key: key looked up in `record.data` to obtain the cursor value
        :param supports_catalog_defined_cursor_field: flag stored as-is on the instance
        """
        self.supports_catalog_defined_cursor_field = supports_catalog_defined_cursor_field
        self.cursor_field_key = cursor_field_key

    def extract_value(self, record: Record) -> Any:
        """
        Return the value stored under `cursor_field_key` in `record.data`.

        :param record: the record to read the cursor value from
        :return: the raw cursor value; we assume it is a comparable
        :raises ValueError: if the key is absent or its value is None
        """
        found = record.data.get(self.cursor_field_key)
        if found is not None:
            return found
        raise ValueError(f"Could not find cursor field {self.cursor_field_key} in record")
def extract_value(self, record: Record) -> Any:
    """
    Return the value stored under `self.cursor_field_key` in `record.data`.

    :param record: the record to read the cursor value from
    :return: the raw cursor value; we assume it is a comparable
    :raises ValueError: if the key is absent or its value is None
    """
    found = record.data.get(self.cursor_field_key)
    if found is not None:
        return found
    raise ValueError(f"Could not find cursor field {self.cursor_field_key} in record")
class Cursor(StreamSlicer, ABC):
    """
    Abstract interface of a cursor: exposes the current state, observes emitted records, is notified when
    partitions are closed, and provides the slices to process.
    """

    @property
    @abstractmethod
    def state(self) -> MutableMapping[str, Any]:
        """Current state of the cursor. Concrete cursors must provide it."""

    @abstractmethod
    def observe(self, record: Record) -> None:
        """
        Notify the cursor that `record` has been emitted.
        """
        raise NotImplementedError

    @abstractmethod
    def close_partition(self, partition: Partition) -> None:
        """
        Notify the cursor that `partition` has been successfully processed.
        """
        raise NotImplementedError

    @abstractmethod
    def ensure_at_least_one_state_emitted(self) -> None:
        """
        The platform expects at least one state message per sync per stream, but state messages are emitted when
        a partition is closed. This method needs to be called to cover the case where no partitions are generated.
        """
        raise NotImplementedError

    @abstractmethod
    def should_be_synced(self, record: Record) -> bool:
        """Tell whether `record` should be synced. Concrete cursors must override this method."""

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Default placeholder implementation of stream_slices: a single slice with an empty partition and an
        empty cursor slice. Subclasses can override this method to provide actual behavior.
        """
        placeholder = StreamSlice(partition={}, cursor_slice={})
        yield placeholder
Slices the stream into chunks that can be fetched independently. Slices enable state checkpointing and data retrieval parallelization.
@abstractmethod
def observe(self, record: Record) -> None:
    """
    Notify the cursor that `record` has been emitted.

    This is the abstract declaration: it only raises, so concrete cursors must override it.

    :param record: the record that has been emitted
    :raises NotImplementedError: whenever this base implementation is invoked
    """
    raise NotImplementedError
Indicate to the cursor that the record has been emitted
@abstractmethod
def close_partition(self, partition: Partition) -> None:
    """
    Notify the cursor that `partition` has been successfully processed.

    This is the abstract declaration: it only raises, so concrete cursors must override it.

    :param partition: the partition that has been processed
    :raises NotImplementedError: whenever this base implementation is invoked
    """
    raise NotImplementedError
Indicate to the cursor that the partition has been successfully processed
@abstractmethod
def ensure_at_least_one_state_emitted(self) -> None:
    """
    The platform expects at least one state message per sync per stream, but state messages are emitted when
    a partition is closed. This method needs to be called to cover the case where no partitions are generated.

    This is the abstract declaration: it only raises, so concrete cursors must override it.

    :raises NotImplementedError: whenever this base implementation is invoked
    """
    raise NotImplementedError
State messages are emitted when a partition is closed. However, the platform expects at least one state to be emitted per sync per stream. Hence, if no partitions are generated, this method needs to be called.
def stream_slices(self) -> Iterable[StreamSlice]:
    """
    Default placeholder implementation of stream_slices: a single slice with an empty partition and an
    empty cursor slice. Subclasses can override this method to provide actual behavior.
    """
    placeholder = StreamSlice(partition={}, cursor_slice={})
    yield placeholder
Default placeholder implementation of stream_slices. Subclasses can override this method to provide actual behavior.
class FinalStateCursor(Cursor):
    """Cursor that is used to guarantee at least one state message is emitted for a concurrent stream."""

    def __init__(
        self,
        stream_name: str,
        stream_namespace: Optional[str],
        message_repository: MessageRepository,
    ) -> None:
        """
        :param stream_name: name of the stream the state message is created for
        :param stream_namespace: namespace of the stream, if any
        :param message_repository: repository the state message is emitted to
        """
        self._has_closed_at_least_one_slice = False
        # Normally the connector state manager operates at the source-level. However, we only need it to write the sentinel
        # state message rather than manage overall source state. This is also only temporary as we move to the resumable
        # full refresh world where every stream uses a FileBasedConcurrentCursor with incremental state.
        self._connector_state_manager = ConnectorStateManager()
        self._message_repository = message_repository
        self._stream_namespace = stream_namespace
        self._stream_name = stream_name

    @property
    def state(self) -> MutableMapping[str, Any]:
        """Sentinel state flagging that the stream has no cursor; a new dict is built on every access."""
        sentinel: MutableMapping[str, Any] = {NO_CURSOR_STATE_KEY: True}
        return sentinel

    def observe(self, record: Record) -> None:
        """No-op: this cursor does not track record values."""

    def close_partition(self, partition: Partition) -> None:
        """No-op: no state is emitted when a partition is closed."""

    def ensure_at_least_one_state_emitted(self) -> None:
        """
        Used primarily for full refresh syncs that do not have a valid cursor value to emit at the end of a sync
        """
        manager = self._connector_state_manager
        manager.update_state_for_stream(self._stream_name, self._stream_namespace, self.state)
        sentinel_message = manager.create_state_message(self._stream_name, self._stream_namespace)
        self._message_repository.emit_message(sentinel_message)

    def should_be_synced(self, record: Record) -> bool:
        """Every record is synced."""
        return True
Cursor that is used to guarantee at least one state message is emitted for a concurrent stream.
def __init__(
    self,
    stream_name: str,
    stream_namespace: Optional[str],
    message_repository: MessageRepository,
) -> None:
    """
    :param stream_name: name of the stream the state message is created for
    :param stream_namespace: namespace of the stream, if any
    :param message_repository: repository the state message is emitted to
    """
    self._has_closed_at_least_one_slice = False
    # Normally the connector state manager operates at the source-level. However, we only need it to write the sentinel
    # state message rather than manage overall source state. This is also only temporary as we move to the resumable
    # full refresh world where every stream uses a FileBasedConcurrentCursor with incremental state.
    self._connector_state_manager = ConnectorStateManager()
    self._message_repository = message_repository
    self._stream_namespace = stream_namespace
    self._stream_name = stream_name
Indicate to the cursor that the partition has been successfully processed
def ensure_at_least_one_state_emitted(self) -> None:
    """
    Used primarily for full refresh syncs that do not have a valid cursor value to emit at the end of a sync

    Stores the cursor's state in the state manager, then emits the state message the manager creates for the stream.
    """
    manager = self._connector_state_manager
    manager.update_state_for_stream(self._stream_name, self._stream_namespace, self.state)
    sentinel_message = manager.create_state_message(self._stream_name, self._stream_namespace)
    self._message_repository.emit_message(sentinel_message)
Used primarily for full refresh syncs that do not have a valid cursor value to emit at the end of a sync
Inherited Members
class ConcurrentCursor(Cursor):
    """
    Cursor that records sync progress as a list of cursor-value intervals ("slices").

    The intervals are stored under the ``"slices"`` key of ``self._concurrent_state``. Closing a partition appends an
    interval and merges the list through the state converter. ``stream_slices`` derives the ranges to read from the gaps
    between those intervals and from the end of the last interval up to ``end_provider()``.
    """

    # Positions of the start and end field names inside the `slice_boundary_fields` tuple.
    _START_BOUNDARY = 0
    _END_BOUNDARY = 1

    def copy_without_state(self) -> "ConcurrentCursor":
        """
        Build a new cursor configured like this one but starting from an empty stream state.

        The copy receives a `NoopMessageRepository` and a new `ConnectorStateManager` instead of this cursor's own ones.

        :return: a new instance of the same class as `self`
        """
        return self.__class__(
            stream_name=self._stream_name,
            stream_namespace=self._stream_namespace,
            stream_state={},
            message_repository=NoopMessageRepository(),
            connector_state_manager=ConnectorStateManager(),
            connector_state_converter=self._connector_state_converter,
            cursor_field=self._cursor_field,
            slice_boundary_fields=self._slice_boundary_fields,
            start=self._start,
            end_provider=self._end_provider,
            lookback_window=self._lookback_window,
            slice_range=self._slice_range,
            cursor_granularity=self._cursor_granularity,
            clamping_strategy=self._clamping_strategy,
        )

    def __init__(
        self,
        stream_name: str,
        stream_namespace: Optional[str],
        stream_state: Any,
        message_repository: MessageRepository,
        connector_state_manager: ConnectorStateManager,
        connector_state_converter: AbstractStreamStateConverter,
        cursor_field: CursorField,
        slice_boundary_fields: Optional[Tuple[str, str]],
        start: Optional[CursorValueType],
        end_provider: Callable[[], CursorValueType],
        lookback_window: Optional[GapType] = None,
        slice_range: Optional[GapType] = None,
        cursor_granularity: Optional[GapType] = None,
        clamping_strategy: ClampingStrategy = NoClamping(),
    ) -> None:
        """
        :param stream_name: name of the stream, used when updating and emitting state
        :param stream_namespace: namespace of the stream, used when updating and emitting state
        :param stream_state: incoming state; converted by `_get_concurrent_state`
        :param message_repository: repository the state messages are emitted to
        :param connector_state_manager: manager that stores the stream state and creates the state messages
        :param connector_state_converter: converter used to parse, format, merge and (de)serialize cursor values and state
        :param cursor_field: field the cursor value is extracted from on each record
        :param slice_boundary_fields: names of the (start, end) fields of a slice; when not provided, the converter's START_KEY and
            END_KEY are used for generated slices
        :param start: lower bound of the sync; generated slices are clipped to it
        :param end_provider: callable returning the upper bound of the sync
        :param lookback_window: gap subtracted from the end of the last slice of the state when generating the final range
        :param slice_range: maximum gap between the boundaries of a generated slice
        :param cursor_granularity: gap subtracted from the upper boundary of generated slices other than the final one
        :param clamping_strategy: strategy applied to the boundaries of generated slices
        """
        self._stream_name = stream_name
        self._stream_namespace = stream_namespace
        self._message_repository = message_repository
        self._connector_state_converter = connector_state_converter
        self._connector_state_manager = connector_state_manager
        self._cursor_field = cursor_field
        # To see some example where the slice boundaries might not be defined, check https://github.com/airbytehq/airbyte/blob/1ce84d6396e446e1ac2377362446e3fb94509461/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py#L363-L379
        self._slice_boundary_fields = slice_boundary_fields
        self._start = start
        self._end_provider = end_provider
        # `_get_concurrent_state` reads the converter, the cursor field and `_start`, which are all assigned above.
        self.start, self._concurrent_state = self._get_concurrent_state(stream_state)
        self._lookback_window = lookback_window
        self._slice_range = slice_range
        self._most_recent_cursor_value_per_partition: MutableMapping[
            Union[StreamSlice, Mapping[str, Any], None], Any
        ] = {}
        self._has_closed_at_least_one_slice = False
        self._cursor_granularity = cursor_granularity
        # Flag to track if the logger has been triggered (per stream)
        self._should_be_synced_logger_triggered = False
        self._clamping_strategy = clamping_strategy
        self._is_ascending_order = True

        # A lock is required when closing a partition because updating the cursor's concurrent_state is
        # not thread safe. When multiple partitions are being closed by the cursor at the same time, it is
        # possible for one partition to update concurrent_state after a second partition has already read
        # the previous state. This can lead to the second partition overwriting the previous one's state.
        self._lock = threading.Lock()

    @property
    def state(self) -> MutableMapping[str, Any]:
        """The concurrent state converted to a state message by the state converter."""
        return self._connector_state_converter.convert_to_state_message(
            self.cursor_field, self._concurrent_state
        )

    @property
    def cursor_field(self) -> CursorField:
        """The cursor field this cursor was created with."""
        return self._cursor_field

    @property
    def _slice_boundary_fields_wrapper(self) -> Tuple[str, str]:
        """The configured slice boundary fields or, when none are configured, the converter's (START_KEY, END_KEY)."""
        return (
            self._slice_boundary_fields
            if self._slice_boundary_fields
            else (
                self._connector_state_converter.START_KEY,
                self._connector_state_converter.END_KEY,
            )
        )

    def _get_concurrent_state(
        self, state: MutableMapping[str, Any]
    ) -> Tuple[CursorValueType, MutableMapping[str, Any]]:
        """
        Convert the incoming state into the sync start value and the concurrent state.

        :param state: the incoming stream state
        :return: a tuple of (start value of the sync, concurrent state)
        """
        if self._connector_state_converter.is_state_message_compatible(state):
            partitioned_state = self._connector_state_converter.deserialize(state)
            slices_from_partitioned_state = partitioned_state.get("slices", [])

            value_from_partitioned_state = None
            if slices_from_partitioned_state:
                # We assume here that the slices have been already merged
                first_slice = slices_from_partitioned_state[0]
                value_from_partitioned_state = (
                    first_slice[self._connector_state_converter.MOST_RECENT_RECORD_KEY]
                    if self._connector_state_converter.MOST_RECENT_RECORD_KEY in first_slice
                    else first_slice[self._connector_state_converter.END_KEY]
                )
            # Fall back on the configured start and then on the converter's zero value when the state holds no usable value
            return (
                value_from_partitioned_state
                or self._start
                or self._connector_state_converter.zero_value,
                partitioned_state,
            )
        return self._connector_state_converter.convert_from_sequential_state(
            self._cursor_field, state, self._start
        )

    def observe(self, record: Record) -> None:
        """
        Keep track of the greatest cursor value seen for the slice the record is associated with.

        A record without cursor value only triggers a warning (logged once per cursor).

        :param record: the record that has been read
        """
        # Because observe writes to the most_recent_cursor_value_per_partition mapping,
        # it is not thread-safe. However, this shouldn't lead to concurrency issues because
        # observe() is only invoked by PartitionReader.process_partition(). Since the map is
        # broken down according to partition, concurrent threads processing only read/write
        # from different keys which avoids any conflicts.
        #
        # If we were to add thread safety, we should implement a lock per-partition
        # which is instantiated during stream_slices()
        most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
            record.associated_slice
        )
        try:
            cursor_value = self._extract_cursor_value(record)

            if most_recent_cursor_value is None or most_recent_cursor_value < cursor_value:
                self._most_recent_cursor_value_per_partition[record.associated_slice] = cursor_value
            elif most_recent_cursor_value > cursor_value:
                # A smaller value after a greater one means the records do not come in ascending order
                self._is_ascending_order = False
        except ValueError:
            self._log_for_record_without_cursor_value()

    def _extract_cursor_value(self, record: Record) -> Any:
        """Extract the cursor field value of the record and parse it with the state converter."""
        return self._connector_state_converter.parse_value(self._cursor_field.extract_value(record))

    def close_partition(self, partition: Partition) -> None:
        """
        Add the partition to the state and, if that added a slice, merge the slices and emit a state message.

        :param partition: the partition that has been processed
        """
        with self._lock:
            slice_count_before = len(self._concurrent_state.get("slices", []))
            self._add_slice_to_state(partition)
            # Read the slices with the same default as above: when the state has no "slices" entry and nothing was
            # added, there is nothing to emit and this must not fail with a KeyError.
            if slice_count_before < len(
                self._concurrent_state.get("slices", [])
            ):  # only emit if at least one slice has been processed
                self._merge_partitions()
                self._emit_state_message()
            # `_add_slice_to_state` reads this flag while the lock is held, hence it is updated under the lock as well
            self._has_closed_at_least_one_slice = True

    def _add_slice_to_state(self, partition: Partition) -> None:
        """
        Append the interval covered by the partition to the slices of the concurrent state.

        With slice boundary fields, the interval boundaries are read from the partition's slice. Without them, the interval goes from
        `self.start` to the most recent cursor value observed for the partition, and nothing is added when no value was observed.

        :param partition: the partition that has been processed
        :raises RuntimeError: if slice boundary fields are defined but the state has no "slices" entry
        :raises ValueError: if slice boundary fields are not defined and a partition has already been closed
        """
        most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
            partition.to_slice()
        )

        if self._slice_boundary_fields:
            if "slices" not in self._concurrent_state:
                raise RuntimeError(
                    f"The state for stream {self._stream_name} should have at least one slice to delineate the sync start time, but no slices are present. This is unexpected. Please contact Support."
                )
            self._concurrent_state["slices"].append(
                {
                    self._connector_state_converter.START_KEY: self._extract_from_slice(
                        partition, self._slice_boundary_fields[self._START_BOUNDARY]
                    ),
                    self._connector_state_converter.END_KEY: self._extract_from_slice(
                        partition, self._slice_boundary_fields[self._END_BOUNDARY]
                    ),
                    self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value,
                }
            )
        elif most_recent_cursor_value:
            if self._has_closed_at_least_one_slice:
                # If we track state value using records cursor field, we can only do that if there is one partition. This is because we save
                # the state every time we close a partition. We assume that if there are multiple slices, they need to be providing
                # boundaries. There are cases where partitions could not have boundaries:
                # * The cursor should be per-partition
                # * The stream state is actually the parent stream state
                # There might be other cases not listed above. Those are not supported today hence the stream should not use this cursor for
                # state management. For the specific user that was affected with this issue, we need to:
                # * Fix state tracking (which is currently broken)
                # * Make the new version available
                # * (Probably) ask the user to reset the stream to avoid data loss
                raise ValueError(
                    "Given that slice_boundary_fields is not defined and that per-partition state is not supported, only one slice is "
                    "expected. Please contact the Airbyte team."
                )

            self._concurrent_state["slices"].append(
                {
                    self._connector_state_converter.START_KEY: self.start,
                    self._connector_state_converter.END_KEY: most_recent_cursor_value,
                    self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value,
                }
            )

    def _emit_state_message(self) -> None:
        """Store the current state in the state manager and emit the resulting state message to the message repository."""
        self._connector_state_manager.update_state_for_stream(
            self._stream_name,
            self._stream_namespace,
            self.state,
        )
        state_message = self._connector_state_manager.create_state_message(
            self._stream_name, self._stream_namespace
        )
        self._message_repository.emit_message(state_message)

    def _merge_partitions(self) -> None:
        """Replace the slices of the state by the result of the converter's `merge_intervals`."""
        self._concurrent_state["slices"] = self._connector_state_converter.merge_intervals(
            self._concurrent_state["slices"]
        )

    def _extract_from_slice(self, partition: Partition, key: str) -> CursorValueType:
        """
        Read `key` from the slice of the partition and parse it with the state converter.

        :param partition: the partition to read the value from
        :param key: the name of the field to read in the partition's slice
        :return: the parsed value
        :raises KeyError: if the slice is empty or does not have `key`
        """
        try:
            _slice = partition.to_slice()
            if not _slice:
                raise KeyError(f"Could not find key `{key}` in empty slice")
            return self._connector_state_converter.parse_value(_slice[key])  # type: ignore  # we expect the devs to specify a key that would return a CursorValueType
        except KeyError as exception:
            raise KeyError(
                f"Partition is expected to have key `{key}` but could not be found"
            ) from exception

    def ensure_at_least_one_state_emitted(self) -> None:
        """
        The platform expects to have at least one state message on successful syncs. Hence, whatever happens, we expect this method to be
        called.
        """
        self._emit_state_message()

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Generating slices based on a few parameters:
        * lookback_window: Buffer to remove from END_KEY of the highest slice
        * slice_range: Max difference between two slices. If the difference between two slices is greater, multiple slices will be created
        * start: `_split_per_slice_range` will clip any value to `self._start` which means that:
          * if upper is less than self._start, no slices will be generated
          * if lower is less than self._start, self._start will be used as the lower boundary (lookback_window will not be considered in that case)

        Note that the slices will overlap at their boundaries. We therefore expect to have at least the lower or the upper boundary to be
        inclusive in the API that is queried.

        This is a generator: nothing is evaluated, and no error is raised, before the iteration starts.

        :return: the slices to sync
        :raises ValueError: if the state does not hold any slice
        """
        self._merge_partitions()

        # Work on a copy of the merged slices so that the ranges are derived from the state as it is when the iteration starts,
        # even if slices are added to the state while this generator is being consumed.
        slices = list(self._concurrent_state["slices"])
        if not slices:
            # Checked before anything reads the first slice so that an empty state is reported with this error
            raise ValueError("Expected at least one slice")

        start_key = self._connector_state_converter.START_KEY
        end_key = self._connector_state_converter.END_KEY

        # Range between the configured start and the first slice of the state
        if self._start is not None and self._is_start_before_first_slice():
            yield from self._split_per_slice_range(self._start, slices[0][start_key], False)

        # Gaps between consecutive slices of the state
        for previous_slice, next_slice in zip(slices, slices[1:]):
            gap_lower = previous_slice[end_key]
            if self._cursor_granularity:
                gap_lower = gap_lower + self._cursor_granularity
            yield from self._split_per_slice_range(gap_lower, next_slice[start_key], False)

        # Range between the last slice of the state (minus the lookback window) and the end of the sync
        yield from self._split_per_slice_range(
            self._calculate_lower_boundary_of_last_slice(slices[-1][end_key]),
            self._end_provider(),
            True,
        )

    def _is_start_before_first_slice(self) -> bool:
        """Tell whether a start is configured and is lower than the start of the first slice of the state."""
        return (
            self._start is not None
            and self._start
            < self._concurrent_state["slices"][0][self._connector_state_converter.START_KEY]
        )

    def _calculate_lower_boundary_of_last_slice(
        self, lower_boundary: CursorValueType
    ) -> CursorValueType:
        """Subtract the lookback window, if any, from `lower_boundary`."""
        if self._lookback_window:
            return lower_boundary - self._lookback_window
        return lower_boundary

    def _split_per_slice_range(
        self, lower: CursorValueType, upper: CursorValueType, upper_is_end: bool
    ) -> Iterable[StreamSlice]:
        """
        Generate the slices covering the range from `lower` to `upper`, each being at most `self._slice_range` wide.

        :param lower: lower boundary of the range; it is raised to `self._start` when lower than it
        :param upper: upper boundary of the range
        :param upper_is_end: when True, the cursor granularity is not subtracted from the upper boundary of the slice that reaches `upper`
        :return: the slices covering the range; nothing when `lower >= upper` or when `upper` is lower than `self._start`
        """
        if lower >= upper:
            return

        if self._start and upper < self._start:
            return

        lower = max(lower, self._start) if self._start else lower
        if not self._slice_range or self._evaluate_upper_safely(lower, self._slice_range) >= upper:
            # The whole range fits in one slice
            clamped_lower = self._clamping_strategy.clamp(lower)
            clamped_upper = self._clamping_strategy.clamp(upper)
            start_value, end_value = (
                (clamped_lower, clamped_upper - self._cursor_granularity)
                if self._cursor_granularity and not upper_is_end
                else (clamped_lower, clamped_upper)
            )
            yield StreamSlice(
                partition={},
                cursor_slice={
                    self._slice_boundary_fields_wrapper[
                        self._START_BOUNDARY
                    ]: self._connector_state_converter.output_format(start_value),
                    self._slice_boundary_fields_wrapper[
                        self._END_BOUNDARY
                    ]: self._connector_state_converter.output_format(end_value),
                },
            )
        else:
            stop_processing = False
            current_lower_boundary = lower
            while not stop_processing:
                current_upper_boundary = min(
                    self._evaluate_upper_safely(current_lower_boundary, self._slice_range), upper
                )
                has_reached_upper_boundary = current_upper_boundary >= upper

                # The upper boundary is only clamped when it is not the upper boundary of the whole range
                clamped_upper = (
                    self._clamping_strategy.clamp(current_upper_boundary)
                    if current_upper_boundary != upper
                    else current_upper_boundary
                )
                clamped_lower = self._clamping_strategy.clamp(current_lower_boundary)
                if clamped_lower >= clamped_upper:
                    # clamping collapsed both values which means that it is time to stop processing
                    # FIXME should this be replaced by a proper end_provider
                    break
                start_value, end_value = (
                    (clamped_lower, clamped_upper - self._cursor_granularity)
                    if self._cursor_granularity
                    and (not upper_is_end or not has_reached_upper_boundary)
                    else (clamped_lower, clamped_upper)
                )
                yield StreamSlice(
                    partition={},
                    cursor_slice={
                        self._slice_boundary_fields_wrapper[
                            self._START_BOUNDARY
                        ]: self._connector_state_converter.output_format(start_value),
                        self._slice_boundary_fields_wrapper[
                            self._END_BOUNDARY
                        ]: self._connector_state_converter.output_format(end_value),
                    },
                )
                # The next slice starts where this one (before the granularity adjustment) ended
                current_lower_boundary = clamped_upper
                if current_upper_boundary >= upper:
                    stop_processing = True

    def _evaluate_upper_safely(self, lower: CursorValueType, step: GapType) -> CursorValueType:
        """
        Given that we set the default step at datetime.timedelta.max, we will generate an OverflowError when evaluating the next start_date
        This method assumes that users would never enter a step that would generate an overflow. Given that would be the case, the code
        would have broken anyway.

        :return: `lower + step` or, when this overflows, the value returned by the end provider
        """
        try:
            return lower + step
        except OverflowError:
            return self._end_provider()

    def should_be_synced(self, record: Record) -> bool:
        """
        Determines if a record should be synced based on its cursor value.
        :param record: The record to evaluate

        :return: True if the record's cursor value falls within the sync boundaries. A record without cursor value is considered as
            needing to be synced.
        """
        try:
            record_cursor_value: CursorValueType = self._extract_cursor_value(record)
        except ValueError:
            self._log_for_record_without_cursor_value()
            return True
        return self.start <= record_cursor_value <= self._end_provider()

    def _log_for_record_without_cursor_value(self) -> None:
        """Warn that a record has no cursor value. The warning is logged only the first time this method is called."""
        if not self._should_be_synced_logger_triggered:
            LOGGER.warning(
                f"Could not find cursor field `{self.cursor_field.cursor_field_key}` in record for stream {self._stream_name}. The incremental sync will assume it needs to be synced"
            )
            self._should_be_synced_logger_triggered = True

    def reduce_slice_range(self, stream_slice: StreamSlice) -> StreamSlice:
        """
        Move the start of the slice to the most recent cursor value observed for it.

        :param stream_slice: the slice to reduce
        :return: a new slice starting at the most recent cursor value observed for `stream_slice`, or `stream_slice` itself when no
            cursor value has been observed for it
        """
        # In theory, we might be more flexible here meaning that it doesn't need to be in ascending order but it just
        # needs to be ordered. For now though, we will only support ascending order.
        if not self._is_ascending_order:
            LOGGER.warning(
                "Attempting to reduce slice while records are not returned in incremental order might lead to missing records"
            )

        if stream_slice in self._most_recent_cursor_value_per_partition:
            return StreamSlice(
                partition=stream_slice.partition,
                cursor_slice={
                    self._slice_boundary_fields_wrapper[
                        self._START_BOUNDARY
                    ]: self._connector_state_converter.output_format(
                        self._most_recent_cursor_value_per_partition[stream_slice]
                    ),
                    self._slice_boundary_fields_wrapper[
                        self._END_BOUNDARY
                    ]: stream_slice.cursor_slice[
                        self._slice_boundary_fields_wrapper[self._END_BOUNDARY]
                    ],
                },
                extra_fields=stream_slice.extra_fields,
            )
        else:
            return stream_slice
Slices the stream into chunks that can be fetched independently. Slices enable state checkpointing and data retrieval parallelization.
def __init__(
    self,
    stream_name: str,
    stream_namespace: Optional[str],
    stream_state: Any,
    message_repository: MessageRepository,
    connector_state_manager: ConnectorStateManager,
    connector_state_converter: AbstractStreamStateConverter,
    cursor_field: CursorField,
    slice_boundary_fields: Optional[Tuple[str, str]],
    start: Optional[CursorValueType],
    end_provider: Callable[[], CursorValueType],
    lookback_window: Optional[GapType] = None,
    slice_range: Optional[GapType] = None,
    cursor_granularity: Optional[GapType] = None,
    clamping_strategy: ClampingStrategy = NoClamping(),
) -> None:
    """
    Store the configuration of the cursor and initialise its state from `stream_state`.

    :param stream_name: name of the stream the cursor belongs to
    :param stream_namespace: namespace of the stream the cursor belongs to
    :param stream_state: incoming state, converted by `_get_concurrent_state`
    :param message_repository: repository kept on the cursor for emitting messages
    :param connector_state_manager: state manager kept on the cursor
    :param connector_state_converter: converter kept on the cursor and used to convert `stream_state`
    :param cursor_field: field holding the cursor value of a record
    :param slice_boundary_fields: names of the (start, end) fields of a slice, if any
    :param start: lower bound of the sync, if any
    :param end_provider: callable returning the upper bound of the sync
    :param lookback_window: optional lookback gap
    :param slice_range: optional maximum gap covered by a slice
    :param cursor_granularity: optional granularity of the cursor values
    :param clamping_strategy: strategy used to clamp slice boundaries
    """
    # Stream identity and the collaborators used for state handling
    self._stream_name = stream_name
    self._stream_namespace = stream_namespace
    self._message_repository = message_repository
    self._connector_state_manager = connector_state_manager
    self._connector_state_converter = connector_state_converter

    # Slicing configuration
    self._cursor_field = cursor_field
    # For an example where the slice boundaries might not be defined, see https://github.com/airbytehq/airbyte/blob/1ce84d6396e446e1ac2377362446e3fb94509461/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py#L363-L379
    self._slice_boundary_fields = slice_boundary_fields
    self._start = start
    self._end_provider = end_provider
    self._lookback_window = lookback_window
    self._slice_range = slice_range
    self._cursor_granularity = cursor_granularity
    self._clamping_strategy = clamping_strategy

    # Bookkeeping updated while records are read and partitions are closed
    self._most_recent_cursor_value_per_partition: MutableMapping[
        Union[StreamSlice, Mapping[str, Any], None], Any
    ] = {}
    self._has_closed_at_least_one_slice = False
    self._is_ascending_order = True
    # Whether the warning about a record without cursor value has already been logged (per stream)
    self._should_be_synced_logger_triggered = False

    # Closing a partition updates the cursor's concurrent_state, which is not thread safe: if several
    # partitions are closed at the same time, one of them could update concurrent_state after another one
    # has read the previous state, and the latter would then overwrite the update of the former. The lock
    # serialises those updates.
    self._lock = threading.Lock()

    # Done last: the conversion needs the attributes assigned above.
    self.start, self._concurrent_state = self._get_concurrent_state(stream_state)
def copy_without_state(self) -> "ConcurrentCursor":
    """
    Create a cursor configured like this one but starting from an empty stream state.

    The new instance is given a `NoopMessageRepository` and a new `ConnectorStateManager` instead of this cursor's own ones.
    Every other constructor argument is taken from this instance.

    :return: a new instance of the same class as `self`
    """
    constructor_arguments: Mapping[str, Any] = {
        "stream_name": self._stream_name,
        "stream_namespace": self._stream_namespace,
        "stream_state": {},
        "message_repository": NoopMessageRepository(),
        "connector_state_manager": ConnectorStateManager(),
        "connector_state_converter": self._connector_state_converter,
        "cursor_field": self._cursor_field,
        "slice_boundary_fields": self._slice_boundary_fields,
        "start": self._start,
        "end_provider": self._end_provider,
        "lookback_window": self._lookback_window,
        "slice_range": self._slice_range,
        "cursor_granularity": self._cursor_granularity,
        "clamping_strategy": self._clamping_strategy,
    }
    return self.__class__(**constructor_arguments)
def observe(self, record: Record) -> None:
    """
    Keep track of the greatest cursor value seen for the slice the record is associated with.

    A record without cursor value only triggers the "record without cursor value" warning.

    :param record: the record that has been read
    """
    # This method writes to the most_recent_cursor_value_per_partition mapping without any lock, so it is
    # not thread-safe. This is not expected to cause concurrency issues: observe() is only invoked by
    # PartitionReader.process_partition() and the mapping is keyed by partition, so concurrent threads
    # read/write different keys.
    #
    # Should thread safety be needed, a lock per partition, instantiated during stream_slices(), would be
    # the way to go.
    slice_key = record.associated_slice
    known_value = self._most_recent_cursor_value_per_partition.get(slice_key)
    try:
        observed_value = self._extract_cursor_value(record)

        if known_value is None or known_value < observed_value:
            self._most_recent_cursor_value_per_partition[slice_key] = observed_value
        elif known_value > observed_value:
            # A smaller value after a greater one: the records are not in ascending order
            self._is_ascending_order = False
    except ValueError:
        self._log_for_record_without_cursor_value()
Indicate to the cursor that the record has been emitted
def close_partition(self, partition: Partition) -> None:
    """
    Add the partition to the state and, if that added a slice, merge the slices and emit a state message.

    :param partition: the partition that has been processed
    """
    with self._lock:
        slice_count_before = len(self._concurrent_state.get("slices", []))
        self._add_slice_to_state(partition)
        # Read the slices with the same default as above: when the state has no "slices" entry and nothing was
        # added, there is nothing to emit and this must not fail with a KeyError.
        if slice_count_before < len(
            self._concurrent_state.get("slices", [])
        ):  # only emit if at least one slice has been processed
            self._merge_partitions()
            self._emit_state_message()
        # `_add_slice_to_state` reads this flag while the lock is held, hence it is updated under the lock as well
        self._has_closed_at_least_one_slice = True
Indicate to the cursor that the partition has been successfully processed
def ensure_at_least_one_state_emitted(self) -> None:
    """
    Emit a state message for the current state of the cursor.

    The platform expects at least one state message on successful syncs, so this method is expected to be called whatever happened
    during the sync.
    """
    self._emit_state_message()
The platform expects to have at least one state message on successful syncs. Hence, whatever happens, we expect this method to be called.
def stream_slices(self) -> Iterable[StreamSlice]:
    """
    Generating slices based on a few parameters:
    * lookback_window: Buffer to remove from END_KEY of the highest slice
    * slice_range: Max difference between two slices. If the difference between two slices is greater, multiple slices will be created
    * start: `_split_per_slice_range` will clip any value to `self._start` which means that:
      * if upper is less than self._start, no slices will be generated
      * if lower is less than self._start, self._start will be used as the lower boundary (lookback_window will not be considered in that case)

    Note that the slices will overlap at their boundaries. We therefore expect to have at least the lower or the upper boundary to be
    inclusive in the API that is queried.

    This is a generator: nothing is evaluated, and no error is raised, before the iteration starts.

    :return: the slices to sync
    :raises ValueError: if the state does not hold any slice
    """
    self._merge_partitions()

    # Work on a copy of the merged slices so that the ranges are derived from the state as it is when the iteration starts,
    # even if slices are added to the state while this generator is being consumed.
    slices = list(self._concurrent_state["slices"])
    if not slices:
        # Checked before anything reads the first slice so that an empty state is reported with this error
        raise ValueError("Expected at least one slice")

    start_key = self._connector_state_converter.START_KEY
    end_key = self._connector_state_converter.END_KEY

    # Range between the configured start and the first slice of the state
    if self._start is not None and self._is_start_before_first_slice():
        yield from self._split_per_slice_range(self._start, slices[0][start_key], False)

    # Gaps between consecutive slices of the state
    for previous_slice, next_slice in zip(slices, slices[1:]):
        gap_lower = previous_slice[end_key]
        if self._cursor_granularity:
            gap_lower = gap_lower + self._cursor_granularity
        yield from self._split_per_slice_range(gap_lower, next_slice[start_key], False)

    # Range between the last slice of the state (minus the lookback window) and the end of the sync
    yield from self._split_per_slice_range(
        self._calculate_lower_boundary_of_last_slice(slices[-1][end_key]),
        self._end_provider(),
        True,
    )
Generating slices based on a few parameters:
- lookback_window: Buffer to remove from END_KEY of the highest slice
- slice_range: Max difference between two slices. If the difference between two slices is greater, multiple slices will be created
- start: `_split_per_slice_range` will clip any value to `self._start`, which means that:
  - if upper is less than `self._start`, no slices will be generated
  - if lower is less than `self._start`, `self._start` will be used as the lower boundary (lookback_window will not be considered in that case)
Note that the slices will overlap at their boundaries. We therefore expect to have at least the lower or the upper boundary to be inclusive in the API that is queried.
def should_be_synced(self, record: Record) -> bool:
    """
    Tell whether a record is to be synced, judging by its cursor value.

    :param record: The record to evaluate

    :return: True if the cursor value of the record is between `self.start` and the value returned by the end provider (both
        included), or if the record has no cursor value
    """
    try:
        cursor_value: CursorValueType = self._extract_cursor_value(record)
    except ValueError:
        # No cursor value to compare: warn and keep the record
        self._log_for_record_without_cursor_value()
        return True
    else:
        return self.start <= cursor_value <= self._end_provider()
Determines if a record should be synced based on its cursor value.
Parameters
- record: The record to evaluate
Returns
True if the record's cursor value falls within the sync boundaries
def reduce_slice_range(self, stream_slice: StreamSlice) -> StreamSlice:
    """
    Move the start of the slice to the most recent cursor value observed for it.

    :param stream_slice: the slice to reduce
    :return: a new slice with the same partition, extra fields and end boundary, starting at the most recent cursor value observed
        for `stream_slice`; `stream_slice` itself when no cursor value has been observed for it
    """
    # Only ascending order is supported for now, although in theory any consistent ordering of the records
    # could be enough.
    if not self._is_ascending_order:
        LOGGER.warning(
            "Attempting to reduce slice while records are not returned in incremental order might lead to missing records"
        )

    observed_values = self._most_recent_cursor_value_per_partition
    if stream_slice not in observed_values:
        return stream_slice

    boundary_fields = self._slice_boundary_fields_wrapper
    start_field = boundary_fields[self._START_BOUNDARY]
    end_field = boundary_fields[self._END_BOUNDARY]

    reduced_start = self._connector_state_converter.output_format(observed_values[stream_slice])
    unchanged_end = stream_slice.cursor_slice[end_field]
    return StreamSlice(
        partition=stream_slice.partition,
        cursor_slice={start_field: reduced_start, end_field: unchanged_end},
        extra_fields=stream_slice.extra_fields,
    )