airbyte_cdk.sources.streams.core
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import copy
import inspect
import itertools
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass
from functools import cached_property, lru_cache
from typing import Any, Dict, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Union

from typing_extensions import deprecated

import airbyte_cdk.sources.utils.casing as casing
from airbyte_cdk.models import (
    AirbyteMessage,
    AirbyteStream,
    ConfiguredAirbyteStream,
    DestinationSyncMode,
    SyncMode,
)
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.streams.checkpoint import (
    CheckpointMode,
    CheckpointReader,
    Cursor,
    CursorBasedCheckpointReader,
    FullRefreshCheckpointReader,
    IncrementalCheckpointReader,
    LegacyCursorBasedCheckpointReader,
    ResumableFullRefreshCheckpointReader,
)
from airbyte_cdk.sources.types import StreamSlice
from airbyte_cdk.sources.utils.schema_helpers import InternalConfig, ResourceSchemaLoader
from airbyte_cdk.sources.utils.slice_logger import DebugSliceLogger, SliceLogger
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer

# A stream's read method can return one of the following types:
# Mapping[str, Any]: The content of an AirbyteRecordMessage
# AirbyteMessage: An AirbyteMessage. Could be of any type
StreamData = Union[Mapping[str, Any], AirbyteMessage]

JsonSchema = Mapping[str, Any]

NO_CURSOR_STATE_KEY = "__ab_no_cursor_state_message"


def package_name_from_class(cls: object) -> str:
    """Find the package name given a class name"""
    module = inspect.getmodule(cls)
    if module is not None:
        return module.__name__.split(".")[0]
    else:
        raise ValueError(f"Could not find package name for class {cls}")


class CheckpointMixin(ABC):
    """Mixin for a stream that implements reading and writing the internal state used to checkpoint sync progress to the platform

    class CheckpointedStream(Stream, CheckpointMixin):
        @property
        def state(self):
            return self._state

        @state.setter
        def state(self, value):
            self._state[self.cursor_field] = value[self.cursor_field]
    """

    @property
    @abstractmethod
    def state(self) -> MutableMapping[str, Any]:
        """State getter; should return state in a form that can be serialized to a string and sent to the output
        as a STATE AirbyteMessage.

        A good example of a state is a cursor value:
            {
                self.cursor_field: "cursor_value"
            }

        State should try to be as small as possible but at the same time descriptive enough to restore
        the syncing process from the point where it stopped.
        """

    @state.setter
    @abstractmethod
    def state(self, value: MutableMapping[str, Any]) -> None:
        """State setter; accepts the state serialized by the state getter."""


@deprecated(
    "Deprecated as of CDK version 0.87.0. "
    "Deprecated in favor of the `CheckpointMixin` which offers similar functionality."
)
class IncrementalMixin(CheckpointMixin, ABC):
    """Mixin to make a stream incremental.

    class IncrementalStream(Stream, IncrementalMixin):
        @property
        def state(self):
            return self._state

        @state.setter
        def state(self, value):
            self._state[self.cursor_field] = value[self.cursor_field]
    """


@dataclass
class StreamClassification:
    is_legacy_format: bool
    has_multiple_slices: bool


class Stream(ABC):
    """
    Base abstract class for an Airbyte Stream. Makes no assumption of the Stream's underlying transport protocol.
    """

    _configured_json_schema: Optional[Dict[str, Any]] = None
    _exit_on_rate_limit: bool = False

    # Use self.logger in subclasses to log any messages
    @property
    def logger(self) -> logging.Logger:
        return logging.getLogger(f"airbyte.streams.{self.name}")

    # TypeTransformer object to perform output data transformation
    transformer: TypeTransformer = TypeTransformer(TransformConfig.NoTransform)

    cursor: Optional[Cursor] = None

    has_multiple_slices = False

    @cached_property
    def name(self) -> str:
        """
        :return: Stream name. By default this is the implementing class name, but it can be overridden as needed.
        """
        return casing.camel_to_snake(self.__class__.__name__)

    def get_error_display_message(self, exception: BaseException) -> Optional[str]:
        """
        Retrieves the user-friendly display message that corresponds to an exception.
        This will be called when encountering an exception while reading records from the stream, and used to build the AirbyteTraceMessage.

        The default implementation of this method does not return user-friendly messages for any exception type, but it should be overridden as needed.

        :param exception: The exception that was raised
        :return: A user-friendly message that indicates the cause of the error
        """
        return None

    def read(  # type: ignore  # ignoring typing for ConnectorStateManager because of circular dependencies
        self,
        configured_stream: ConfiguredAirbyteStream,
        logger: logging.Logger,
        slice_logger: SliceLogger,
        stream_state: MutableMapping[str, Any],
        state_manager,
        internal_config: InternalConfig,
    ) -> Iterable[StreamData]:
        sync_mode = configured_stream.sync_mode
        cursor_field = configured_stream.cursor_field
        self.configured_json_schema = configured_stream.stream.json_schema

        # WARNING: When performing a read() that uses incoming stream state, we MUST use the self.state that is defined as
        # opposed to the incoming stream_state value. Because some connectors like ones using the file-based CDK modify
        # state before setting the value on the Stream attribute, the most up-to-date state is derived from Stream.state
        # instead of the stream_state parameter. This does not apply to legacy connectors using get_updated_state().
        try:
            stream_state = self.state  # type: ignore  # we know the field might not exist...
        except AttributeError:
            pass

        should_checkpoint = bool(state_manager)
        checkpoint_reader = self._get_checkpoint_reader(
            logger=logger, cursor_field=cursor_field, sync_mode=sync_mode, stream_state=stream_state
        )

        next_slice = checkpoint_reader.next()
        record_counter = 0
        stream_state_tracker = copy.deepcopy(stream_state)
        while next_slice is not None:
            if slice_logger.should_log_slice_message(logger):
                yield slice_logger.create_slice_log_message(next_slice)
            records = self.read_records(
                sync_mode=sync_mode,  # todo: change this interface to no longer rely on sync_mode for behavior
                stream_slice=next_slice,
                stream_state=stream_state,
                cursor_field=cursor_field or None,
            )
            for record_data_or_message in records:
                yield record_data_or_message
                if isinstance(record_data_or_message, Mapping) or (
                    hasattr(record_data_or_message, "type")
                    and record_data_or_message.type == MessageType.RECORD
                ):
                    record_data = (
                        record_data_or_message
                        if isinstance(record_data_or_message, Mapping)
                        else record_data_or_message.record
                    )

                    # Thanks I hate it. RFR fundamentally doesn't fit with the concept of the legacy Stream.get_updated_state()
                    # method because RFR streams rely on pagination as a cursor. Stream.get_updated_state() was designed to make
                    # the CDK manage state using specifically the last seen record. don't @ brian.lai
                    #
                    # Also, because the legacy incremental state case decouples observing incoming records from emitting state, it
                    # requires that we separate CheckpointReader.observe() and CheckpointReader.get_checkpoint() which could
                    # otherwise be combined.
                    if self.cursor_field:
                        # Some connectors have streams that implement get_updated_state(), but do not define a cursor_field. This
                        # should be fixed on the stream implementation, but we should also protect against this in the CDK as well.
                        stream_state_tracker = self.get_updated_state(
                            stream_state_tracker,
                            record_data,  # type: ignore [arg-type]
                        )
                        self._observe_state(checkpoint_reader, stream_state_tracker)
                    record_counter += 1

                    checkpoint_interval = self.state_checkpoint_interval
                    if (
                        should_checkpoint
                        and checkpoint_interval
                        and record_counter % checkpoint_interval == 0
                    ):
                        checkpoint = checkpoint_reader.get_checkpoint()
                        if checkpoint:
                            airbyte_state_message = self._checkpoint_state(
                                checkpoint, state_manager=state_manager
                            )
                            yield airbyte_state_message

                    if internal_config.is_limit_reached(record_counter):
                        break
            self._observe_state(checkpoint_reader)
            checkpoint_state = checkpoint_reader.get_checkpoint()
            if should_checkpoint and checkpoint_state is not None:
                airbyte_state_message = self._checkpoint_state(
                    checkpoint_state, state_manager=state_manager
                )
                yield airbyte_state_message

            next_slice = checkpoint_reader.next()

        checkpoint = checkpoint_reader.get_checkpoint()
        if should_checkpoint and checkpoint is not None:
            airbyte_state_message = self._checkpoint_state(checkpoint, state_manager=state_manager)
            yield airbyte_state_message

    def read_only_records(self, state: Optional[Mapping[str, Any]] = None) -> Iterable[StreamData]:
        """
        Helper method that performs a read on a stream with an optional state and emits records. If the parent stream supports
        incremental, this operation does not update the stream's internal state (if it uses the modern state setter/getter)
        or emit state messages.
        """

        configured_stream = ConfiguredAirbyteStream(
            stream=AirbyteStream(
                name=self.name,
                json_schema={},
                supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental],
            ),
            sync_mode=SyncMode.incremental if state else SyncMode.full_refresh,
            destination_sync_mode=DestinationSyncMode.append,
        )

        yield from self.read(
            configured_stream=configured_stream,
            logger=self.logger,
            slice_logger=DebugSliceLogger(),
            stream_state=dict(state)
            if state
            else {},  # read() expects a MutableMapping instead of the more commonly used Mapping
            state_manager=None,
            internal_config=InternalConfig(),  # type: ignore [call-arg]
        )

    @abstractmethod
    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[StreamData]:
        """
        This method should be overridden by subclasses to read records based on the inputs
        """

    @lru_cache(maxsize=None)
    def get_json_schema(self) -> Mapping[str, Any]:
        """
        :return: A dict of the JSON schema representing this stream.

        The default implementation of this method looks for a JSONSchema file with the same name as this stream's "name" property.
        Override as needed.
        """
        # TODO show an example of using pydantic to define the JSON schema, or reading an OpenAPI spec
        return ResourceSchemaLoader(package_name_from_class(self.__class__)).get_schema(self.name)

    def as_airbyte_stream(self) -> AirbyteStream:
        stream = AirbyteStream(
            name=self.name,
            json_schema=dict(self.get_json_schema()),
            supported_sync_modes=[SyncMode.full_refresh],
            is_resumable=self.is_resumable,
        )

        if self.namespace:
            stream.namespace = self.namespace

        # If we can offer incremental we always should. RFR is always less reliable than incremental, which uses a real cursor value
        if self.supports_incremental:
            stream.source_defined_cursor = self.source_defined_cursor
            stream.supported_sync_modes.append(SyncMode.incremental)
            stream.default_cursor_field = self._wrapped_cursor_field()

        keys = Stream._wrapped_primary_key(self.primary_key)
        if keys and len(keys) > 0:
            stream.source_defined_primary_key = keys

        return stream

    @property
    def supports_incremental(self) -> bool:
        """
        :return: True if this stream supports incrementally reading data
        """
        return len(self._wrapped_cursor_field()) > 0

    @property
    def is_resumable(self) -> bool:
        """
        :return: True if this stream allows the checkpointing of sync progress and can resume from it on subsequent attempts.
        This differs from supports_incremental because certain kinds of streams like those supporting resumable full refresh
        can checkpoint progress in between attempts for improved fault tolerance. However, they will start from the beginning
        on the next sync job.
        """
        if self.supports_incremental:
            return True
        if self.has_multiple_slices:
            # We temporarily gate substreams to not support RFR because it puts a pretty high burden on connector developers
            # to structure stream state in a very specific way. We also can't check for issubclass(HttpSubStream) because
            # not all substreams implement the interface and it would be a circular dependency, so we use parent as a surrogate.
            return False
        elif hasattr(type(self), "state") and getattr(type(self), "state").fset is not None:
            # Modern case where a stream manages state using getter/setter
            return True
        else:
            # Legacy case where the CDK manages state via the get_updated_state() method. This is determined by checking if
            # the stream's get_updated_state() differs from the Stream class and therefore has been overridden.
            return type(self).get_updated_state != Stream.get_updated_state

    def _wrapped_cursor_field(self) -> List[str]:
        return [self.cursor_field] if isinstance(self.cursor_field, str) else self.cursor_field

    @property
    def cursor_field(self) -> Union[str, List[str]]:
        """
        Override to return the default cursor field used by this stream, e.g. an API entity might always use created_at as the cursor field.
        :return: The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.
        """
        return []

    @property
    def namespace(self) -> Optional[str]:
        """
        Override to return the namespace of this stream, e.g. the Postgres schema which this stream will emit records for.
        :return: A string containing the name of the namespace.
        """
        return None

    @property
    def source_defined_cursor(self) -> bool:
        """
        Return False if the cursor can be configured by the user.
        """
        return True

    @property
    def exit_on_rate_limit(self) -> bool:
        """Exit on rate limit getter; should return a bool value. False if the stream will retry endlessly when rate limited."""
        return self._exit_on_rate_limit

    @exit_on_rate_limit.setter
    def exit_on_rate_limit(self, value: bool) -> None:
        """Exit on rate limit setter; accepts a bool value."""
        self._exit_on_rate_limit = value

    @property
    @abstractmethod
    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
        """
        :return: string if single primary key, list of strings if composite primary key, list of list of strings if composite primary key consisting of nested fields.
        If the stream has no primary keys, return None.
        """

    def stream_slices(
        self,
        *,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Optional[Mapping[str, Any]]]:
        """
        Override to define the slices for this stream. See the stream slicing section of the docs for more information.

        :param sync_mode:
        :param cursor_field:
        :param stream_state:
        :return:
        """
        yield StreamSlice(partition={}, cursor_slice={})

    @property
    def state_checkpoint_interval(self) -> Optional[int]:
        """
        Decides how often to checkpoint state (i.e. emit a STATE message). E.g. if this returns a value of 100, then state is persisted after reading
        100 records, then 200, 300, etc. A good default value is 1000, although your mileage may vary depending on the underlying data source.

        Checkpointing a stream avoids re-reading records in case a sync fails or is cancelled.

        Return None if state should not be checkpointed, e.g. because records returned from the underlying data source are not returned in
        ascending order with respect to the cursor field. This can happen if the source does not support reading records in ascending order of
        created_at date (or whatever the cursor is). In those cases, state must only be saved once the full stream has been read.
        """
        return None

    # Commented out to avoid any runtime penalty, since this is used in a hot per-record codepath.
    # To be evaluated for re-introduction here: https://github.com/airbytehq/airbyte-python-cdk/issues/116
    # @deprecated(
    #     "Deprecated method `get_updated_state` as of CDK version 0.1.49. "
    #     "Please use explicit state property instead, see `IncrementalMixin` docs."
    # )
    def get_updated_state(
        self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]
    ) -> MutableMapping[str, Any]:
        """DEPRECATED. Please use the explicit state property instead; see `IncrementalMixin` docs.

        Override to extract state from the latest record. Needed to implement incremental sync.

        Inspects the latest record extracted from the data source and the current state object and returns an updated state object.

        For example: if the state object is based on created_at timestamp, and the current state is {'created_at': 10}, and the latest_record is
        {'name': 'octavia', 'created_at': 20}, then this method would return {'created_at': 20} to indicate state should be updated to this object.

        :param current_stream_state: The stream's current state object
        :param latest_record: The latest record extracted from the stream
        :return: An updated state object
        """
        return {}

    def get_cursor(self) -> Optional[Cursor]:
        """
        A Cursor is an interface that a stream can implement to manage how its internal state is read and updated while
        reading records. Historically, Python connectors had no concept of a cursor to manage state. Python streams need
        to define a cursor implementation and override this method to manage state through a Cursor.
        """
        return self.cursor

    def _get_checkpoint_reader(
        self,
        logger: logging.Logger,
        cursor_field: Optional[List[str]],
        sync_mode: SyncMode,
        stream_state: MutableMapping[str, Any],
    ) -> CheckpointReader:
        mappings_or_slices = self.stream_slices(
            cursor_field=cursor_field,
            sync_mode=sync_mode,  # todo: change this interface to no longer rely on sync_mode for behavior
            stream_state=stream_state,
        )

        # Because of poor foresight, we wrote the default Stream.stream_slices() method to return [None], which is confusing and
        # has now normalized this behavior for connector developers. Now some connectors return [None]. This is objectively
        # misleading, and a more ideal interface is [{}] to indicate we still want to iterate over one slice, but with no
        # specific slice values. None is bad, and now I feel bad that I have to write this hack.
        if mappings_or_slices == [None]:
            mappings_or_slices = [{}]

        slices_iterable_copy, iterable_for_detecting_format = itertools.tee(mappings_or_slices, 2)
        stream_classification = self._classify_stream(
            mappings_or_slices=iterable_for_detecting_format
        )

        # Streams that override has_multiple_slices are explicitly indicating that they will iterate over
        # multiple partitions. Inspecting slices to automatically apply the correct cursor is only needed as
        # a backup. So if this value was already assigned to True by the stream, we don't need to reassign it.
        self.has_multiple_slices = (
            self.has_multiple_slices or stream_classification.has_multiple_slices
        )

        cursor = self.get_cursor()
        if cursor:
            cursor.set_initial_state(stream_state=stream_state)

        checkpoint_mode = self._checkpoint_mode

        if cursor and stream_classification.is_legacy_format:
            return LegacyCursorBasedCheckpointReader(
                stream_slices=slices_iterable_copy, cursor=cursor, read_state_from_cursor=True
            )
        elif cursor:
            return CursorBasedCheckpointReader(
                stream_slices=slices_iterable_copy,
                cursor=cursor,
                read_state_from_cursor=checkpoint_mode == CheckpointMode.RESUMABLE_FULL_REFRESH,
            )
        elif checkpoint_mode == CheckpointMode.RESUMABLE_FULL_REFRESH:
            # Resumable full refresh readers rely on the stream state being updated dynamically during pagination and do
            # not iterate over a static set of slices.
            return ResumableFullRefreshCheckpointReader(stream_state=stream_state)
        elif checkpoint_mode == CheckpointMode.INCREMENTAL:
            return IncrementalCheckpointReader(
                stream_slices=slices_iterable_copy, stream_state=stream_state
            )
        else:
            return FullRefreshCheckpointReader(stream_slices=slices_iterable_copy)

    @property
    def _checkpoint_mode(self) -> CheckpointMode:
        if self.is_resumable and len(self._wrapped_cursor_field()) > 0:
            return CheckpointMode.INCREMENTAL
        elif self.is_resumable:
            return CheckpointMode.RESUMABLE_FULL_REFRESH
        else:
            return CheckpointMode.FULL_REFRESH

    @staticmethod
    def _classify_stream(
        mappings_or_slices: Iterator[Optional[Union[Mapping[str, Any], StreamSlice]]],
    ) -> StreamClassification:
        """
        This is a bit of a crazy solution, but also the only way we can detect certain attributes about the stream since Python
        streams do not follow consistent implementation patterns. We care about the following two attributes:
        - is_substream: Helps to incrementally release changes since substreams w/ parents are much more complicated. Also
          helps de-risk the release of changes that might impact all connectors
        - uses_legacy_slice_format: Since the checkpoint reader must manage a complex state object, we opted to have it always
          use the structured StreamSlice object. However, this requires backwards compatibility with Python sources that only
          support the legacy mapping object

        Both attributes can eventually be deleted once substreams have been implemented and legacy connectors all adhere to
        the StreamSlice object.
        """
        if not mappings_or_slices:
            raise ValueError("A stream should always have at least one slice")
        try:
            next_slice = next(mappings_or_slices)
            if isinstance(next_slice, StreamSlice) and next_slice == StreamSlice(
                partition={}, cursor_slice={}
            ):
                is_legacy_format = False
                slice_has_value = False
            elif next_slice == {}:
                is_legacy_format = True
                slice_has_value = False
            elif isinstance(next_slice, StreamSlice):
                is_legacy_format = False
                slice_has_value = True
            else:
                is_legacy_format = True
                slice_has_value = True
        except StopIteration:
            # If the stream has no slices, the format ultimately does not matter since no data will get synced. This is technically
            # a valid case because it is up to the stream to define its slicing behavior.
            return StreamClassification(is_legacy_format=False, has_multiple_slices=False)

        if slice_has_value:
            # If the first slice contained a partition value from the result of stream_slices(), this is a substream that might
            # have multiple parent records to iterate over
            return StreamClassification(
                is_legacy_format=is_legacy_format, has_multiple_slices=slice_has_value
            )

        try:
            # If stream_slices() returns multiple slices, this is also a substream that can potentially generate empty slices
            next(mappings_or_slices)
            return StreamClassification(is_legacy_format=is_legacy_format, has_multiple_slices=True)
        except StopIteration:
            # If the result of stream_slices() only returns a single empty stream slice, then we know this is a regular stream
            return StreamClassification(
                is_legacy_format=is_legacy_format, has_multiple_slices=False
            )

    def log_stream_sync_configuration(self) -> None:
        """
        Logs the configuration of this stream.
        """
        self.logger.debug(
            f"Syncing stream instance: {self.name}",
            extra={
                "primary_key": self.primary_key,
                "cursor_field": self.cursor_field,
            },
        )

    @staticmethod
    def _wrapped_primary_key(
        keys: Optional[Union[str, List[str], List[List[str]]]],
    ) -> Optional[List[List[str]]]:
        """
        :return: wrap the primary_key property in the list of lists of strings required by the Airbyte Stream object.
        """
        if not keys:
            return None

        if isinstance(keys, str):
            return [[keys]]
        elif isinstance(keys, list):
            wrapped_keys = []
            for component in keys:
                if isinstance(component, str):
                    wrapped_keys.append([component])
                elif isinstance(component, list):
                    wrapped_keys.append(component)
                else:
                    raise ValueError(f"Element must be either list or str. Got: {type(component)}")
            return wrapped_keys
        else:
            raise ValueError(f"Element must be either list or str. Got: {type(keys)}")

    def _observe_state(
        self, checkpoint_reader: CheckpointReader, stream_state: Optional[Mapping[str, Any]] = None
    ) -> None:
        """
        Convenience method that attempts to read the Stream's state using the recommended way of connectors managing their
        own state via state setter/getter. But if we get back an AttributeError, then the legacy Stream.get_updated_state()
        method is used as a fallback.
        """

        # This is an inversion of the original logic that used to try state getters/setters first. As part of the work to
        # automatically apply resumable full refresh to all streams, all HttpStream classes implement default state
        # getter/setter methods, so we should default to the state getter/setter only when the incoming stream_state
        # parameter value is {}, which indicates the stream does not override the default get_updated_state()
        # implementation. When the default method is not overridden, the stream defers to the self.state getter.
        if stream_state:
            checkpoint_reader.observe(stream_state)
        elif type(self).get_updated_state == Stream.get_updated_state:
            # We only default to the state getter/setter if the stream does not use the legacy get_updated_state() method
            try:
                new_state = self.state  # type: ignore  # This will always exist on HttpStreams, but may not for Stream
                if new_state:
                    checkpoint_reader.observe(new_state)
            except AttributeError:
                pass

    def _checkpoint_state(  # type: ignore  # ignoring typing for ConnectorStateManager because of circular dependencies
        self,
        stream_state: Mapping[str, Any],
        state_manager,
    ) -> AirbyteMessage:
        # todo: This can be consolidated into one ConnectorStateManager.update_and_create_state_message() method, but I want
        # to reduce changes right now and this would span concurrent as well
        state_manager.update_state_for_stream(self.name, self.namespace, stream_state)
        return state_manager.create_state_message(self.name, self.namespace)  # type: ignore [no-any-return]

    @property
    def configured_json_schema(self) -> Optional[Dict[str, Any]]:
        """
        This property is set from the read method.

        :return Optional[Dict]: JSON schema from the configured catalog if provided, otherwise None.
        """
        return self._configured_json_schema

    @configured_json_schema.setter
    def configured_json_schema(self, json_schema: Dict[str, Any]) -> None:
        self._configured_json_schema = self._filter_schema_invalid_properties(json_schema)

    def _filter_schema_invalid_properties(
        self, configured_catalog_json_schema: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Filters out the properties in json_schema that are not present in the stream schema.
        Configured schemas can have very old fields, so we need to do some housekeeping ourselves.
        """
        configured_schema: Any = configured_catalog_json_schema.get("properties", {})
        stream_schema_properties: Any = self.get_json_schema().get("properties", {})

        configured_keys = configured_schema.keys()
        stream_keys = stream_schema_properties.keys()
        invalid_properties = configured_keys - stream_keys
        if not invalid_properties:
            return configured_catalog_json_schema

        self.logger.warning(
            f"Stream {self.name}: the following fields are deprecated and cannot be synced. {invalid_properties}. Refresh the connection's source schema to resolve this warning."
        )

        valid_configured_schema_properties_keys = stream_keys & configured_keys
        valid_configured_schema_properties = {}

        for configured_schema_property in valid_configured_schema_properties_keys:
            valid_configured_schema_properties[configured_schema_property] = (
                stream_schema_properties[configured_schema_property]
            )

        return {**configured_catalog_json_schema, "properties": valid_configured_schema_properties}
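The deprecated get_updated_state() hook documented above derives new state by comparing the latest record against the current state object. A minimal sketch of that pattern, using the created_at cursor from the docstring's own example (the field values and max-based merge are illustrative, not the CDK's prescribed implementation):

from typing import Any, Mapping, MutableMapping


# Hypothetical override on a legacy incremental stream: keep the largest
# created_at value seen so far as the stream's state.
def get_updated_state(
    self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]
) -> MutableMapping[str, Any]:
    latest_cursor = latest_record.get("created_at", 0)
    current_cursor = current_stream_state.get("created_at", 0)
    return {"created_at": max(latest_cursor, current_cursor)}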
def package_name_from_class(cls: object) -> str:
Find the package name given a class name
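For illustration, a small self-contained check of the behavior (the Example class is hypothetical; for a class defined in, say, source_example/streams.py, the result would be "source_example"):

import inspect


class Example:
    pass


# inspect.getmodule() reports the defining module; the package name is the
# first dotted component. For a class defined in a script, that is "__main__".
module = inspect.getmodule(Example)
print(module.__name__.split(".")[0])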
class CheckpointMixin(ABC):
Mixin for a stream that implements reading and writing the internal state used to checkpoint sync progress to the platform
class CheckpointedStream(Stream, CheckpointMixin):
    @property
    def state(self):
        return self._state

    @state.setter
    def state(self, value):
        self._state[self.cursor_field] = value[self.cursor_field]
@property
@abstractmethod
def state(self) -> MutableMapping[str, Any]:
State getter; should return state in a form that can be serialized to a string and sent to the output as a STATE AirbyteMessage.

A good example of a state is a cursor value:

    {
        self.cursor_field: "cursor_value"
    }

State should try to be as small as possible but at the same time descriptive enough to restore the syncing process from the point where it stopped.
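Putting the getter and setter together, a minimal runnable sketch of a checkpointed stream. The stream name, cursor field, schema, and records below are hypothetical, and get_json_schema() is overridden so the example does not depend on a packaged schema file:

from typing import Any, Iterable, List, Mapping, MutableMapping, Optional

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.core import CheckpointMixin, StreamData


class EmployeesStream(Stream, CheckpointMixin):  # hypothetical example stream
    primary_key = "id"
    cursor_field = "updated_at"

    def __init__(self) -> None:
        self._state: MutableMapping[str, Any] = {}

    @property
    def state(self) -> MutableMapping[str, Any]:
        return self._state

    @state.setter
    def state(self, value: MutableMapping[str, Any]) -> None:
        # Keep the state small but descriptive: just the cursor value
        self._state[self.cursor_field] = value[self.cursor_field]

    def get_json_schema(self) -> Mapping[str, Any]:
        # Inline schema so the sketch is self-contained (the default loader
        # looks for a JSONSchema file shipped with the connector package)
        return {
            "type": "object",
            "properties": {"id": {"type": "integer"}, "updated_at": {"type": "string"}},
        }

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[StreamData]:
        # Stand-in for a real data source; checkpoint as each record is emitted
        for record in [{"id": 1, "updated_at": "2024-01-01"}]:
            yield record
            self.state = record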
@deprecated("Deprecated as of CDK version 0.87.0. Deprecated in favor of the `CheckpointMixin` which offers similar functionality.")
class IncrementalMixin(CheckpointMixin, ABC):
Mixin to make a stream incremental. Deprecated as of CDK version 0.87.0 in favor of `CheckpointMixin`, which offers similar functionality.
class IncrementalStream(Stream, IncrementalMixin):
    @property
    def state(self):
        return self._state

    @state.setter
    def state(self, value):
        self._state[self.cursor_field] = value[self.cursor_field]
class Stream(ABC):
Base abstract class for an Airbyte Stream. Makes no assumption of the Stream's underlying transport protocol.
    @cached_property
    def name(self) -> str:
        """
        :return: Stream name. By default this is the implementing class name, but it can be overridden as needed.
        """
        return casing.camel_to_snake(self.__class__.__name__)
Returns
Stream name. By default this is the implementing class name, but it can be overridden as needed.
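As an illustration, a hypothetical subclass can override the default snake_cased class name (the EmployeeRecords class and "employees" value are made up for this sketch; other abstract members are omitted):

    class EmployeeRecords(Stream):
        # Without this override the stream would be named "employee_records"
        @property
        def name(self) -> str:
            return "employees"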
    def get_error_display_message(self, exception: BaseException) -> Optional[str]:
        """
        Retrieves the user-friendly display message that corresponds to an exception.
        This will be called when encountering an exception while reading records from the stream, and used to build the AirbyteTraceMessage.

        The default implementation of this method does not return user-friendly messages for any exception type, but it should be overridden as needed.

        :param exception: The exception that was raised
        :return: A user-friendly message that indicates the cause of the error
        """
        return None
Retrieves the user-friendly display message that corresponds to an exception. This will be called when encountering an exception while reading records from the stream, and used to build the AirbyteTraceMessage.
The default implementation of this method does not return user-friendly messages for any exception type, but it should be overridden as needed.
Parameters
- exception: The exception that was raised
Returns
A user-friendly message that indicates the cause of the error
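A minimal sketch of an override in a hypothetical HTTP-backed stream, mapping a 403 response to a clearer message (the use of requests and the specific message text are assumptions for this example):

    import requests

    def get_error_display_message(self, exception: BaseException) -> Optional[str]:
        # Hypothetical mapping: surface a clearer message for permission errors
        if isinstance(exception, requests.exceptions.HTTPError) and exception.response is not None:
            if exception.response.status_code == 403:
                return "The configured credentials do not have permission to read this stream."
        return None  # fall back to the default trace message for everything else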
    def read(  # type: ignore # ignoring typing for ConnectorStateManager because of circular dependencies
        self,
        configured_stream: ConfiguredAirbyteStream,
        logger: logging.Logger,
        slice_logger: SliceLogger,
        stream_state: MutableMapping[str, Any],
        state_manager,
        internal_config: InternalConfig,
    ) -> Iterable[StreamData]:
        sync_mode = configured_stream.sync_mode
        cursor_field = configured_stream.cursor_field
        self.configured_json_schema = configured_stream.stream.json_schema

        # WARNING: When performing a read() that uses incoming stream state, we MUST use the self.state that is defined as
        # opposed to the incoming stream_state value. Because some connectors like ones using the file-based CDK modify
        # state before setting the value on the Stream attribute, the most up-to-date state is derived from Stream.state
        # instead of the stream_state parameter. This does not apply to legacy connectors using get_updated_state().
        try:
            stream_state = self.state  # type: ignore # we know the field might not exist...
        except AttributeError:
            pass

        should_checkpoint = bool(state_manager)
        checkpoint_reader = self._get_checkpoint_reader(
            logger=logger, cursor_field=cursor_field, sync_mode=sync_mode, stream_state=stream_state
        )

        next_slice = checkpoint_reader.next()
        record_counter = 0
        stream_state_tracker = copy.deepcopy(stream_state)
        while next_slice is not None:
            if slice_logger.should_log_slice_message(logger):
                yield slice_logger.create_slice_log_message(next_slice)
            records = self.read_records(
                sync_mode=sync_mode,  # todo: change this interface to no longer rely on sync_mode for behavior
                stream_slice=next_slice,
                stream_state=stream_state,
                cursor_field=cursor_field or None,
            )
            for record_data_or_message in records:
                yield record_data_or_message
                if isinstance(record_data_or_message, Mapping) or (
                    hasattr(record_data_or_message, "type")
                    and record_data_or_message.type == MessageType.RECORD
                ):
                    record_data = (
                        record_data_or_message
                        if isinstance(record_data_or_message, Mapping)
                        else record_data_or_message.record
                    )

                    # RFR fundamentally doesn't fit with the concept of the legacy Stream.get_updated_state()
                    # method because RFR streams rely on pagination as a cursor. Stream.get_updated_state() was designed to make
                    # the CDK manage state using specifically the last seen record.
                    #
                    # Also, because the legacy incremental state case decouples observing incoming records from emitting state, it
                    # requires that we separate CheckpointReader.observe() and CheckpointReader.get_checkpoint() which could
                    # otherwise be combined.
                    if self.cursor_field:
                        # Some connectors have streams that implement get_updated_state(), but do not define a cursor_field. This
                        # should be fixed on the stream implementation, but we should also protect against this in the CDK as well
                        stream_state_tracker = self.get_updated_state(
                            stream_state_tracker,
                            record_data,  # type: ignore [arg-type]
                        )
                        self._observe_state(checkpoint_reader, stream_state_tracker)
                    record_counter += 1

                    checkpoint_interval = self.state_checkpoint_interval
                    if (
                        should_checkpoint
                        and checkpoint_interval
                        and record_counter % checkpoint_interval == 0
                    ):
                        checkpoint = checkpoint_reader.get_checkpoint()
                        if checkpoint:
                            airbyte_state_message = self._checkpoint_state(
                                checkpoint, state_manager=state_manager
                            )
                            yield airbyte_state_message

                    if internal_config.is_limit_reached(record_counter):
                        break
            self._observe_state(checkpoint_reader)
            checkpoint_state = checkpoint_reader.get_checkpoint()
            if should_checkpoint and checkpoint_state is not None:
                airbyte_state_message = self._checkpoint_state(
                    checkpoint_state, state_manager=state_manager
                )
                yield airbyte_state_message

            next_slice = checkpoint_reader.next()

        checkpoint = checkpoint_reader.get_checkpoint()
        if should_checkpoint and checkpoint is not None:
            airbyte_state_message = self._checkpoint_state(checkpoint, state_manager=state_manager)
            yield airbyte_state_message
    def read_only_records(self, state: Optional[Mapping[str, Any]] = None) -> Iterable[StreamData]:
        """
        Helper method that performs a read on a stream with an optional state and emits records. If the parent stream supports
        incremental, this operation does not update the stream's internal state (if it uses the modern state setter/getter)
        or emit state messages.
        """

        configured_stream = ConfiguredAirbyteStream(
            stream=AirbyteStream(
                name=self.name,
                json_schema={},
                supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental],
            ),
            sync_mode=SyncMode.incremental if state else SyncMode.full_refresh,
            destination_sync_mode=DestinationSyncMode.append,
        )

        yield from self.read(
            configured_stream=configured_stream,
            logger=self.logger,
            slice_logger=DebugSliceLogger(),
            stream_state=dict(state)
            if state
            else {},  # read() expects MutableMapping instead of Mapping which is used more often
            state_manager=None,
            internal_config=InternalConfig(),  # type: ignore [call-arg]
        )
Helper method that performs a read on a stream with an optional state and emits records. If the parent stream supports incremental, this operation does not update the stream's internal state (if it uses the modern state setter/getter) or emit state messages.
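A usage sketch, assuming a hypothetical EmployeesStream subclass whose cursor field is updated_at:

    stream = EmployeesStream()
    # Passing a state dict makes read_only_records() perform an incremental read;
    # omitting it performs a full refresh. No STATE messages are emitted either way.
    for record in stream.read_only_records(state={"updated_at": "2024-01-01T00:00:00Z"}):
        print(record)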
    @abstractmethod
    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[StreamData]:
        """
        This method should be overridden by subclasses to read records based on the inputs
        """
This method should be overridden by subclasses to read records based on the inputs
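A minimal sketch of an override, assuming a hypothetical JSON API rooted at a url_base attribute that returns records under a "data" key (both are assumptions of this example; real connectors usually paginate and handle errors):

    import requests

    class EmployeesStream(Stream):
        url_base = "https://api.example.com"  # hypothetical API root

        def read_records(
            self,
            sync_mode: SyncMode,
            cursor_field: Optional[List[str]] = None,
            stream_slice: Optional[Mapping[str, Any]] = None,
            stream_state: Optional[Mapping[str, Any]] = None,
        ) -> Iterable[StreamData]:
            # Single-page read for brevity; each yielded mapping becomes a record
            response = requests.get(f"{self.url_base}/employees", timeout=30)
            response.raise_for_status()
            yield from response.json()["data"]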
    @lru_cache(maxsize=None)
    def get_json_schema(self) -> Mapping[str, Any]:
        """
        :return: A dict of the JSON schema representing this stream.

        The default implementation of this method looks for a JSONSchema file with the same name as this stream's "name" property.
        Override as needed.
        """
        # TODO show an example of using pydantic to define the JSON schema, or reading an OpenAPI spec
        return ResourceSchemaLoader(package_name_from_class(self.__class__)).get_schema(self.name)
Returns
A dict of the JSON schema representing this stream.
The default implementation of this method looks for a JSONSchema file with the same name as this stream's "name" property. Override as needed.
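Instead of shipping a schema file, a stream may return its schema inline by overriding this method. A sketch with hypothetical fields:

    def get_json_schema(self) -> Mapping[str, Any]:
        # Inline schema for a stream with two illustrative fields
        return {
            "$schema": "http://json-schema.org/draft-07/schema#",
            "type": "object",
            "properties": {
                "id": {"type": "string"},
                "created_at": {"type": ["null", "string"], "format": "date-time"},
            },
        }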
    def as_airbyte_stream(self) -> AirbyteStream:
        stream = AirbyteStream(
            name=self.name,
            json_schema=dict(self.get_json_schema()),
            supported_sync_modes=[SyncMode.full_refresh],
            is_resumable=self.is_resumable,
        )

        if self.namespace:
            stream.namespace = self.namespace

        # If we can offer incremental we always should. RFR is always less reliable than incremental which uses a real cursor value
        if self.supports_incremental:
            stream.source_defined_cursor = self.source_defined_cursor
            stream.supported_sync_modes.append(SyncMode.incremental)
            stream.default_cursor_field = self._wrapped_cursor_field()

        keys = Stream._wrapped_primary_key(self.primary_key)
        if keys and len(keys) > 0:
            stream.source_defined_primary_key = keys

        return stream
    @property
    def supports_incremental(self) -> bool:
        """
        :return: True if this stream supports incrementally reading data
        """
        return len(self._wrapped_cursor_field()) > 0
Returns
True if this stream supports incrementally reading data
    @property
    def is_resumable(self) -> bool:
        """
        :return: True if this stream allows the checkpointing of sync progress and can resume from it on subsequent attempts.
        This differs from supports_incremental because certain kinds of streams like those supporting resumable full refresh
        can checkpoint progress in between attempts for improved fault tolerance. However, they will start from the beginning
        on the next sync job.
        """
        if self.supports_incremental:
            return True
        if self.has_multiple_slices:
            # We temporarily gate substreams to not support RFR because it puts a pretty high burden on connector developers
            # to structure stream state in a very specific way. We also can't check for issubclass(HttpSubStream) because
            # not all substreams implement the interface and it would be a circular dependency, so we use parent as a surrogate
            return False
        elif hasattr(type(self), "state") and getattr(type(self), "state").fset is not None:
            # Modern case where a stream manages state using getter/setter
            return True
        else:
            # Legacy case where the CDK manages state via the get_updated_state() method. This is determined by checking if
            # the stream's get_updated_state() differs from the Stream class and therefore has been overridden
            return type(self).get_updated_state != Stream.get_updated_state
Returns
True if this stream allows the checkpointing of sync progress and can resume from it on subsequent attempts. This differs from supports_incremental because certain kinds of streams like those supporting resumable full refresh can checkpoint progress in between attempts for improved fault tolerance. However, they will start from the beginning on the next sync job.
    @property
    def cursor_field(self) -> Union[str, List[str]]:
        """
        Override to return the default cursor field used by this stream, e.g. an API entity might always use created_at as the cursor field.
        :return: The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.
        """
        return []
Override to return the default cursor field used by this stream, e.g. an API entity might always use created_at as the cursor field.
Returns
The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.
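A sketch of both forms described above, with hypothetical field names:

    @property
    def cursor_field(self) -> Union[str, List[str]]:
        return "created_at"  # top-level cursor field
        # Or, for a cursor nested inside the record, return the path to it:
        # return ["metadata", "updated_at"]  # i.e. record["metadata"]["updated_at"]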
    @property
    def namespace(self) -> Optional[str]:
        """
        Override to return the namespace of this stream, e.g. the Postgres schema which this stream will emit records for.
        :return: A string containing the name of the namespace.
        """
        return None
Override to return the namespace of this stream, e.g. the Postgres schema which this stream will emit records for.
Returns
A string containing the name of the namespace.
    @property
    def source_defined_cursor(self) -> bool:
        """
        Return False if the cursor can be configured by the user.
        """
        return True
Return False if the cursor can be configured by the user.
    @property
    def exit_on_rate_limit(self) -> bool:
        """Getter for exit_on_rate_limit; should return a bool. False if the stream will retry endlessly when rate limited."""
        return self._exit_on_rate_limit
Getter for exit_on_rate_limit; should return a bool. False if the stream will retry endlessly when rate limited.
    @property
    @abstractmethod
    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
        """
        :return: a string for a single primary key, a list of strings for a composite primary key, or a list of lists of strings for a
        composite primary key consisting of nested fields.
        If the stream has no primary keys, return None.
        """
Returns
A string for a single primary key, a list of strings for a composite primary key, or a list of lists of strings for a composite primary key consisting of nested fields. If the stream has no primary keys, return None.
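A sketch of the three accepted shapes, with hypothetical field names (the commented-out lines show the alternatives):

    @property
    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
        return "id"                            # single key
        # return ["company_id", "employee_id"]   # composite key
        # return [["payload", "id"]]             # key nested at record["payload"]["id"]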
    def stream_slices(
        self,
        *,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Optional[Mapping[str, Any]]]:
        """
        Override to define the slices for this stream. See the stream slicing section of the docs for more information.

        :param sync_mode:
        :param cursor_field:
        :param stream_state:
        :return:
        """
        yield StreamSlice(partition={}, cursor_slice={})
Override to define the slices for this stream. See the stream slicing section of the docs for more information.
Parameters
- sync_mode:
- cursor_field:
- stream_state:
Returns
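A sketch of a substream-style override, assuming a hypothetical self.parent attribute holding another Stream whose records are plain mappings; each slice carries one parent id for read_records() to fetch children against:

    def stream_slices(
        self,
        *,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Optional[Mapping[str, Any]]]:
        # One slice per parent record; read_records() receives each slice in turn
        for parent_record in self.parent.read_only_records():
            yield {"parent_id": parent_record["id"]}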
    @property
    def state_checkpoint_interval(self) -> Optional[int]:
        """
        Decides how often to checkpoint state (i.e. emit a STATE message). E.g. if this returns a value of 100, then state is persisted after reading
        100 records, then 200, 300, etc. A good default value is 1000, although your mileage may vary depending on the underlying data source.

        Checkpointing a stream avoids re-reading records in case a sync fails or is cancelled.

        Return None if state should not be checkpointed, e.g. because records returned from the underlying data source are not returned in
        ascending order with respect to the cursor field. This can happen if the source does not support reading records in ascending order of
        created_at date (or whatever the cursor is). In those cases, state must only be saved once the full stream has been read.
        """
        return None
Decides how often to checkpoint state (i.e. emit a STATE message). E.g. if this returns a value of 100, then state is persisted after reading 100 records, then 200, 300, etc. A good default value is 1000, although your mileage may vary depending on the underlying data source.
Checkpointing a stream avoids re-reading records in case a sync fails or is cancelled.
Return None if state should not be checkpointed, e.g. because records returned from the underlying data source are not returned in ascending order with respect to the cursor field. This can happen if the source does not support reading records in ascending order of created_at date (or whatever the cursor is). In those cases, state must only be saved once the full stream has been read.
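For example, a stream whose records arrive in ascending cursor order might checkpoint every 1000 records, matching the suggested default above:

    @property
    def state_checkpoint_interval(self) -> Optional[int]:
        # Emit a STATE message every 1000 records. Only safe when records are
        # returned in ascending order with respect to the cursor field.
        return 1000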
    def get_updated_state(
        self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]
    ) -> MutableMapping[str, Any]:
        """DEPRECATED. Please use the explicit state property instead; see `IncrementalMixin` docs.

        Override to extract state from the latest record. Needed to implement incremental sync.

        Inspects the latest record extracted from the data source and the current state object, and returns an updated state object.

        For example: if the state object is based on created_at timestamp, and the current state is {'created_at': 10}, and the latest_record is
        {'name': 'octavia', 'created_at': 20 } then this method would return {'created_at': 20} to indicate state should be updated to this object.

        :param current_stream_state: The stream's current state object
        :param latest_record: The latest record extracted from the stream
        :return: An updated state object
        """
        return {}
DEPRECATED. Please use the explicit state property instead; see IncrementalMixin docs.
Override to extract state from the latest record. Needed to implement incremental sync.
Inspects the latest record extracted from the data source and the current state object, and returns an updated state object.
For example: if the state object is based on created_at timestamp, and the current state is {'created_at': 10}, and the latest_record is {'name': 'octavia', 'created_at': 20 } then this method would return {'created_at': 20} to indicate state should be updated to this object.
Parameters
- current_stream_state: The stream's current state object
- latest_record: The latest record extracted from the stream
Returns
An updated state object
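A sketch implementing the created_at example from the docstring (keeping in mind this method is deprecated in favor of the state property):

    def get_updated_state(
        self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]
    ) -> MutableMapping[str, Any]:
        # Keep the larger of the stored cursor and the latest record's cursor,
        # so an out-of-order record cannot move state backwards
        current_value = current_stream_state.get("created_at", 0)
        latest_value = latest_record.get("created_at", 0)
        return {"created_at": max(current_value, latest_value)}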
    def get_cursor(self) -> Optional[Cursor]:
        """
        A Cursor is an interface that a stream can implement to manage how its internal state is read and updated while
        reading records. Historically, Python connectors had no concept of a cursor to manage state. Python streams need
        to define a cursor implementation and override this method to manage state through a Cursor.
        """
        return self.cursor
A Cursor is an interface that a stream can implement to manage how its internal state is read and updated while reading records. Historically, Python connectors had no concept of a cursor to manage state. Python streams need to define a cursor implementation and override this method to manage state through a Cursor.
    def log_stream_sync_configuration(self) -> None:
        """
        Logs the configuration of this stream.
        """
        self.logger.debug(
            f"Syncing stream instance: {self.name}",
            extra={
                "primary_key": self.primary_key,
                "cursor_field": self.cursor_field,
            },
        )
Logs the configuration of this stream.
    @property
    def configured_json_schema(self) -> Optional[Dict[str, Any]]:
        """
        This property is set from the read method.

        :return Optional[Dict]: JSON schema from configured catalog if provided, otherwise None.
        """
        return self._configured_json_schema
This property is set from the read method.
Returns
JSON schema from configured catalog if provided, otherwise None.