airbyte_cdk.sources.streams.concurrent.cursor
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import functools
import logging
import threading
from abc import ABC, abstractmethod
from typing import (
    Any,
    Callable,
    Iterable,
    List,
    Mapping,
    MutableMapping,
    Optional,
    Tuple,
    Union,
)

from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.streams import NO_CURSOR_STATE_KEY
from airbyte_cdk.sources.streams.concurrent.clamping import ClampingStrategy, NoClamping
from airbyte_cdk.sources.streams.concurrent.cursor_types import CursorValueType, GapType
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.stream_slicer import StreamSlicer
from airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter import (
    AbstractStreamStateConverter,
)
from airbyte_cdk.sources.types import Record, StreamSlice

LOGGER = logging.getLogger("airbyte")


def _extract_value(mapping: Mapping[str, Any], path: List[str]) -> Any:
    return functools.reduce(lambda a, b: a[b], path, mapping)


class CursorField:
    def __init__(self, cursor_field_key: str) -> None:
        self.cursor_field_key = cursor_field_key

    def extract_value(self, record: Record) -> CursorValueType:
        cursor_value = record.data.get(self.cursor_field_key)
        if cursor_value is None:
            raise ValueError(f"Could not find cursor field {self.cursor_field_key} in record")
        return cursor_value  # type: ignore  # we assume that the value the path points at is comparable


class Cursor(StreamSlicer, ABC):
    @property
    @abstractmethod
    def state(self) -> MutableMapping[str, Any]: ...

    @abstractmethod
    def observe(self, record: Record) -> None:
        """
        Indicate to the cursor that the record has been emitted.
        """
        raise NotImplementedError()

    @abstractmethod
    def close_partition(self, partition: Partition) -> None:
        """
        Indicate to the cursor that the partition has been successfully processed.
        """
        raise NotImplementedError()

    @abstractmethod
    def ensure_at_least_one_state_emitted(self) -> None:
        """
        State messages are emitted when a partition is closed. However, the platform expects at least one state to be
        emitted per sync per stream. Hence, if no partitions are generated, this method needs to be called.
        """
        raise NotImplementedError()

    @abstractmethod
    def should_be_synced(self, record: Record) -> bool:
        pass

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Default placeholder implementation of stream_slices.
        Subclasses can override this method to provide actual behavior.
        """
        yield StreamSlice(partition={}, cursor_slice={})


class FinalStateCursor(Cursor):
    """Cursor that is used to guarantee at least one state message is emitted for a concurrent stream."""

    def __init__(
        self,
        stream_name: str,
        stream_namespace: Optional[str],
        message_repository: MessageRepository,
    ) -> None:
        self._stream_name = stream_name
        self._stream_namespace = stream_namespace
        self._message_repository = message_repository
        # Normally the connector state manager operates at the source level. However, we only need it to write the
        # sentinel state message rather than manage overall source state. This is also only temporary as we move to the
        # resumable full refresh world where every stream uses a FileBasedConcurrentCursor with incremental state.
        self._connector_state_manager = ConnectorStateManager()
        self._has_closed_at_least_one_slice = False

    @property
    def state(self) -> MutableMapping[str, Any]:
        return {NO_CURSOR_STATE_KEY: True}

    def observe(self, record: Record) -> None:
        pass

    def close_partition(self, partition: Partition) -> None:
        pass

    def ensure_at_least_one_state_emitted(self) -> None:
        """
        Used primarily for full refresh syncs that do not have a valid cursor value to emit at the end of a sync.
        """

        self._connector_state_manager.update_state_for_stream(
            self._stream_name, self._stream_namespace, self.state
        )
        state_message = self._connector_state_manager.create_state_message(
            self._stream_name, self._stream_namespace
        )
        self._message_repository.emit_message(state_message)

    def should_be_synced(self, record: Record) -> bool:
        return True


class ConcurrentCursor(Cursor):
    _START_BOUNDARY = 0
    _END_BOUNDARY = 1

    def __init__(
        self,
        stream_name: str,
        stream_namespace: Optional[str],
        stream_state: Any,
        message_repository: MessageRepository,
        connector_state_manager: ConnectorStateManager,
        connector_state_converter: AbstractStreamStateConverter,
        cursor_field: CursorField,
        slice_boundary_fields: Optional[Tuple[str, str]],
        start: Optional[CursorValueType],
        end_provider: Callable[[], CursorValueType],
        lookback_window: Optional[GapType] = None,
        slice_range: Optional[GapType] = None,
        cursor_granularity: Optional[GapType] = None,
        clamping_strategy: ClampingStrategy = NoClamping(),
    ) -> None:
        self._stream_name = stream_name
        self._stream_namespace = stream_namespace
        self._message_repository = message_repository
        self._connector_state_converter = connector_state_converter
        self._connector_state_manager = connector_state_manager
        self._cursor_field = cursor_field
        # For examples where the slice boundaries might not be defined, see
        # https://github.com/airbytehq/airbyte/blob/1ce84d6396e446e1ac2377362446e3fb94509461/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py#L363-L379
        self._slice_boundary_fields = slice_boundary_fields
        self._start = start
        self._end_provider = end_provider
        self.start, self._concurrent_state = self._get_concurrent_state(stream_state)
        self._lookback_window = lookback_window
        self._slice_range = slice_range
        self._most_recent_cursor_value_per_partition: MutableMapping[
            Union[StreamSlice, Mapping[str, Any], None], Any
        ] = {}
        self._has_closed_at_least_one_slice = False
        self._cursor_granularity = cursor_granularity
        # Flag to track if the logger has been triggered (per stream)
        self._should_be_synced_logger_triggered = False
        self._clamping_strategy = clamping_strategy

        # A lock is required when closing a partition because updating the cursor's concurrent_state is
        # not thread-safe. When multiple partitions are being closed by the cursor at the same time, it is
        # possible for one partition to update concurrent_state after a second partition has already read
        # the previous state. This can lead to the second partition overwriting the first one's state.
        self._lock = threading.Lock()

    @property
    def state(self) -> MutableMapping[str, Any]:
        return self._connector_state_converter.convert_to_state_message(
            self.cursor_field, self._concurrent_state
        )

    @property
    def cursor_field(self) -> CursorField:
        return self._cursor_field

    @property
    def _slice_boundary_fields_wrapper(self) -> Tuple[str, str]:
        return (
            self._slice_boundary_fields
            if self._slice_boundary_fields
            else (
                self._connector_state_converter.START_KEY,
                self._connector_state_converter.END_KEY,
            )
        )

    def _get_concurrent_state(
        self, state: MutableMapping[str, Any]
    ) -> Tuple[CursorValueType, MutableMapping[str, Any]]:
        if self._connector_state_converter.is_state_message_compatible(state):
            partitioned_state = self._connector_state_converter.deserialize(state)
            slices_from_partitioned_state = partitioned_state.get("slices", [])

            value_from_partitioned_state = None
            if slices_from_partitioned_state:
                # We assume here that the slices have already been merged
                first_slice = slices_from_partitioned_state[0]
                value_from_partitioned_state = (
                    first_slice[self._connector_state_converter.MOST_RECENT_RECORD_KEY]
                    if self._connector_state_converter.MOST_RECENT_RECORD_KEY in first_slice
                    else first_slice[self._connector_state_converter.END_KEY]
                )
            return (
                value_from_partitioned_state
                or self._start
                or self._connector_state_converter.zero_value,
                partitioned_state,
            )
        return self._connector_state_converter.convert_from_sequential_state(
            self._cursor_field, state, self._start
        )

    def observe(self, record: Record) -> None:
        # Because observe writes to the most_recent_cursor_value_per_partition mapping,
        # it is not thread-safe. However, this shouldn't lead to concurrency issues because
        # observe() is only invoked by PartitionReader.process_partition(). Since the map is
        # keyed by partition, concurrently processing threads only read/write different keys,
        # which avoids any conflicts.
        #
        # If we were to add thread safety, we should implement a per-partition lock
        # instantiated during stream_slices().
        most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
            record.associated_slice
        )
        try:
            cursor_value = self._extract_cursor_value(record)

            if most_recent_cursor_value is None or most_recent_cursor_value < cursor_value:
                self._most_recent_cursor_value_per_partition[record.associated_slice] = cursor_value
        except ValueError:
            self._log_for_record_without_cursor_value()

    def _extract_cursor_value(self, record: Record) -> Any:
        return self._connector_state_converter.parse_value(self._cursor_field.extract_value(record))

    def close_partition(self, partition: Partition) -> None:
        with self._lock:
            slice_count_before = len(self._concurrent_state.get("slices", []))
            self._add_slice_to_state(partition)
            if slice_count_before < len(
                self._concurrent_state["slices"]
            ):  # only emit if at least one slice has been processed
                self._merge_partitions()
                self._emit_state_message()
            self._has_closed_at_least_one_slice = True

    def _add_slice_to_state(self, partition: Partition) -> None:
        most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
            partition.to_slice()
        )

        if self._slice_boundary_fields:
            if "slices" not in self._concurrent_state:
                raise RuntimeError(
                    f"The state for stream {self._stream_name} should have at least one slice to delineate the sync start time, but no slices are present. This is unexpected. Please contact Support."
                )
            self._concurrent_state["slices"].append(
                {
                    self._connector_state_converter.START_KEY: self._extract_from_slice(
                        partition, self._slice_boundary_fields[self._START_BOUNDARY]
                    ),
                    self._connector_state_converter.END_KEY: self._extract_from_slice(
                        partition, self._slice_boundary_fields[self._END_BOUNDARY]
                    ),
                    self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value,
                }
            )
        elif most_recent_cursor_value:
            if self._has_closed_at_least_one_slice:
                # If we track the state value using the record's cursor field, we can only do so if there is a single
                # partition. This is because we save the state every time we close a partition. We assume that if
                # there are multiple slices, they need to provide boundaries. There are cases where partitions might
                # not have boundaries:
                # * The cursor should be per-partition
                # * The stream state is actually the parent stream state
                # There might be other cases not listed above. Those are not supported today, hence the stream should
                # not use this cursor for state management. For the specific user that was affected by this issue, we
                # need to:
                # * Fix state tracking (which is currently broken)
                # * Make the new version available
                # * (Probably) ask the user to reset the stream to avoid data loss
                raise ValueError(
                    "Given that slice_boundary_fields is not defined and that per-partition state is not supported, "
                    "only one slice is expected. Please contact the Airbyte team."
                )

            self._concurrent_state["slices"].append(
                {
                    self._connector_state_converter.START_KEY: self.start,
                    self._connector_state_converter.END_KEY: most_recent_cursor_value,
                    self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value,
                }
            )

    def _emit_state_message(self) -> None:
        self._connector_state_manager.update_state_for_stream(
            self._stream_name,
            self._stream_namespace,
            self.state,
        )
        state_message = self._connector_state_manager.create_state_message(
            self._stream_name, self._stream_namespace
        )
        self._message_repository.emit_message(state_message)

    def _merge_partitions(self) -> None:
        self._concurrent_state["slices"] = self._connector_state_converter.merge_intervals(
            self._concurrent_state["slices"]
        )

    def _extract_from_slice(self, partition: Partition, key: str) -> CursorValueType:
        try:
            _slice = partition.to_slice()
            if not _slice:
                raise KeyError(f"Could not find key `{key}` in empty slice")
            return self._connector_state_converter.parse_value(_slice[key])  # type: ignore  # we expect the devs to specify a key that would return a CursorValueType
        except KeyError as exception:
            raise KeyError(
                f"Partition is expected to have key `{key}` but it could not be found"
            ) from exception

    def ensure_at_least_one_state_emitted(self) -> None:
        """
        The platform expects at least one state message on successful syncs. Hence, whatever happens, we expect this
        method to be called.
        """
        self._emit_state_message()

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Generates slices based on a few parameters:
        * lookback_window: buffer to remove from the END_KEY of the highest slice
        * slice_range: maximum difference between two slice boundaries. If the difference between two boundaries is
          greater, multiple slices will be created
        * start: `_split_per_slice_range` will clip any value to `self._start`, which means that:
          * if upper is less than self._start, no slices will be generated
          * if lower is less than self._start, self._start will be used as the lower boundary (lookback_window will
            not be considered in that case)

        Note that the slices will overlap at their boundaries. We therefore expect at least the lower or the upper
        boundary to be inclusive in the API that is queried.
        """
        self._merge_partitions()

        if self._start is not None and self._is_start_before_first_slice():
            yield from self._split_per_slice_range(
                self._start,
                self._concurrent_state["slices"][0][self._connector_state_converter.START_KEY],
                False,
            )

        if len(self._concurrent_state["slices"]) == 1:
            yield from self._split_per_slice_range(
                self._calculate_lower_boundary_of_last_slice(
                    self._concurrent_state["slices"][0][self._connector_state_converter.END_KEY]
                ),
                self._end_provider(),
                True,
            )
        elif len(self._concurrent_state["slices"]) > 1:
            for i in range(len(self._concurrent_state["slices"]) - 1):
                if self._cursor_granularity:
                    yield from self._split_per_slice_range(
                        self._concurrent_state["slices"][i][self._connector_state_converter.END_KEY]
                        + self._cursor_granularity,
                        self._concurrent_state["slices"][i + 1][
                            self._connector_state_converter.START_KEY
                        ],
                        False,
                    )
                else:
                    yield from self._split_per_slice_range(
                        self._concurrent_state["slices"][i][
                            self._connector_state_converter.END_KEY
                        ],
                        self._concurrent_state["slices"][i + 1][
                            self._connector_state_converter.START_KEY
                        ],
                        False,
                    )
            yield from self._split_per_slice_range(
                self._calculate_lower_boundary_of_last_slice(
                    self._concurrent_state["slices"][-1][self._connector_state_converter.END_KEY]
                ),
                self._end_provider(),
                True,
            )
        else:
            raise ValueError("Expected at least one slice")

    def _is_start_before_first_slice(self) -> bool:
        return (
            self._start is not None
            and self._start
            < self._concurrent_state["slices"][0][self._connector_state_converter.START_KEY]
        )

    def _calculate_lower_boundary_of_last_slice(
        self, lower_boundary: CursorValueType
    ) -> CursorValueType:
        if self._lookback_window:
            return lower_boundary - self._lookback_window
        return lower_boundary

    def _split_per_slice_range(
        self, lower: CursorValueType, upper: CursorValueType, upper_is_end: bool
    ) -> Iterable[StreamSlice]:
        if lower >= upper:
            return

        if self._start and upper < self._start:
            return

        lower = max(lower, self._start) if self._start else lower
        if not self._slice_range or self._evaluate_upper_safely(lower, self._slice_range) >= upper:
            clamped_lower = self._clamping_strategy.clamp(lower)
            clamped_upper = self._clamping_strategy.clamp(upper)
            start_value, end_value = (
                (clamped_lower, clamped_upper - self._cursor_granularity)
                if self._cursor_granularity and not upper_is_end
                else (clamped_lower, clamped_upper)
            )
            yield StreamSlice(
                partition={},
                cursor_slice={
                    self._slice_boundary_fields_wrapper[
                        self._START_BOUNDARY
                    ]: self._connector_state_converter.output_format(start_value),
                    self._slice_boundary_fields_wrapper[
                        self._END_BOUNDARY
                    ]: self._connector_state_converter.output_format(end_value),
                },
            )
        else:
            stop_processing = False
            current_lower_boundary = lower
            while not stop_processing:
                current_upper_boundary = min(
                    self._evaluate_upper_safely(current_lower_boundary, self._slice_range), upper
                )
                has_reached_upper_boundary = current_upper_boundary >= upper

                clamped_upper = (
                    self._clamping_strategy.clamp(current_upper_boundary)
                    if current_upper_boundary != upper
                    else current_upper_boundary
                )
                clamped_lower = self._clamping_strategy.clamp(current_lower_boundary)
                if clamped_lower >= clamped_upper:
                    # Clamping collapsed both values, which means that it is time to stop processing.
                    # FIXME should this be replaced by a proper end_provider?
                    break
                start_value, end_value = (
                    (clamped_lower, clamped_upper - self._cursor_granularity)
                    if self._cursor_granularity
                    and (not upper_is_end or not has_reached_upper_boundary)
                    else (clamped_lower, clamped_upper)
                )
                yield StreamSlice(
                    partition={},
                    cursor_slice={
                        self._slice_boundary_fields_wrapper[
                            self._START_BOUNDARY
                        ]: self._connector_state_converter.output_format(start_value),
                        self._slice_boundary_fields_wrapper[
                            self._END_BOUNDARY
                        ]: self._connector_state_converter.output_format(end_value),
                    },
                )
                current_lower_boundary = clamped_upper
                if current_upper_boundary >= upper:
                    stop_processing = True

    def _evaluate_upper_safely(self, lower: CursorValueType, step: GapType) -> CursorValueType:
        """
        Given that the default step is datetime.timedelta.max, evaluating the next start date can raise an
        OverflowError. This method assumes that users would never enter a step that legitimately overflows; were that
        the case, the code would have broken anyway.
        """
        try:
            return lower + step
        except OverflowError:
            return self._end_provider()

    def should_be_synced(self, record: Record) -> bool:
        """
        Determines if a record should be synced based on its cursor value.

        :param record: The record to evaluate
        :return: True if the record's cursor value falls within the sync boundaries
        """
        try:
            record_cursor_value: CursorValueType = self._extract_cursor_value(record)
        except ValueError:
            self._log_for_record_without_cursor_value()
            return True
        return self.start <= record_cursor_value <= self._end_provider()

    def _log_for_record_without_cursor_value(self) -> None:
        if not self._should_be_synced_logger_triggered:
            LOGGER.warning(
                f"Could not find cursor field `{self.cursor_field.cursor_field_key}` in record for stream {self._stream_name}. The incremental sync will assume it needs to be synced"
            )
            self._should_be_synced_logger_triggered = True
class CursorField:
def extract_value(self, record: Record) -> CursorValueType:
class Cursor(StreamSlicer, ABC):
Slices the stream into chunks that can be fetched independently. Slices enable state checkpointing and data retrieval parallelization.
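For reference, a slice is just a pair of mappings: `partition` identifies the logical partition and `cursor_slice` carries the window boundaries. A date-windowed slice built by this module might look like the following (the boundary field names and values are illustrative):

    from airbyte_cdk.sources.types import StreamSlice

    stream_slice = StreamSlice(
        partition={},
        cursor_slice={"start_time": "2024-01-01", "end_time": "2024-01-31"},
    )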
@abstractmethod
def observe(self, record: Record) -> None:
Indicate to the cursor that the record has been emitted
@abstractmethod
def close_partition(self, partition: Partition) -> None:
Indicate to the cursor that the partition has been successfully processed
@abstractmethod
def ensure_at_least_one_state_emitted(self) -> None:
State messages are emitted when a partition is closed. However, the platform expects at least one state to be emitted per sync per stream. Hence, if no partitions are generated, this method needs to be called.
def stream_slices(self) -> Iterable[StreamSlice]:
Default placeholder implementation of stream_slices. Subclasses can override this method to provide actual behavior.
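To make the default concrete: the inherited implementation yields exactly one empty slice. In the sketch below, `_NoopCursor` is a hypothetical minimal subclass, defined only so the ABC can be instantiated:

    from typing import Any, MutableMapping

    from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
    from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
    from airbyte_cdk.sources.types import Record

    class _NoopCursor(Cursor):
        # Hypothetical subclass: implements the abstract members as no-ops.
        @property
        def state(self) -> MutableMapping[str, Any]:
            return {}

        def observe(self, record: Record) -> None:
            pass

        def close_partition(self, partition: Partition) -> None:
            pass

        def ensure_at_least_one_state_emitted(self) -> None:
            pass

        def should_be_synced(self, record: Record) -> bool:
            return True

    # The inherited default yields a single slice with empty mappings.
    slices = list(_NoopCursor().stream_slices())
    assert len(slices) == 1
    assert slices[0].partition == {} and slices[0].cursor_slice == {}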
class FinalStateCursor(Cursor):
Cursor that is used to guarantee at least one state message is emitted for a concurrent stream.
def __init__(self, stream_name: str, stream_namespace: Optional[str], message_repository: MessageRepository) -> None:
def close_partition(self, partition: Partition) -> None:

Indicate to the cursor that the partition has been successfully processed
def ensure_at_least_one_state_emitted(self) -> None:
Used primarily for full refresh syncs that do not have a valid cursor value to emit at the end of a sync
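A usage sketch, assuming the CDK's `InMemoryMessageRepository` (from `airbyte_cdk.sources.message`) as the repository implementation; the stream name is illustrative:

    from airbyte_cdk.sources.message import InMemoryMessageRepository
    from airbyte_cdk.sources.streams.concurrent.cursor import FinalStateCursor

    message_repository = InMemoryMessageRepository()
    cursor = FinalStateCursor(
        stream_name="users",
        stream_namespace=None,
        message_repository=message_repository,
    )

    # Even if no partitions were generated, the sentinel state is emitted:
    cursor.ensure_at_least_one_state_emitted()

    # The repository now holds one state message carrying the
    # {NO_CURSOR_STATE_KEY: True} sentinel.
    for message in message_repository.consume_queue():
        print(message)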
class ConcurrentCursor(Cursor):
Slices the stream into chunks that can be fetched independently. Slices enable state checkpointing and data retrieval parallelization.
def __init__(
    self,
    stream_name: str,
    stream_namespace: Optional[str],
    stream_state: Any,
    message_repository: MessageRepository,
    connector_state_manager: ConnectorStateManager,
    connector_state_converter: AbstractStreamStateConverter,
    cursor_field: CursorField,
    slice_boundary_fields: Optional[Tuple[str, str]],
    start: Optional[CursorValueType],
    end_provider: Callable[[], CursorValueType],
    lookback_window: Optional[GapType] = None,
    slice_range: Optional[GapType] = None,
    cursor_granularity: Optional[GapType] = None,
    clamping_strategy: ClampingStrategy = NoClamping(),
) -> None:
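A construction sketch for a datetime-cursored stream. It assumes the CDK's `EpochValueConcurrentStreamStateConverter` and `InMemoryMessageRepository` (real CDK classes, but not part of this module); the stream name, cursor field, and slice boundary field names are illustrative, and constructor details may vary between CDK versions:

    from datetime import datetime, timedelta, timezone

    from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
    from airbyte_cdk.sources.message import InMemoryMessageRepository
    from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, CursorField
    from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import (
        EpochValueConcurrentStreamStateConverter,
    )

    cursor = ConcurrentCursor(
        stream_name="events",
        stream_namespace=None,
        stream_state={},  # no prior state: the sync starts from `start`
        message_repository=InMemoryMessageRepository(),
        connector_state_manager=ConnectorStateManager(),
        connector_state_converter=EpochValueConcurrentStreamStateConverter(),
        cursor_field=CursorField("updated_at"),
        slice_boundary_fields=("start_time", "end_time"),
        start=datetime(2024, 1, 1, tzinfo=timezone.utc),
        end_provider=lambda: datetime.now(timezone.utc),
        slice_range=timedelta(days=30),
    )

    # Typical driver loop: fetch records per slice, observe each record,
    # then close the partition so state is checkpointed.
    for stream_slice in cursor.stream_slices():
        ...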
def observe(self, record: Record) -> None:
Indicate to the cursor that the record has been emitted
def close_partition(self, partition: Partition) -> None:
Indicate to the cursor that the partition has been successfully processed
def ensure_at_least_one_state_emitted(self) -> None:
The platform expects at least one state message on successful syncs. Hence, whatever happens, we expect this method to be called.
def stream_slices(self) -> Iterable[StreamSlice]:
Generates slices based on a few parameters:

- lookback_window: buffer to remove from the END_KEY of the highest slice
- slice_range: maximum difference between two slice boundaries; if the difference between two boundaries is greater, multiple slices will be created
- start: `_split_per_slice_range` will clip any value to `self._start`, which means that:
  - if upper is less than self._start, no slices will be generated
  - if lower is less than self._start, self._start will be used as the lower boundary (lookback_window will not be considered in that case)

Note that the slices will overlap at their boundaries. We therefore expect at least the lower or the upper boundary to be inclusive in the API that is queried. (A standalone sketch of the splitting rule follows below.)
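To see the splitting rule in isolation, here is a standalone sketch using only the standard library; it mirrors the behavior of `_split_per_slice_range` for the common datetime case but is not the actual implementation:

    from datetime import datetime, timedelta, timezone

    # Window to slice, and the maximum width of each slice
    lower = datetime(2024, 1, 1, tzinfo=timezone.utc)
    upper = datetime(2024, 3, 1, tzinfo=timezone.utc)
    slice_range = timedelta(days=30)

    slices = []
    current_lower = lower
    while current_lower < upper:
        # Each chunk is at most `slice_range` wide and never overshoots `upper`
        current_upper = min(current_lower + slice_range, upper)
        slices.append((current_lower, current_upper))
        # Consecutive chunks share their boundary value, hence the overlap
        current_lower = current_upper

    # slices == [(2024-01-01, 2024-01-31), (2024-01-31, 2024-03-01)]

With a `cursor_granularity` of, say, one second, the real implementation instead ends each non-final slice at `boundary - granularity` so that consecutive slices do not overlap.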
def should_be_synced(self, record: Record) -> bool:
Determines if a record should be synced based on its cursor value.
Parameters
- record: The record to evaluate
Returns
True if the record's cursor value falls within the sync boundaries
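For intuition, continuing with the hypothetical `cursor` from the construction sketch above (`Record`'s exact constructor signature varies across CDK versions, so the keyword arguments below are an assumption):

    from airbyte_cdk.sources.types import Record

    # 2024-02-01T00:00:00Z expressed as an epoch value, which the
    # EpochValueConcurrentStreamStateConverter parses into a datetime
    record = Record(data={"updated_at": 1706745600}, stream_name="events", associated_slice=None)

    # True when start <= parsed cursor value <= end_provider(); a record
    # missing `updated_at` logs a one-time warning and defaults to True.
    assert cursor.should_be_synced(record)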