airbyte_cdk.sources.streams
@deprecated(
    "Deprecated as of CDK version 0.87.0. "
    "Deprecated in favor of the `CheckpointMixin` which offers similar functionality."
)
class IncrementalMixin(CheckpointMixin, ABC):
    """Mixin to make stream incremental.

    class IncrementalStream(Stream, IncrementalMixin):
        @property
        def state(self):
            return self._state

        @state.setter
        def state(self, value):
            self._state[self.cursor_field] = value[self.cursor_field]
    """
Mixin to make stream incremental.
class IncrementalStream(Stream, IncrementalMixin):
    @property
    def state(self):
        return self._state

    @state.setter
    def state(self, value):
        self._state[self.cursor_field] = value[self.cursor_field]
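A slightly fuller sketch of the same pattern, with the caveat that the `Customers` stream, the `updated_at` cursor field, and the `_fetch_page()` helper are hypothetical; import paths are the ones documented on this page:

from typing import Any, Iterable, List, Mapping, MutableMapping, Optional

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams import IncrementalMixin, Stream


class Customers(Stream, IncrementalMixin):
    primary_key = "id"
    cursor_field = "updated_at"  # hypothetical cursor field

    def __init__(self) -> None:
        self._state: MutableMapping[str, Any] = {}

    @property
    def state(self) -> MutableMapping[str, Any]:
        return self._state

    @state.setter
    def state(self, value: MutableMapping[str, Any]) -> None:
        self._state = value

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Mapping[str, Any]]:
        for record in self._fetch_page(stream_state or {}):  # hypothetical API helper
            yield record
            # Advance the cursor so the CDK can checkpoint and later resume from here.
            self.state = {self.cursor_field: record[self.cursor_field]}

The same shape applies to the non-deprecated CheckpointMixin; only the mixin name changes.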
Inherited Members
class CheckpointMixin(ABC):
    """Mixin for a stream that implements reading and writing the internal state used to checkpoint sync progress to the platform

    class CheckpointedStream(Stream, CheckpointMixin):
        @property
        def state(self):
            return self._state

        @state.setter
        def state(self, value):
            self._state[self.cursor_field] = value[self.cursor_field]
    """

    @property
    @abstractmethod
    def state(self) -> MutableMapping[str, Any]:
        """State getter, should return state in a form that can be serialized to a string and sent to the output
        as a STATE AirbyteMessage.

        A good example of a state is a cursor_value:
            {
                self.cursor_field: "cursor_value"
            }

        State should be as small as possible while remaining descriptive enough to restore
        the syncing process from the point where it stopped.
        """

    @state.setter
    @abstractmethod
    def state(self, value: MutableMapping[str, Any]) -> None:
        """State setter, accept state serialized by state getter."""
Mixin for a stream that implements reading and writing the internal state used to checkpoint sync progress to the platform
class CheckpointedStream(Stream, CheckpointMixin):
    @property
    def state(self):
        return self._state

    @state.setter
    def state(self, value):
        self._state[self.cursor_field] = value[self.cursor_field]
@property
@abstractmethod
def state(self) -> MutableMapping[str, Any]:
    """State getter, should return state in a form that can be serialized to a string and sent to the output
    as a STATE AirbyteMessage.

    A good example of a state is a cursor_value:
        {
            self.cursor_field: "cursor_value"
        }

    State should be as small as possible while remaining descriptive enough to restore
    the syncing process from the point where it stopped.
    """
State getter; should return state in a form that can be serialized to a string and sent to the output as a STATE AirbyteMessage.
A good example of a state is a cursor_value: { self.cursor_field: "cursor_value" }
State should be as small as possible while remaining descriptive enough to restore the syncing process from the point where it stopped.
119class Stream(ABC): 120 """ 121 Base abstract class for an Airbyte Stream. Makes no assumption of the Stream's underlying transport protocol. 122 """ 123 124 _configured_json_schema: Optional[Dict[str, Any]] = None 125 _exit_on_rate_limit: bool = False 126 127 # Use self.logger in subclasses to log any messages 128 @property 129 def logger(self) -> logging.Logger: 130 return logging.getLogger(f"airbyte.streams.{self.name}") 131 132 # TypeTransformer object to perform output data transformation 133 transformer: TypeTransformer = TypeTransformer(TransformConfig.NoTransform) 134 135 cursor: Optional[Cursor] = None 136 137 has_multiple_slices = False 138 139 @cached_property 140 def name(self) -> str: 141 """ 142 :return: Stream name. By default this is the implementing class name, but it can be overridden as needed. 143 """ 144 return casing.camel_to_snake(self.__class__.__name__) 145 146 def get_error_display_message(self, exception: BaseException) -> Optional[str]: 147 """ 148 Retrieves the user-friendly display message that corresponds to an exception. 149 This will be called when encountering an exception while reading records from the stream, and used to build the AirbyteTraceMessage. 150 151 The default implementation of this method does not return user-friendly messages for any exception type, but it should be overriden as needed. 152 153 :param exception: The exception that was raised 154 :return: A user-friendly message that indicates the cause of the error 155 """ 156 return None 157 158 def read( # type: ignore # ignoring typing for ConnectorStateManager because of circular dependencies 159 self, 160 configured_stream: ConfiguredAirbyteStream, 161 logger: logging.Logger, 162 slice_logger: SliceLogger, 163 stream_state: MutableMapping[str, Any], 164 state_manager, 165 internal_config: InternalConfig, 166 ) -> Iterable[StreamData]: 167 sync_mode = configured_stream.sync_mode 168 cursor_field = configured_stream.cursor_field 169 self.configured_json_schema = configured_stream.stream.json_schema 170 171 # WARNING: When performing a read() that uses incoming stream state, we MUST use the self.state that is defined as 172 # opposed to the incoming stream_state value. Because some connectors like ones using the file-based CDK modify 173 # state before setting the value on the Stream attribute, the most up-to-date state is derived from Stream.state 174 # instead of the stream_state parameter. This does not apply to legacy connectors using get_updated_state(). 175 try: 176 stream_state = self.state # type: ignore # we know the field might not exist... 
177 except AttributeError: 178 pass 179 180 should_checkpoint = bool(state_manager) 181 checkpoint_reader = self._get_checkpoint_reader( 182 logger=logger, cursor_field=cursor_field, sync_mode=sync_mode, stream_state=stream_state 183 ) 184 185 next_slice = checkpoint_reader.next() 186 record_counter = 0 187 stream_state_tracker = copy.deepcopy(stream_state) 188 while next_slice is not None: 189 if slice_logger.should_log_slice_message(logger): 190 yield slice_logger.create_slice_log_message(next_slice) 191 records = self.read_records( 192 sync_mode=sync_mode, # todo: change this interface to no longer rely on sync_mode for behavior 193 stream_slice=next_slice, 194 stream_state=stream_state, 195 cursor_field=cursor_field or None, 196 ) 197 for record_data_or_message in records: 198 yield record_data_or_message 199 if isinstance(record_data_or_message, Mapping) or ( 200 hasattr(record_data_or_message, "type") 201 and record_data_or_message.type == MessageType.RECORD 202 ): 203 record_data = ( 204 record_data_or_message 205 if isinstance(record_data_or_message, Mapping) 206 else record_data_or_message.record 207 ) 208 209 # Thanks I hate it. RFR fundamentally doesn't fit with the concept of the legacy Stream.get_updated_state() 210 # method because RFR streams rely on pagination as a cursor. Stream.get_updated_state() was designed to make 211 # the CDK manage state using specifically the last seen record. don't @ brian.lai 212 # 213 # Also, because the legacy incremental state case decouples observing incoming records from emitting state, it 214 # requires that we separate CheckpointReader.observe() and CheckpointReader.get_checkpoint() which could 215 # otherwise be combined. 216 if self.cursor_field: 217 # Some connectors have streams that implement get_updated_state(), but do not define a cursor_field. This 218 # should be fixed on the stream implementation, but we should also protect against this in the CDK as well 219 stream_state_tracker = self.get_updated_state( 220 stream_state_tracker, 221 record_data, # type: ignore [arg-type] 222 ) 223 self._observe_state(checkpoint_reader, stream_state_tracker) 224 record_counter += 1 225 226 checkpoint_interval = self.state_checkpoint_interval 227 if ( 228 should_checkpoint 229 and checkpoint_interval 230 and record_counter % checkpoint_interval == 0 231 ): 232 checkpoint = checkpoint_reader.get_checkpoint() 233 if checkpoint: 234 airbyte_state_message = self._checkpoint_state( 235 checkpoint, state_manager=state_manager 236 ) 237 yield airbyte_state_message 238 239 if internal_config.is_limit_reached(record_counter): 240 break 241 self._observe_state(checkpoint_reader) 242 checkpoint_state = checkpoint_reader.get_checkpoint() 243 if should_checkpoint and checkpoint_state is not None: 244 airbyte_state_message = self._checkpoint_state( 245 checkpoint_state, state_manager=state_manager 246 ) 247 yield airbyte_state_message 248 249 next_slice = checkpoint_reader.next() 250 251 checkpoint = checkpoint_reader.get_checkpoint() 252 if should_checkpoint and checkpoint is not None: 253 airbyte_state_message = self._checkpoint_state(checkpoint, state_manager=state_manager) 254 yield airbyte_state_message 255 256 def read_only_records(self, state: Optional[Mapping[str, Any]] = None) -> Iterable[StreamData]: 257 """ 258 Helper method that performs a read on a stream with an optional state and emits records. 
If the parent stream supports 259 incremental, this operation does not update the stream's internal state (if it uses the modern state setter/getter) 260 or emit state messages. 261 """ 262 263 configured_stream = ConfiguredAirbyteStream( 264 stream=AirbyteStream( 265 name=self.name, 266 json_schema={}, 267 supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental], 268 ), 269 sync_mode=SyncMode.incremental if state else SyncMode.full_refresh, 270 destination_sync_mode=DestinationSyncMode.append, 271 ) 272 273 yield from self.read( 274 configured_stream=configured_stream, 275 logger=self.logger, 276 slice_logger=DebugSliceLogger(), 277 stream_state=dict(state) 278 if state 279 else {}, # read() expects MutableMapping instead of Mapping which is used more often 280 state_manager=None, 281 internal_config=InternalConfig(), # type: ignore [call-arg] 282 ) 283 284 @abstractmethod 285 def read_records( 286 self, 287 sync_mode: SyncMode, 288 cursor_field: Optional[List[str]] = None, 289 stream_slice: Optional[Mapping[str, Any]] = None, 290 stream_state: Optional[Mapping[str, Any]] = None, 291 ) -> Iterable[StreamData]: 292 """ 293 This method should be overridden by subclasses to read records based on the inputs 294 """ 295 296 @lru_cache(maxsize=None) 297 def get_json_schema(self) -> Mapping[str, Any]: 298 """ 299 :return: A dict of the JSON schema representing this stream. 300 301 The default implementation of this method looks for a JSONSchema file with the same name as this stream's "name" property. 302 Override as needed. 303 """ 304 # TODO show an example of using pydantic to define the JSON schema, or reading an OpenAPI spec 305 return ResourceSchemaLoader(package_name_from_class(self.__class__)).get_schema(self.name) 306 307 def as_airbyte_stream(self) -> AirbyteStream: 308 stream = AirbyteStream( 309 name=self.name, 310 json_schema=dict(self.get_json_schema()), 311 supported_sync_modes=[SyncMode.full_refresh], 312 is_resumable=self.is_resumable, 313 ) 314 315 if self.namespace: 316 stream.namespace = self.namespace 317 318 # If we can offer incremental we always should. RFR is always less reliable than incremental which uses a real cursor value 319 if self.supports_incremental: 320 stream.source_defined_cursor = self.source_defined_cursor 321 stream.supported_sync_modes.append(SyncMode.incremental) 322 stream.default_cursor_field = self._wrapped_cursor_field() 323 324 keys = Stream._wrapped_primary_key(self.primary_key) 325 if keys and len(keys) > 0: 326 stream.source_defined_primary_key = keys 327 328 return stream 329 330 @property 331 def supports_incremental(self) -> bool: 332 """ 333 :return: True if this stream supports incrementally reading data 334 """ 335 return len(self._wrapped_cursor_field()) > 0 336 337 @property 338 def is_resumable(self) -> bool: 339 """ 340 :return: True if this stream allows the checkpointing of sync progress and can resume from it on subsequent attempts. 341 This differs from supports_incremental because certain kinds of streams like those supporting resumable full refresh 342 can checkpoint progress in between attempts for improved fault tolerance. However, they will start from the beginning 343 on the next sync job. 344 """ 345 if self.supports_incremental: 346 return True 347 if self.has_multiple_slices: 348 # We temporarily gate substream to not support RFR because puts a pretty high burden on connector developers 349 # to structure stream state in a very specific way. 
We also can't check for issubclass(HttpSubStream) because 350 # not all substreams implement the interface and it would be a circular dependency so we use parent as a surrogate 351 return False 352 elif hasattr(type(self), "state") and getattr(type(self), "state").fset is not None: 353 # Modern case where a stream manages state using getter/setter 354 return True 355 else: 356 # Legacy case where the CDK manages state via the get_updated_state() method. This is determined by checking if 357 # the stream's get_updated_state() differs from the Stream class and therefore has been overridden 358 return type(self).get_updated_state != Stream.get_updated_state 359 360 def _wrapped_cursor_field(self) -> List[str]: 361 return [self.cursor_field] if isinstance(self.cursor_field, str) else self.cursor_field 362 363 @property 364 def cursor_field(self) -> Union[str, List[str]]: 365 """ 366 Override to return the default cursor field used by this stream e.g: an API entity might always use created_at as the cursor field. 367 :return: The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor. 368 """ 369 return [] 370 371 @property 372 def namespace(self) -> Optional[str]: 373 """ 374 Override to return the namespace of this stream, e.g. the Postgres schema which this stream will emit records for. 375 :return: A string containing the name of the namespace. 376 """ 377 return None 378 379 @property 380 def source_defined_cursor(self) -> bool: 381 """ 382 Return False if the cursor can be configured by the user. 383 """ 384 return True 385 386 @property 387 def exit_on_rate_limit(self) -> bool: 388 """Exit on rate limit getter, should return bool value. False if the stream will retry endlessly when rate limited.""" 389 return self._exit_on_rate_limit 390 391 @exit_on_rate_limit.setter 392 def exit_on_rate_limit(self, value: bool) -> None: 393 """Exit on rate limit setter, accept bool value.""" 394 self._exit_on_rate_limit = value 395 396 @property 397 @abstractmethod 398 def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]: 399 """ 400 :return: string if single primary key, list of strings if composite primary key, list of list of strings if composite primary key consisting of nested fields. 401 If the stream has no primary keys, return None. 402 """ 403 404 def stream_slices( 405 self, 406 *, 407 sync_mode: SyncMode, 408 cursor_field: Optional[List[str]] = None, 409 stream_state: Optional[Mapping[str, Any]] = None, 410 ) -> Iterable[Optional[Mapping[str, Any]]]: 411 """ 412 Override to define the slices for this stream. See the stream slicing section of the docs for more information. 413 414 :param sync_mode: 415 :param cursor_field: 416 :param stream_state: 417 :return: 418 """ 419 yield StreamSlice(partition={}, cursor_slice={}) 420 421 @property 422 def state_checkpoint_interval(self) -> Optional[int]: 423 """ 424 Decides how often to checkpoint state (i.e: emit a STATE message). E.g: if this returns a value of 100, then state is persisted after reading 425 100 records, then 200, 300, etc.. A good default value is 1000 although your mileage may vary depending on the underlying data source. 426 427 Checkpointing a stream avoids re-reading records in the case a sync is failed or cancelled. 428 429 return None if state should not be checkpointed e.g: because records returned from the underlying data source are not returned in 430 ascending order with respect to the cursor field. 
This can happen if the source does not support reading records in ascending order of 431 created_at date (or whatever the cursor is). In those cases, state must only be saved once the full stream has been read. 432 """ 433 return None 434 435 # Commented-out to avoid any runtime penalty, since this is used in a hot per-record codepath. 436 # To be evaluated for re-introduction here: https://github.com/airbytehq/airbyte-python-cdk/issues/116 437 # @deprecated( 438 # "Deprecated method `get_updated_state` as of CDK version 0.1.49. " 439 # "Please use explicit state property instead, see `IncrementalMixin` docs." 440 # ) 441 def get_updated_state( 442 self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any] 443 ) -> MutableMapping[str, Any]: 444 """DEPRECATED. Please use explicit state property instead, see `IncrementalMixin` docs. 445 446 Override to extract state from the latest record. Needed to implement incremental sync. 447 448 Inspects the latest record extracted from the data source and the current state object and return an updated state object. 449 450 For example: if the state object is based on created_at timestamp, and the current state is {'created_at': 10}, and the latest_record is 451 {'name': 'octavia', 'created_at': 20 } then this method would return {'created_at': 20} to indicate state should be updated to this object. 452 453 :param current_stream_state: The stream's current state object 454 :param latest_record: The latest record extracted from the stream 455 :return: An updated state object 456 """ 457 return {} 458 459 def get_cursor(self) -> Optional[Cursor]: 460 """ 461 A Cursor is an interface that a stream can implement to manage how its internal state is read and updated while 462 reading records. Historically, Python connectors had no concept of a cursor to manage state. Python streams need 463 to define a cursor implementation and override this method to manage state through a Cursor. 464 """ 465 return self.cursor 466 467 def _get_checkpoint_reader( 468 self, 469 logger: logging.Logger, 470 cursor_field: Optional[List[str]], 471 sync_mode: SyncMode, 472 stream_state: MutableMapping[str, Any], 473 ) -> CheckpointReader: 474 mappings_or_slices = self.stream_slices( 475 cursor_field=cursor_field, 476 sync_mode=sync_mode, # todo: change this interface to no longer rely on sync_mode for behavior 477 stream_state=stream_state, 478 ) 479 480 # Because of poor foresight, we wrote the default Stream.stream_slices() method to return [None] which is confusing and 481 # has now normalized this behavior for connector developers. Now some connectors return [None]. This is objectively 482 # misleading and a more ideal interface is [{}] to indicate we still want to iterate over one slice, but with no 483 # specific slice values. None is bad, and now I feel bad that I have to write this hack. 484 if mappings_or_slices == [None]: 485 mappings_or_slices = [{}] 486 487 slices_iterable_copy, iterable_for_detecting_format = itertools.tee(mappings_or_slices, 2) 488 stream_classification = self._classify_stream( 489 mappings_or_slices=iterable_for_detecting_format 490 ) 491 492 # Streams that override has_multiple_slices are explicitly indicating that they will iterate over 493 # multiple partitions. Inspecting slices to automatically apply the correct cursor is only needed as 494 # a backup. 
So if this value was already assigned to True by the stream, we don't need to reassign it 495 self.has_multiple_slices = ( 496 self.has_multiple_slices or stream_classification.has_multiple_slices 497 ) 498 499 cursor = self.get_cursor() 500 if cursor: 501 cursor.set_initial_state(stream_state=stream_state) 502 503 checkpoint_mode = self._checkpoint_mode 504 505 if cursor and stream_classification.is_legacy_format: 506 return LegacyCursorBasedCheckpointReader( 507 stream_slices=slices_iterable_copy, cursor=cursor, read_state_from_cursor=True 508 ) 509 elif cursor: 510 return CursorBasedCheckpointReader( 511 stream_slices=slices_iterable_copy, 512 cursor=cursor, 513 read_state_from_cursor=checkpoint_mode == CheckpointMode.RESUMABLE_FULL_REFRESH, 514 ) 515 elif checkpoint_mode == CheckpointMode.RESUMABLE_FULL_REFRESH: 516 # Resumable full refresh readers rely on the stream state dynamically being updated during pagination and does 517 # not iterate over a static set of slices. 518 return ResumableFullRefreshCheckpointReader(stream_state=stream_state) 519 elif checkpoint_mode == CheckpointMode.INCREMENTAL: 520 return IncrementalCheckpointReader( 521 stream_slices=slices_iterable_copy, stream_state=stream_state 522 ) 523 else: 524 return FullRefreshCheckpointReader(stream_slices=slices_iterable_copy) 525 526 @property 527 def _checkpoint_mode(self) -> CheckpointMode: 528 if self.is_resumable and len(self._wrapped_cursor_field()) > 0: 529 return CheckpointMode.INCREMENTAL 530 elif self.is_resumable: 531 return CheckpointMode.RESUMABLE_FULL_REFRESH 532 else: 533 return CheckpointMode.FULL_REFRESH 534 535 @staticmethod 536 def _classify_stream( 537 mappings_or_slices: Iterator[Optional[Union[Mapping[str, Any], StreamSlice]]], 538 ) -> StreamClassification: 539 """ 540 This is a bit of a crazy solution, but also the only way we can detect certain attributes about the stream since Python 541 streams do not follow consistent implementation patterns. We care about the following two attributes: 542 - is_substream: Helps to incrementally release changes since substreams w/ parents are much more complicated. Also 543 helps de-risk the release of changes that might impact all connectors 544 - uses_legacy_slice_format: Since the checkpoint reader must manage a complex state object, we opted to have it always 545 use the structured StreamSlice object. However, this requires backwards compatibility with Python sources that only 546 support the legacy mapping object 547 548 Both attributes can eventually be deprecated once stream's define this method deleted once substreams have been implemented and 549 legacy connectors all adhere to the StreamSlice object. 550 """ 551 if not mappings_or_slices: 552 raise ValueError("A stream should always have at least one slice") 553 try: 554 next_slice = next(mappings_or_slices) 555 if isinstance(next_slice, StreamSlice) and next_slice == StreamSlice( 556 partition={}, cursor_slice={} 557 ): 558 is_legacy_format = False 559 slice_has_value = False 560 elif next_slice == {}: 561 is_legacy_format = True 562 slice_has_value = False 563 elif isinstance(next_slice, StreamSlice): 564 is_legacy_format = False 565 slice_has_value = True 566 else: 567 is_legacy_format = True 568 slice_has_value = True 569 except StopIteration: 570 # If the stream has no slices, the format ultimately does not matter since no data will get synced. 
This is technically 571 # a valid case because it is up to the stream to define its slicing behavior 572 return StreamClassification(is_legacy_format=False, has_multiple_slices=False) 573 574 if slice_has_value: 575 # If the first slice contained a partition value from the result of stream_slices(), this is a substream that might 576 # have multiple parent records to iterate over 577 return StreamClassification( 578 is_legacy_format=is_legacy_format, has_multiple_slices=slice_has_value 579 ) 580 581 try: 582 # If stream_slices() returns multiple slices, this is also a substream that can potentially generate empty slices 583 next(mappings_or_slices) 584 return StreamClassification(is_legacy_format=is_legacy_format, has_multiple_slices=True) 585 except StopIteration: 586 # If the result of stream_slices() only returns a single empty stream slice, then we know this is a regular stream 587 return StreamClassification( 588 is_legacy_format=is_legacy_format, has_multiple_slices=False 589 ) 590 591 def log_stream_sync_configuration(self) -> None: 592 """ 593 Logs the configuration of this stream. 594 """ 595 self.logger.debug( 596 f"Syncing stream instance: {self.name}", 597 extra={ 598 "primary_key": self.primary_key, 599 "cursor_field": self.cursor_field, 600 }, 601 ) 602 603 @staticmethod 604 def _wrapped_primary_key( 605 keys: Optional[Union[str, List[str], List[List[str]]]], 606 ) -> Optional[List[List[str]]]: 607 """ 608 :return: wrap the primary_key property in a list of list of strings required by the Airbyte Stream object. 609 """ 610 if not keys: 611 return None 612 613 if isinstance(keys, str): 614 return [[keys]] 615 elif isinstance(keys, list): 616 wrapped_keys = [] 617 for component in keys: 618 if isinstance(component, str): 619 wrapped_keys.append([component]) 620 elif isinstance(component, list): 621 wrapped_keys.append(component) 622 else: 623 raise ValueError(f"Element must be either list or str. Got: {type(component)}") 624 return wrapped_keys 625 else: 626 raise ValueError(f"Element must be either list or str. Got: {type(keys)}") 627 628 def _observe_state( 629 self, checkpoint_reader: CheckpointReader, stream_state: Optional[Mapping[str, Any]] = None 630 ) -> None: 631 """ 632 Convenience method that attempts to read the Stream's state using the recommended way of connector's managing their 633 own state via state setter/getter. But if we get back an AttributeError, then the legacy Stream.get_updated_state() 634 method is used as a fallback method. 635 """ 636 637 # This is an inversion of the original logic that used to try state getter/setters first. As part of the work to 638 # automatically apply resumable full refresh to all streams, all HttpStream classes implement default state 639 # getter/setter methods, we should default to only using the incoming stream_state parameter value is {} which 640 # indicates the stream does not override the default get_updated_state() implementation. 
When the default method 641 # is not overridden, then the stream defers to self.state getter 642 if stream_state: 643 checkpoint_reader.observe(stream_state) 644 elif type(self).get_updated_state == Stream.get_updated_state: 645 # We only default to the state getter/setter if the stream does not use the legacy get_updated_state() method 646 try: 647 new_state = self.state # type: ignore # This will always exist on HttpStreams, but may not for Stream 648 if new_state: 649 checkpoint_reader.observe(new_state) 650 except AttributeError: 651 pass 652 653 def _checkpoint_state( # type: ignore # ignoring typing for ConnectorStateManager because of circular dependencies 654 self, 655 stream_state: Mapping[str, Any], 656 state_manager, 657 ) -> AirbyteMessage: 658 # todo: This can be consolidated into one ConnectorStateManager.update_and_create_state_message() method, but I want 659 # to reduce changes right now and this would span concurrent as well 660 state_manager.update_state_for_stream(self.name, self.namespace, stream_state) 661 return state_manager.create_state_message(self.name, self.namespace) # type: ignore [no-any-return] 662 663 @property 664 def configured_json_schema(self) -> Optional[Dict[str, Any]]: 665 """ 666 This property is set from the read method. 667 668 :return Optional[Dict]: JSON schema from configured catalog if provided, otherwise None. 669 """ 670 return self._configured_json_schema 671 672 @configured_json_schema.setter 673 def configured_json_schema(self, json_schema: Dict[str, Any]) -> None: 674 self._configured_json_schema = self._filter_schema_invalid_properties(json_schema) 675 676 def _filter_schema_invalid_properties( 677 self, configured_catalog_json_schema: Dict[str, Any] 678 ) -> Dict[str, Any]: 679 """ 680 Filters the properties in json_schema that are not present in the stream schema. 681 Configured Schemas can have very old fields, so we need to housekeeping ourselves. 682 """ 683 configured_schema: Any = configured_catalog_json_schema.get("properties", {}) 684 stream_schema_properties: Any = self.get_json_schema().get("properties", {}) 685 686 configured_keys = configured_schema.keys() 687 stream_keys = stream_schema_properties.keys() 688 invalid_properties = configured_keys - stream_keys 689 if not invalid_properties: 690 return configured_catalog_json_schema 691 692 self.logger.warning( 693 f"Stream {self.name}: the following fields are deprecated and cannot be synced. {invalid_properties}. Refresh the connection's source schema to resolve this warning." 694 ) 695 696 valid_configured_schema_properties_keys = stream_keys & configured_keys 697 valid_configured_schema_properties = {} 698 699 for configured_schema_property in valid_configured_schema_properties_keys: 700 valid_configured_schema_properties[configured_schema_property] = ( 701 stream_schema_properties[configured_schema_property] 702 ) 703 704 return {**configured_catalog_json_schema, "properties": valid_configured_schema_properties}
Base abstract class for an Airbyte Stream. Makes no assumptions about the Stream's underlying transport protocol.
@cached_property
def name(self) -> str:
    """
    :return: Stream name. By default this is the implementing class name, but it can be overridden as needed.
    """
    return casing.camel_to_snake(self.__class__.__name__)
Returns
Stream name. By default this is the implementing class name, but it can be overridden as needed.
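The default converts the class name to snake_case (for example, a class named CustomerSurveyResponses becomes the stream customer_survey_responses). As a sketch, a subclass can pin an explicit name instead (the value shown is hypothetical):

@property
def name(self) -> str:
    return "customers_v2"  # hypothetical stream name, decoupled from the class name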
def get_error_display_message(self, exception: BaseException) -> Optional[str]:
    """
    Retrieves the user-friendly display message that corresponds to an exception.
    This will be called when encountering an exception while reading records from the stream, and used to build the AirbyteTraceMessage.

    The default implementation of this method does not return user-friendly messages for any exception type, but it should be overridden as needed.

    :param exception: The exception that was raised
    :return: A user-friendly message that indicates the cause of the error
    """
    return None
Retrieves the user-friendly display message that corresponds to an exception. This will be called when encountering an exception while reading records from the stream, and used to build the AirbyteTraceMessage.
The default implementation of this method does not return user-friendly messages for any exception type, but it should be overridden as needed.
Parameters
- exception: The exception that was raised
Returns
A user-friendly message that indicates the cause of the error
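A minimal override inside a Stream subclass might translate a known failure mode into a readable message; this sketch assumes the connector uses `requests` and that a 403 means a permissions problem:

def get_error_display_message(self, exception: BaseException) -> Optional[str]:
    # Return None for anything we don't recognize so the default trace message is kept.
    if isinstance(exception, requests.exceptions.HTTPError) and exception.response is not None:
        if exception.response.status_code == 403:
            return "The configured credentials do not have permission to read this stream."
    return None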
158 def read( # type: ignore # ignoring typing for ConnectorStateManager because of circular dependencies 159 self, 160 configured_stream: ConfiguredAirbyteStream, 161 logger: logging.Logger, 162 slice_logger: SliceLogger, 163 stream_state: MutableMapping[str, Any], 164 state_manager, 165 internal_config: InternalConfig, 166 ) -> Iterable[StreamData]: 167 sync_mode = configured_stream.sync_mode 168 cursor_field = configured_stream.cursor_field 169 self.configured_json_schema = configured_stream.stream.json_schema 170 171 # WARNING: When performing a read() that uses incoming stream state, we MUST use the self.state that is defined as 172 # opposed to the incoming stream_state value. Because some connectors like ones using the file-based CDK modify 173 # state before setting the value on the Stream attribute, the most up-to-date state is derived from Stream.state 174 # instead of the stream_state parameter. This does not apply to legacy connectors using get_updated_state(). 175 try: 176 stream_state = self.state # type: ignore # we know the field might not exist... 177 except AttributeError: 178 pass 179 180 should_checkpoint = bool(state_manager) 181 checkpoint_reader = self._get_checkpoint_reader( 182 logger=logger, cursor_field=cursor_field, sync_mode=sync_mode, stream_state=stream_state 183 ) 184 185 next_slice = checkpoint_reader.next() 186 record_counter = 0 187 stream_state_tracker = copy.deepcopy(stream_state) 188 while next_slice is not None: 189 if slice_logger.should_log_slice_message(logger): 190 yield slice_logger.create_slice_log_message(next_slice) 191 records = self.read_records( 192 sync_mode=sync_mode, # todo: change this interface to no longer rely on sync_mode for behavior 193 stream_slice=next_slice, 194 stream_state=stream_state, 195 cursor_field=cursor_field or None, 196 ) 197 for record_data_or_message in records: 198 yield record_data_or_message 199 if isinstance(record_data_or_message, Mapping) or ( 200 hasattr(record_data_or_message, "type") 201 and record_data_or_message.type == MessageType.RECORD 202 ): 203 record_data = ( 204 record_data_or_message 205 if isinstance(record_data_or_message, Mapping) 206 else record_data_or_message.record 207 ) 208 209 # Thanks I hate it. RFR fundamentally doesn't fit with the concept of the legacy Stream.get_updated_state() 210 # method because RFR streams rely on pagination as a cursor. Stream.get_updated_state() was designed to make 211 # the CDK manage state using specifically the last seen record. don't @ brian.lai 212 # 213 # Also, because the legacy incremental state case decouples observing incoming records from emitting state, it 214 # requires that we separate CheckpointReader.observe() and CheckpointReader.get_checkpoint() which could 215 # otherwise be combined. 216 if self.cursor_field: 217 # Some connectors have streams that implement get_updated_state(), but do not define a cursor_field. 
This 218 # should be fixed on the stream implementation, but we should also protect against this in the CDK as well 219 stream_state_tracker = self.get_updated_state( 220 stream_state_tracker, 221 record_data, # type: ignore [arg-type] 222 ) 223 self._observe_state(checkpoint_reader, stream_state_tracker) 224 record_counter += 1 225 226 checkpoint_interval = self.state_checkpoint_interval 227 if ( 228 should_checkpoint 229 and checkpoint_interval 230 and record_counter % checkpoint_interval == 0 231 ): 232 checkpoint = checkpoint_reader.get_checkpoint() 233 if checkpoint: 234 airbyte_state_message = self._checkpoint_state( 235 checkpoint, state_manager=state_manager 236 ) 237 yield airbyte_state_message 238 239 if internal_config.is_limit_reached(record_counter): 240 break 241 self._observe_state(checkpoint_reader) 242 checkpoint_state = checkpoint_reader.get_checkpoint() 243 if should_checkpoint and checkpoint_state is not None: 244 airbyte_state_message = self._checkpoint_state( 245 checkpoint_state, state_manager=state_manager 246 ) 247 yield airbyte_state_message 248 249 next_slice = checkpoint_reader.next() 250 251 checkpoint = checkpoint_reader.get_checkpoint() 252 if should_checkpoint and checkpoint is not None: 253 airbyte_state_message = self._checkpoint_state(checkpoint, state_manager=state_manager) 254 yield airbyte_state_message
256 def read_only_records(self, state: Optional[Mapping[str, Any]] = None) -> Iterable[StreamData]: 257 """ 258 Helper method that performs a read on a stream with an optional state and emits records. If the parent stream supports 259 incremental, this operation does not update the stream's internal state (if it uses the modern state setter/getter) 260 or emit state messages. 261 """ 262 263 configured_stream = ConfiguredAirbyteStream( 264 stream=AirbyteStream( 265 name=self.name, 266 json_schema={}, 267 supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental], 268 ), 269 sync_mode=SyncMode.incremental if state else SyncMode.full_refresh, 270 destination_sync_mode=DestinationSyncMode.append, 271 ) 272 273 yield from self.read( 274 configured_stream=configured_stream, 275 logger=self.logger, 276 slice_logger=DebugSliceLogger(), 277 stream_state=dict(state) 278 if state 279 else {}, # read() expects MutableMapping instead of Mapping which is used more often 280 state_manager=None, 281 internal_config=InternalConfig(), # type: ignore [call-arg] 282 )
Helper method that performs a read on a stream with an optional state and emits records. If the parent stream supports incremental, this operation does not update the stream's internal state (if it uses the modern state setter/getter) or emit state messages.
@abstractmethod
def read_records(
    self,
    sync_mode: SyncMode,
    cursor_field: Optional[List[str]] = None,
    stream_slice: Optional[Mapping[str, Any]] = None,
    stream_state: Optional[Mapping[str, Any]] = None,
) -> Iterable[StreamData]:
    """
    This method should be overridden by subclasses to read records based on the inputs
    """
This method should be overridden by subclasses to read records based on the inputs
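A sketch of a concrete implementation, assuming `requests` is imported and using a hypothetical endpoint; a real connector would also handle pagination, authentication, and errors:

def read_records(
    self,
    sync_mode: SyncMode,
    cursor_field: Optional[List[str]] = None,
    stream_slice: Optional[Mapping[str, Any]] = None,
    stream_state: Optional[Mapping[str, Any]] = None,
) -> Iterable[StreamData]:
    # Fetch a single page of records and yield them one by one.
    response = requests.get("https://api.example.com/v1/customers", timeout=30)  # hypothetical URL
    response.raise_for_status()
    yield from response.json()["results"]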
@lru_cache(maxsize=None)
def get_json_schema(self) -> Mapping[str, Any]:
    """
    :return: A dict of the JSON schema representing this stream.

    The default implementation of this method looks for a JSONSchema file with the same name as this stream's "name" property.
    Override as needed.
    """
    # TODO show an example of using pydantic to define the JSON schema, or reading an OpenAPI spec
    return ResourceSchemaLoader(package_name_from_class(self.__class__)).get_schema(self.name)
Returns
A dict of the JSON schema representing this stream.
The default implementation of this method looks for a JSONSchema file with the same name as this stream's "name" property. Override as needed.
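By default the schema is resolved from the connector package, typically a schemas/<stream name>.json file. An inline override is a sketch like this (the field names are illustrative):

def get_json_schema(self) -> Mapping[str, Any]:
    # Return the schema directly instead of loading it from a JSON file.
    return {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "type": "object",
        "properties": {
            "id": {"type": "string"},
            "updated_at": {"type": "string", "format": "date-time"},
        },
    }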
307 def as_airbyte_stream(self) -> AirbyteStream: 308 stream = AirbyteStream( 309 name=self.name, 310 json_schema=dict(self.get_json_schema()), 311 supported_sync_modes=[SyncMode.full_refresh], 312 is_resumable=self.is_resumable, 313 ) 314 315 if self.namespace: 316 stream.namespace = self.namespace 317 318 # If we can offer incremental we always should. RFR is always less reliable than incremental which uses a real cursor value 319 if self.supports_incremental: 320 stream.source_defined_cursor = self.source_defined_cursor 321 stream.supported_sync_modes.append(SyncMode.incremental) 322 stream.default_cursor_field = self._wrapped_cursor_field() 323 324 keys = Stream._wrapped_primary_key(self.primary_key) 325 if keys and len(keys) > 0: 326 stream.source_defined_primary_key = keys 327 328 return stream
@property
def supports_incremental(self) -> bool:
    """
    :return: True if this stream supports incrementally reading data
    """
    return len(self._wrapped_cursor_field()) > 0
Returns
True if this stream supports incrementally reading data
@property
def is_resumable(self) -> bool:
    """
    :return: True if this stream allows the checkpointing of sync progress and can resume from it on subsequent attempts.
    This differs from supports_incremental because certain kinds of streams like those supporting resumable full refresh
    can checkpoint progress in between attempts for improved fault tolerance. However, they will start from the beginning
    on the next sync job.
    """
    if self.supports_incremental:
        return True
    if self.has_multiple_slices:
        # We temporarily gate substreams to not support RFR because it puts a pretty high burden on connector developers
        # to structure stream state in a very specific way. We also can't check for issubclass(HttpSubStream) because
        # not all substreams implement the interface and it would be a circular dependency so we use parent as a surrogate
        return False
    elif hasattr(type(self), "state") and getattr(type(self), "state").fset is not None:
        # Modern case where a stream manages state using getter/setter
        return True
    else:
        # Legacy case where the CDK manages state via the get_updated_state() method. This is determined by checking if
        # the stream's get_updated_state() differs from the Stream class and therefore has been overridden
        return type(self).get_updated_state != Stream.get_updated_state
Returns
True if this stream allows the checkpointing of sync progress and can resume from it on subsequent attempts. This differs from supports_incremental because certain kinds of streams like those supporting resumable full refresh can checkpoint progress in between attempts for improved fault tolerance. However, they will start from the beginning on the next sync job.
@property
def cursor_field(self) -> Union[str, List[str]]:
    """
    Override to return the default cursor field used by this stream e.g: an API entity might always use created_at as the cursor field.
    :return: The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.
    """
    return []
Override to return the default cursor field used by this stream, e.g. an API entity might always use created_at as the cursor field.
Returns
The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.
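Inside a Stream subclass, a sketch of the override (the field name is illustrative):

@property
def cursor_field(self) -> Union[str, List[str]]:
    # A top-level cursor is a plain string; a nested cursor would be a path such as ["metadata", "updated_at"].
    return "created_at"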
@property
def namespace(self) -> Optional[str]:
    """
    Override to return the namespace of this stream, e.g. the Postgres schema which this stream will emit records for.
    :return: A string containing the name of the namespace.
    """
    return None
Override to return the namespace of this stream, e.g. the Postgres schema which this stream will emit records for.
Returns
A string containing the name of the namespace.
@property
def source_defined_cursor(self) -> bool:
    """
    Return False if the cursor can be configured by the user.
    """
    return True
Return False if the cursor can be configured by the user.
@property
def exit_on_rate_limit(self) -> bool:
    """Exit on rate limit getter, should return bool value. False if the stream will retry endlessly when rate limited."""
    return self._exit_on_rate_limit
Exit-on-rate-limit getter; should return a bool value. False means the stream will retry endlessly when rate limited.
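The property also has a setter, so a source can opt a stream out of endless retries. A sketch (the Customers stream is hypothetical):

stream = Customers()
stream.exit_on_rate_limit = True  # fail the sync when rate limited instead of retrying forever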
@property
@abstractmethod
def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
    """
    :return: string if single primary key, list of strings if composite primary key, list of list of strings if composite primary key consisting of nested fields.
    If the stream has no primary keys, return None.
    """
Returns
string if single primary key, list of strings if composite primary key, list of list of strings if composite primary key consisting of nested fields. If the stream has no primary keys, return None.
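The three accepted shapes, as class attributes on a Stream subclass (field names are illustrative):

# single key
primary_key = "id"
# composite key
# primary_key = ["customer_id", "order_id"]
# composite key made of nested fields
# primary_key = [["payload", "customer_id"], ["payload", "order_id"]]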
def stream_slices(
    self,
    *,
    sync_mode: SyncMode,
    cursor_field: Optional[List[str]] = None,
    stream_state: Optional[Mapping[str, Any]] = None,
) -> Iterable[Optional[Mapping[str, Any]]]:
    """
    Override to define the slices for this stream. See the stream slicing section of the docs for more information.

    :param sync_mode:
    :param cursor_field:
    :param stream_state:
    :return:
    """
    yield StreamSlice(partition={}, cursor_slice={})
Override to define the slices for this stream. See the stream slicing section of the docs for more information.
Parameters
- sync_mode:
- cursor_field:
- stream_state:
Returns
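A common override partitions the stream, for example one slice per parent record or per date window. A sketch assuming a hypothetical list of project IDs:

def stream_slices(
    self,
    *,
    sync_mode: SyncMode,
    cursor_field: Optional[List[str]] = None,
    stream_state: Optional[Mapping[str, Any]] = None,
) -> Iterable[Optional[Mapping[str, Any]]]:
    # One slice per project; read_records() receives each slice via its stream_slice argument.
    for project_id in ["proj_1", "proj_2"]:  # hypothetical partition values
        yield {"project_id": project_id}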
@property
def state_checkpoint_interval(self) -> Optional[int]:
    """
    Decides how often to checkpoint state (i.e: emit a STATE message). E.g: if this returns a value of 100, then state is persisted after reading
    100 records, then 200, 300, etc.. A good default value is 1000 although your mileage may vary depending on the underlying data source.

    Checkpointing a stream avoids re-reading records in the case a sync is failed or cancelled.

    return None if state should not be checkpointed e.g: because records returned from the underlying data source are not returned in
    ascending order with respect to the cursor field. This can happen if the source does not support reading records in ascending order of
    created_at date (or whatever the cursor is). In those cases, state must only be saved once the full stream has been read.
    """
    return None
Decides how often to checkpoint state (i.e. emit a STATE message). E.g. if this returns a value of 100, then state is persisted after reading 100 records, then 200, 300, etc. A good default value is 1000, although your mileage may vary depending on the underlying data source.
Checkpointing a stream avoids re-reading records in case a sync fails or is cancelled.
Return None if state should not be checkpointed, e.g. because records returned from the underlying data source are not returned in ascending order with respect to the cursor field. This can happen if the source does not support reading records in ascending order of created_at date (or whatever the cursor is). In those cases, state must only be saved once the full stream has been read.
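Enabling periodic checkpointing in a subclass is a one-line override, for example:

@property
def state_checkpoint_interval(self) -> Optional[int]:
    return 1000  # emit a STATE message roughly every 1000 records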
def get_updated_state(
    self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]
) -> MutableMapping[str, Any]:
    """DEPRECATED. Please use explicit state property instead, see `IncrementalMixin` docs.

    Override to extract state from the latest record. Needed to implement incremental sync.

    Inspects the latest record extracted from the data source and the current state object and return an updated state object.

    For example: if the state object is based on created_at timestamp, and the current state is {'created_at': 10}, and the latest_record is
    {'name': 'octavia', 'created_at': 20 } then this method would return {'created_at': 20} to indicate state should be updated to this object.

    :param current_stream_state: The stream's current state object
    :param latest_record: The latest record extracted from the stream
    :return: An updated state object
    """
    return {}
DEPRECATED. Please use explicit state property instead, see IncrementalMixin docs.
Override to extract state from the latest record. Needed to implement incremental sync.
Inspects the latest record extracted from the data source and the current state object, and returns an updated state object.
For example: if the state object is based on created_at timestamp, and the current state is {'created_at': 10}, and the latest_record is {'name': 'octavia', 'created_at': 20 } then this method would return {'created_at': 20} to indicate state should be updated to this object.
Parameters
- current_stream_state: The stream's current state object
- latest_record: The latest record extracted from the stream
Returns
An updated state object
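For the legacy pattern, the override typically keeps the maximum cursor value seen so far. A sketch assuming a hypothetical created_at cursor:

def get_updated_state(
    self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]
) -> MutableMapping[str, Any]:
    latest_cursor = latest_record.get("created_at", 0)
    current_cursor = current_stream_state.get("created_at", 0)
    return {"created_at": max(latest_cursor, current_cursor)}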
def get_cursor(self) -> Optional[Cursor]:
    """
    A Cursor is an interface that a stream can implement to manage how its internal state is read and updated while
    reading records. Historically, Python connectors had no concept of a cursor to manage state. Python streams need
    to define a cursor implementation and override this method to manage state through a Cursor.
    """
    return self.cursor
A Cursor is an interface that a stream can implement to manage how its internal state is read and updated while reading records. Historically, Python connectors had no concept of a cursor to manage state. Python streams need to define a cursor implementation and override this method to manage state through a Cursor.
def log_stream_sync_configuration(self) -> None:
    """
    Logs the configuration of this stream.
    """
    self.logger.debug(
        f"Syncing stream instance: {self.name}",
        extra={
            "primary_key": self.primary_key,
            "cursor_field": self.cursor_field,
        },
    )
Logs the configuration of this stream.
@property
def configured_json_schema(self) -> Optional[Dict[str, Any]]:
    """
    This property is set from the read method.

    :return Optional[Dict]: JSON schema from configured catalog if provided, otherwise None.
    """
    return self._configured_json_schema
This property is set from the read method.
Returns
JSON schema from configured catalog if provided, otherwise None.