
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
import copy
import inspect
import itertools
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass
from functools import cached_property, lru_cache
from typing import Any, Dict, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Union

from typing_extensions import deprecated

import airbyte_cdk.sources.utils.casing as casing
from airbyte_cdk.models import (
    AirbyteMessage,
    AirbyteStream,
    ConfiguredAirbyteStream,
    DestinationSyncMode,
    SyncMode,
)
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.streams.checkpoint import (
    CheckpointMode,
    CheckpointReader,
    Cursor,
    CursorBasedCheckpointReader,
    FullRefreshCheckpointReader,
    IncrementalCheckpointReader,
    LegacyCursorBasedCheckpointReader,
    ResumableFullRefreshCheckpointReader,
)
from airbyte_cdk.sources.types import StreamSlice
from airbyte_cdk.sources.utils.schema_helpers import InternalConfig, ResourceSchemaLoader
from airbyte_cdk.sources.utils.slice_logger import DebugSliceLogger, SliceLogger
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
 41# A stream's read method can return one of the following types:
 42# Mapping[str, Any]: The content of an AirbyteRecordMessage
 43# AirbyteMessage: An AirbyteMessage. Could be of any type
 44StreamData = Union[Mapping[str, Any], AirbyteMessage]
 46JsonSchema = Mapping[str, Any]
 48NO_CURSOR_STATE_KEY = "__ab_no_cursor_state_message"
 51def package_name_from_class(cls: object) -> str:
 52    """Find the package name given a class name"""
 53    module = inspect.getmodule(cls)
 54    if module is not None:
 55        return module.__name__.split(".")[0]
 56    else:
 57        raise ValueError(f"Could not find package name for class {cls}")
 60class CheckpointMixin(ABC):
 61    """Mixin for a stream that implements reading and writing the internal state used to checkpoint sync progress to the platform
 63    class CheckpointedStream(Stream, CheckpointMixin):
 64        @property
 65        def state(self):
 66            return self._state
 68        @state.setter
 69        def state(self, value):
 70            self._state[self.cursor_field] = value[self.cursor_field]
 71    """
 73    @property
 74    @abstractmethod
 75    def state(self) -> MutableMapping[str, Any]:
 76        """State getter, should return state in form that can serialized to a string and send to the output
 77        as a STATE AirbyteMessage.
 79        A good example of a state is a cursor_value:
 80            {
 81                self.cursor_field: "cursor_value"
 82            }
 84         State should try to be as small as possible but at the same time descriptive enough to restore
 85         syncing process from the point where it stopped.
 86        """
 88    @state.setter
 89    @abstractmethod
 90    def state(self, value: MutableMapping[str, Any]) -> None:
 91        """State setter, accept state serialized by state getter."""
 95    "Deprecated as of CDK version 0.87.0. "
 96    "Deprecated in favor of the `CheckpointMixin` which offers similar functionality."
 98class IncrementalMixin(CheckpointMixin, ABC):
 99    """Mixin to make stream incremental.
101    class IncrementalStream(Stream, IncrementalMixin):
102        @property
103        def state(self):
104            return self._state
106        @state.setter
107        def state(self, value):
108            self._state[self.cursor_field] = value[self.cursor_field]
109    """
@dataclass
class StreamClassification:
    """Result of ``Stream._classify_stream``: what the first slices of ``stream_slices()`` revealed.

    Must be a dataclass because it is constructed with keyword arguments
    (``StreamClassification(is_legacy_format=..., has_multiple_slices=...)``).
    """

    # True when slices are plain mappings rather than StreamSlice objects.
    is_legacy_format: bool
    # True when the first slice had a value or more than one slice was produced.
    has_multiple_slices: bool
class Stream(ABC):
    """
    Base abstract class for an Airbyte Stream. Makes no assumption of the Stream's underlying transport protocol.

    Subclasses must implement ``read_records()`` and ``primary_key``; every other hook has an overridable default.
    """

    # JSON schema from the configured catalog; assigned by read() through the configured_json_schema setter.
    _configured_json_schema: Optional[Dict[str, Any]] = None
    # Backing field for the exit_on_rate_limit property.
    _exit_on_rate_limit: bool = False

    # Use self.logger in subclasses to log any messages
    @property
    def logger(self) -> logging.Logger:
        """Logger named ``airbyte.streams.<stream name>``."""
        return logging.getLogger(f"airbyte.streams.{self.name}")

    # TypeTransformer object to perform output data transformation
    transformer: TypeTransformer = TypeTransformer(TransformConfig.NoTransform)

    # Optional cursor returned by get_cursor(); when set, a cursor-based checkpoint reader is used.
    cursor: Optional[Cursor] = None

    # Subclasses may set this to True; otherwise _get_checkpoint_reader() infers it from stream_slices().
    has_multiple_slices = False

    @cached_property
    def name(self) -> str:
        """
        :return: Stream name. By default this is the implementing class name, but it can be overridden as needed.
        """
        return casing.camel_to_snake(self.__class__.__name__)

    def get_error_display_message(self, exception: BaseException) -> Optional[str]:
        """
        Retrieves the user-friendly display message that corresponds to an exception.
        This will be called when encountering an exception while reading records from the stream, and used to build the AirbyteTraceMessage.

        The default implementation of this method does not return user-friendly messages for any exception type, but it should be overridden as needed.

        :param exception: The exception that was raised
        :return: A user-friendly message that indicates the cause of the error
        """
        return None

    def read(  # type: ignore  # ignoring typing for ConnectorStateManager because of circular dependencies
        self,
        configured_stream: ConfiguredAirbyteStream,
        logger: logging.Logger,
        slice_logger: SliceLogger,
        stream_state: MutableMapping[str, Any],
        state_manager,
        internal_config: InternalConfig,
    ) -> Iterable[StreamData]:
        """
        Read every slice of this stream, yielding records, slice log messages and state messages.

        :param configured_stream: Catalog entry for this stream; supplies the sync mode, the cursor field and the
            configured JSON schema.
        :param logger: Logger handed to ``slice_logger`` to decide whether slice messages are emitted.
        :param slice_logger: Builds the per-slice log messages.
        :param stream_state: Incoming state. Replaced by ``self.state`` when the stream defines that attribute.
        :param state_manager: ConnectorStateManager used to build STATE messages. When falsy (e.g. ``None``)
            no state messages are emitted.
        :param internal_config: Supplies the record limit; reading the current slice stops once it is reached.
        :return: An iterable of record mappings and AirbyteMessages.
        """
        sync_mode = configured_stream.sync_mode
        cursor_field = configured_stream.cursor_field
        self.configured_json_schema = configured_stream.stream.json_schema

        # WARNING: When performing a read() that uses incoming stream state, we MUST use the self.state that is defined as
        # opposed to the incoming stream_state value. Because some connectors like ones using the file-based CDK modify
        # state before setting the value on the Stream attribute, the most up-to-date state is derived from Stream.state
        # instead of the stream_state parameter. This does not apply to legacy connectors using get_updated_state().
        try:
            stream_state = self.state  # type: ignore # we know the field might not exist...
        except AttributeError:
            pass

        # State messages are only produced when a state manager was supplied.
        should_checkpoint = bool(state_manager)
        checkpoint_reader = self._get_checkpoint_reader(
            logger=logger, cursor_field=cursor_field, sync_mode=sync_mode, stream_state=stream_state
        )

        next_slice = checkpoint_reader.next()
        record_counter = 0
        # Working copy advanced by the legacy get_updated_state() hook; deep-copied so updates to it are
        # independent of the incoming state object.
        stream_state_tracker = copy.deepcopy(stream_state)
        while next_slice is not None:
            if slice_logger.should_log_slice_message(logger):
                yield slice_logger.create_slice_log_message(next_slice)
            records = self.read_records(
                sync_mode=sync_mode,  # todo: change this interface to no longer rely on sync_mode for behavior
                stream_slice=next_slice,
                stream_state=stream_state,
                cursor_field=cursor_field or None,
            )
            for record_data_or_message in records:
                yield record_data_or_message
                if isinstance(record_data_or_message, Mapping) or (
                    hasattr(record_data_or_message, "type")
                    and record_data_or_message.type == MessageType.RECORD
                ):
                    record_data = (
                        record_data_or_message
                        if isinstance(record_data_or_message, Mapping)
                        else record_data_or_message.record
                    )

                    # Resumable full refresh does not fit the legacy Stream.get_updated_state() model: RFR streams use
                    # pagination as their cursor, whereas get_updated_state() derives state from the last seen record.
                    #
                    # Also, because the legacy incremental state case decouples observing incoming records from emitting
                    # state, CheckpointReader.observe() and CheckpointReader.get_checkpoint() must stay separate even
                    # though they could otherwise be combined.
                    if self.cursor_field:
                        # Some connectors have streams that implement get_updated_state(), but do not define a cursor_field. This
                        # should be fixed on the stream implementation, but we should also protect against this in the CDK as well
                        stream_state_tracker = self.get_updated_state(
                            stream_state_tracker,
                            record_data,  # type: ignore [arg-type]
                        )
                        self._observe_state(checkpoint_reader, stream_state_tracker)
                    record_counter += 1

                    checkpoint_interval = self.state_checkpoint_interval
                    if (
                        should_checkpoint
                        and checkpoint_interval
                        and record_counter % checkpoint_interval == 0
                    ):
                        checkpoint = checkpoint_reader.get_checkpoint()
                        if checkpoint:
                            airbyte_state_message = self._checkpoint_state(
                                checkpoint, state_manager=state_manager
                            )
                            yield airbyte_state_message

                    # NOTE(review): this only leaves the current slice; the outer loop still advances to the next one.
                    if internal_config.is_limit_reached(record_counter):
                        break
            self._observe_state(checkpoint_reader)
            checkpoint_state = checkpoint_reader.get_checkpoint()
            if should_checkpoint and checkpoint_state is not None:
                airbyte_state_message = self._checkpoint_state(
                    checkpoint_state, state_manager=state_manager
                )
                yield airbyte_state_message

            next_slice = checkpoint_reader.next()

        checkpoint = checkpoint_reader.get_checkpoint()
        if should_checkpoint and checkpoint is not None:
            airbyte_state_message = self._checkpoint_state(checkpoint, state_manager=state_manager)
            yield airbyte_state_message

    def read_only_records(self, state: Optional[Mapping[str, Any]] = None) -> Iterable[StreamData]:
        """
        Helper method that performs a read on a stream with an optional state and emits records. If the parent stream supports
        incremental, this operation does not update the stream's internal state (if it uses the modern state setter/getter)
        or emit state messages.

        :param state: Optional starting state; when truthy the read runs in incremental mode, otherwise full refresh.
        """
        configured_stream = ConfiguredAirbyteStream(
            stream=AirbyteStream(
                name=self.name,
                json_schema={},
                supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental],
            ),
            sync_mode=SyncMode.incremental if state else SyncMode.full_refresh,
            destination_sync_mode=DestinationSyncMode.append,
        )

        yield from self.read(
            configured_stream=configured_stream,
            logger=self.logger,
            slice_logger=DebugSliceLogger(),
            stream_state=dict(state)
            if state
            else {},  # read() expects MutableMapping instead of Mapping which is used more often
            state_manager=None,  # no state manager => read() emits no state messages
            internal_config=InternalConfig(),  # type: ignore [call-arg]
        )

    @abstractmethod
    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[StreamData]:
        """
        This method should be overridden by subclasses to read records based on the inputs
        """

    @lru_cache(maxsize=None)
    def get_json_schema(self) -> Mapping[str, Any]:
        """
        :return: A dict of the JSON schema representing this stream.

        The default implementation of this method looks for a JSONSchema file with the same name as this stream's "name" property.
        Override as needed.
        """
        # TODO show an example of using pydantic to define the JSON schema, or reading an OpenAPI spec
        return ResourceSchemaLoader(package_name_from_class(self.__class__)).get_schema(self.name)

    def as_airbyte_stream(self) -> AirbyteStream:
        """
        Build the AirbyteStream catalog entry for this stream.

        Full refresh is always offered; incremental is added when ``supports_incremental`` is True, together with the
        default cursor field. The primary key and namespace are included only when the stream defines them.
        """
        stream = AirbyteStream(
            name=self.name,
            json_schema=dict(self.get_json_schema()),
            supported_sync_modes=[SyncMode.full_refresh],
            is_resumable=self.is_resumable,
        )

        if self.namespace:
            stream.namespace = self.namespace

        # If we can offer incremental we always should. RFR is always less reliable than incremental which uses a real cursor value
        if self.supports_incremental:
            stream.source_defined_cursor = self.source_defined_cursor
            stream.supported_sync_modes.append(SyncMode.incremental)
            stream.default_cursor_field = self._wrapped_cursor_field()

        keys = Stream._wrapped_primary_key(self.primary_key)
        if keys:
            stream.source_defined_primary_key = keys

        return stream

    @property
    def supports_incremental(self) -> bool:
        """
        :return: True if this stream supports incrementally reading data
        """
        return len(self._wrapped_cursor_field()) > 0

    @property
    def is_resumable(self) -> bool:
        """
        :return: True if this stream allows the checkpointing of sync progress and can resume from it on subsequent attempts.
        This differs from supports_incremental because certain kinds of streams like those supporting resumable full refresh
        can checkpoint progress in between attempts for improved fault tolerance. However, they will start from the beginning
        on the next sync job.
        """
        if self.supports_incremental:
            return True
        if self.has_multiple_slices:
            # We temporarily gate substream to not support RFR because puts a pretty high burden on connector developers
            # to structure stream state in a very specific way. We also can't check for issubclass(HttpSubStream) because
            # not all substreams implement the interface and it would be a circular dependency so we use parent as a surrogate
            return False
        elif hasattr(type(self), "state") and getattr(type(self), "state").fset is not None:
            # Modern case where a stream manages state using getter/setter
            return True
        else:
            # Legacy case where the CDK manages state via the get_updated_state() method. This is determined by checking if
            # the stream's get_updated_state() differs from the Stream class and therefore has been overridden
            return type(self).get_updated_state != Stream.get_updated_state

    def _wrapped_cursor_field(self) -> List[str]:
        """Normalize ``cursor_field`` to a list: a plain string becomes a one-element list."""
        return [self.cursor_field] if isinstance(self.cursor_field, str) else self.cursor_field

    @property
    def cursor_field(self) -> Union[str, List[str]]:
        """
        Override to return the default cursor field used by this stream e.g: an API entity might always use created_at as the cursor field.
        :return: The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.
        """
        return []

    @property
    def namespace(self) -> Optional[str]:
        """
        Override to return the namespace of this stream, e.g. the Postgres schema which this stream will emit records for.
        :return: A string containing the name of the namespace.
        """
        return None

    @property
    def source_defined_cursor(self) -> bool:
        """
        Return False if the cursor can be configured by the user.
        """
        return True

    @property
    def exit_on_rate_limit(self) -> bool:
        """Exit on rate limit getter, should return bool value. False if the stream will retry endlessly when rate limited."""
        return self._exit_on_rate_limit

    @exit_on_rate_limit.setter
    def exit_on_rate_limit(self, value: bool) -> None:
        """Exit on rate limit setter, accept bool value."""
        self._exit_on_rate_limit = value

    @property
    @abstractmethod
    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
        """
        :return: string if single primary key, list of strings if composite primary key, list of list of strings if composite primary key consisting of nested fields.
          If the stream has no primary keys, return None.
        """

    def stream_slices(
        self,
        *,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Optional[Mapping[str, Any]]]:
        """
        Override to define the slices for this stream. See the stream slicing section of the docs for more information.

        The default implementation yields a single empty StreamSlice, i.e. one pass over the stream with no slice values.

        :param sync_mode: The sync mode of the current read.
        :param cursor_field: The configured cursor field, if any.
        :param stream_state: The state the read starts from.
        :return: An iterable of slices, each passed to read_records() as ``stream_slice``.
        """
        yield StreamSlice(partition={}, cursor_slice={})

    @property
    def state_checkpoint_interval(self) -> Optional[int]:
        """
        Decides how often to checkpoint state (i.e: emit a STATE message). E.g: if this returns a value of 100, then state is persisted after reading
        100 records, then 200, 300, etc.. A good default value is 1000 although your mileage may vary depending on the underlying data source.

        Checkpointing a stream avoids re-reading records in the case a sync is failed or cancelled.

        return None if state should not be checkpointed e.g: because records returned from the underlying data source are not returned in
        ascending order with respect to the cursor field. This can happen if the source does not support reading records in ascending order of
        created_at date (or whatever the cursor is). In those cases, state must only be saved once the full stream has been read.
        """
        return None

    # Commented-out to avoid any runtime penalty, since this is used in a hot per-record codepath.
    # To be evaluated for re-introduction here:
    # @deprecated(
    #     "Deprecated method `get_updated_state` as of CDK version 0.1.49. "
    #     "Please use explicit state property instead, see `IncrementalMixin` docs."
    # )
    def get_updated_state(
        self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]
    ) -> MutableMapping[str, Any]:
        """DEPRECATED. Please use explicit state property instead, see `IncrementalMixin` docs.

        Override to extract state from the latest record. Needed to implement incremental sync.

        Inspects the latest record extracted from the data source and the current state object and return an updated state object.

        For example: if the state object is based on created_at timestamp, and the current state is {'created_at': 10}, and the latest_record is
        {'name': 'octavia', 'created_at': 20 } then this method would return {'created_at': 20} to indicate state should be updated to this object.

        :param current_stream_state: The stream's current state object
        :param latest_record: The latest record extracted from the stream
        :return: An updated state object
        """
        return {}

    def get_cursor(self) -> Optional[Cursor]:
        """
        A Cursor is an interface that a stream can implement to manage how its internal state is read and updated while
        reading records. Historically, Python connectors had no concept of a cursor to manage state. Python streams need
        to define a cursor implementation and override this method to manage state through a Cursor.
        """
        return self.cursor

    def _get_checkpoint_reader(
        self,
        logger: logging.Logger,
        cursor_field: Optional[List[str]],
        sync_mode: SyncMode,
        stream_state: MutableMapping[str, Any],
    ) -> CheckpointReader:
        """
        Choose the CheckpointReader that drives slice iteration and state emission for read().

        Side effects: may set ``self.has_multiple_slices`` to True, and seeds the cursor (if any) with ``stream_state``.
        """
        mappings_or_slices = self.stream_slices(
            cursor_field=cursor_field,
            sync_mode=sync_mode,  # todo: change this interface to no longer rely on sync_mode for behavior
            stream_state=stream_state,
        )

        # Historically the default Stream.stream_slices() returned [None], and some connectors still copy that pattern.
        # [{}] is the clearer way to express "iterate over one slice with no slice values", so normalize to it.
        # Only a literal list equal to [None] is matched here; generators are left untouched.
        if mappings_or_slices == [None]:
            mappings_or_slices = [{}]

        # _classify_stream() consumes items, so give it its own copy of the iterator.
        slices_iterable_copy, iterable_for_detecting_format = itertools.tee(mappings_or_slices, 2)
        stream_classification = self._classify_stream(
            mappings_or_slices=iterable_for_detecting_format
        )

        # Streams that override has_multiple_slices are explicitly indicating that they will iterate over
        # multiple partitions. Inspecting slices to automatically apply the correct cursor is only needed as
        # a backup. So if this value was already assigned to True by the stream, we don't need to reassign it
        self.has_multiple_slices = (
            self.has_multiple_slices or stream_classification.has_multiple_slices
        )

        cursor = self.get_cursor()
        if cursor:
            cursor.set_initial_state(stream_state=stream_state)

        checkpoint_mode = self._checkpoint_mode

        if cursor and stream_classification.is_legacy_format:
            return LegacyCursorBasedCheckpointReader(
                stream_slices=slices_iterable_copy, cursor=cursor, read_state_from_cursor=True
            )
        elif cursor:
            return CursorBasedCheckpointReader(
                stream_slices=slices_iterable_copy,
                cursor=cursor,
                read_state_from_cursor=checkpoint_mode == CheckpointMode.RESUMABLE_FULL_REFRESH,
            )
        elif checkpoint_mode == CheckpointMode.RESUMABLE_FULL_REFRESH:
            # Resumable full refresh readers rely on the stream state dynamically being updated during pagination and does
            # not iterate over a static set of slices.
            return ResumableFullRefreshCheckpointReader(stream_state=stream_state)
        elif checkpoint_mode == CheckpointMode.INCREMENTAL:
            return IncrementalCheckpointReader(
                stream_slices=slices_iterable_copy, stream_state=stream_state
            )
        else:
            return FullRefreshCheckpointReader(stream_slices=slices_iterable_copy)

    @property
    def _checkpoint_mode(self) -> CheckpointMode:
        """INCREMENTAL for resumable streams with a cursor, RESUMABLE_FULL_REFRESH for other resumable streams, else FULL_REFRESH."""
        if self.is_resumable and len(self._wrapped_cursor_field()) > 0:
            return CheckpointMode.INCREMENTAL
        elif self.is_resumable:
            return CheckpointMode.RESUMABLE_FULL_REFRESH
        else:
            return CheckpointMode.FULL_REFRESH

    @staticmethod
    def _classify_stream(
        mappings_or_slices: Iterator[Optional[Union[Mapping[str, Any], StreamSlice]]],
    ) -> StreamClassification:
        """
        Infer two stream attributes by peeking at the first (and, if needed, second) slice from stream_slices(). This is
        the only reliable way to detect them because Python streams do not follow consistent implementation patterns:

        - has_multiple_slices: True when the first slice carries a value or when more than one slice is produced. This
          lets changes be released incrementally, since substreams with parents are much more complicated.
        - is_legacy_format: True when the first slice is a plain mapping rather than a StreamSlice. The checkpoint
          reader always works with StreamSlice, so legacy mapping-based sources need a compatible reader.

        Both attributes can be removed once substreams are fully supported and all legacy connectors use StreamSlice.

        Note: consumes up to two items from ``mappings_or_slices``; callers should pass a dedicated copy.
        """
        # Only catches falsy inputs such as an empty list; iterator objects are always truthy.
        if not mappings_or_slices:
            raise ValueError("A stream should always have at least one slice")
        try:
            next_slice = next(mappings_or_slices)
            if isinstance(next_slice, StreamSlice) and next_slice == StreamSlice(
                partition={}, cursor_slice={}
            ):
                is_legacy_format = False
                slice_has_value = False
            elif next_slice == {}:
                is_legacy_format = True
                slice_has_value = False
            elif isinstance(next_slice, StreamSlice):
                is_legacy_format = False
                slice_has_value = True
            else:
                is_legacy_format = True
                slice_has_value = True
        except StopIteration:
            # If the stream has no slices, the format ultimately does not matter since no data will get synced. This is technically
            # a valid case because it is up to the stream to define its slicing behavior
            return StreamClassification(is_legacy_format=False, has_multiple_slices=False)

        if slice_has_value:
            # If the first slice contained a partition value from the result of stream_slices(), this is a substream that might
            # have multiple parent records to iterate over
            return StreamClassification(is_legacy_format=is_legacy_format, has_multiple_slices=True)

        try:
            # If stream_slices() returns multiple slices, this is also a substream that can potentially generate empty slices
            next(mappings_or_slices)
            return StreamClassification(is_legacy_format=is_legacy_format, has_multiple_slices=True)
        except StopIteration:
            # If the result of stream_slices() only returns a single empty stream slice, then we know this is a regular stream
            return StreamClassification(
                is_legacy_format=is_legacy_format, has_multiple_slices=False
            )

    def log_stream_sync_configuration(self) -> None:
        """
        Logs the configuration of this stream.
        """
        self.logger.debug(
            f"Syncing stream instance: {self.name}",
            extra={
                "primary_key": self.primary_key,
                "cursor_field": self.cursor_field,
            },
        )

    @staticmethod
    def _wrapped_primary_key(
        keys: Optional[Union[str, List[str], List[List[str]]]],
    ) -> Optional[List[List[str]]]:
        """
        :return: wrap the primary_key property in a list of list of strings required by the Airbyte Stream object.
          ``"id"`` -> ``[["id"]]``; ``["a", ["b", "c"]]`` -> ``[["a"], ["b", "c"]]``; falsy input -> None.
        :raises ValueError: If ``keys`` or one of its components is neither a str nor a list.
        """
        if not keys:
            return None

        if isinstance(keys, str):
            return [[keys]]
        elif isinstance(keys, list):
            wrapped_keys = []
            for component in keys:
                if isinstance(component, str):
                    wrapped_keys.append([component])
                elif isinstance(component, list):
                    wrapped_keys.append(component)
                else:
                    raise ValueError(f"Element must be either list or str. Got: {type(component)}")
            return wrapped_keys
        else:
            raise ValueError(f"Element must be either list or str. Got: {type(keys)}")

    def _observe_state(
        self, checkpoint_reader: CheckpointReader, stream_state: Optional[Mapping[str, Any]] = None
    ) -> None:
        """
        Feed the stream's latest state into ``checkpoint_reader``.

        If a non-empty ``stream_state`` is given (state tracked through the legacy get_updated_state() method), it is
        observed directly. Otherwise, and only when the stream does NOT override get_updated_state(), the state getter
        ``self.state`` is read and observed if non-empty; a missing ``state`` attribute is silently ignored.
        """

        # This is an inversion of the original logic that used to try state getter/setters first. As part of the work to
        # automatically apply resumable full refresh to all streams, all HttpStream classes implement default state
        # getter/setter methods, so we only fall back to the state getter when the incoming stream_state is empty and the
        # stream does not override the default get_updated_state() implementation.
        if stream_state:
            checkpoint_reader.observe(stream_state)
        elif type(self).get_updated_state == Stream.get_updated_state:
            # We only default to the state getter/setter if the stream does not use the legacy get_updated_state() method
            try:
                new_state = self.state  # type: ignore # This will always exist on HttpStreams, but may not for Stream
                if new_state:
                    checkpoint_reader.observe(new_state)
            except AttributeError:
                pass

    def _checkpoint_state(  # type: ignore  # ignoring typing for ConnectorStateManager because of circular dependencies
        self,
        stream_state: Mapping[str, Any],
        state_manager,
    ) -> AirbyteMessage:
        """Record ``stream_state`` for this stream in ``state_manager`` and return the resulting STATE message."""
        # todo: This can be consolidated into one ConnectorStateManager.update_and_create_state_message() method, but I want
        #  to reduce changes right now and this would span concurrent as well
        state_manager.update_state_for_stream(self.name, self.namespace, stream_state)
        return state_manager.create_state_message(self.name, self.namespace)  # type: ignore [no-any-return]

    @property
    def configured_json_schema(self) -> Optional[Dict[str, Any]]:
        """
        This property is set from the read method.

        :return Optional[Dict]: JSON schema from configured catalog if provided, otherwise None.
        """
        return self._configured_json_schema

    @configured_json_schema.setter
    def configured_json_schema(self, json_schema: Dict[str, Any]) -> None:
        """Store ``json_schema`` after dropping properties the stream's own schema no longer defines."""
        self._configured_json_schema = self._filter_schema_invalid_properties(json_schema)

    def _filter_schema_invalid_properties(
        self, configured_catalog_json_schema: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Filters the properties in json_schema that are not present in the stream schema.
        Configured Schemas can have very old fields, so we need to do the housekeeping ourselves.

        :return: The configured schema unchanged when nothing is invalid; otherwise a shallow copy whose "properties"
          contains only keys present in both schemas, taking each property definition from the stream schema.
        """
        configured_schema: Any = configured_catalog_json_schema.get("properties", {})
        stream_schema_properties: Any = self.get_json_schema().get("properties", {})

        configured_keys = configured_schema.keys()
        stream_keys = stream_schema_properties.keys()
        invalid_properties = configured_keys - stream_keys
        if not invalid_properties:
            return configured_catalog_json_schema

        self.logger.warning(
            f"Stream {self.name}: the following fields are deprecated and cannot be synced. {invalid_properties}. Refresh the connection's source schema to resolve this warning."
        )

        valid_configured_schema_properties_keys = stream_keys & configured_keys
        valid_configured_schema_properties = {}

        for configured_schema_property in valid_configured_schema_properties_keys:
            valid_configured_schema_properties[configured_schema_property] = (
                stream_schema_properties[configured_schema_property]
            )

        return {**configured_catalog_json_schema, "properties": valid_configured_schema_properties}
StreamData = typing.Union[typing.Mapping[str, typing.Any], airbyte_cdk.AirbyteMessage]
JsonSchema = typing.Mapping[str, typing.Any]
NO_CURSOR_STATE_KEY = '__ab_no_cursor_state_message'
def package_name_from_class(cls: object) -> str:
def package_name_from_class(cls: object) -> str:
    """Return the top-level package of the module that defines ``cls``.

    :param cls: Any object ``inspect.getmodule`` can resolve (class, function, ...).
    :return: The first dotted component of the defining module's name.
    :raises ValueError: If the defining module cannot be determined.
    """
    owning_module = inspect.getmodule(cls)
    if owning_module is None:
        raise ValueError(f"Could not find package name for class {cls}")
    top_level_package, _, _ = owning_module.__name__.partition(".")
    return top_level_package

Find the package name given a class name

class CheckpointMixin(abc.ABC):
class CheckpointMixin(ABC):
    """Mixin for a stream that reads and writes the internal state used to checkpoint sync progress to the platform.

    Example::

        class CheckpointedStream(Stream, CheckpointMixin):
            @property
            def state(self):
                return self._state

            @state.setter
            def state(self, value):
                self._state[self.cursor_field] = value[self.cursor_field]
    """

    @property
    @abstractmethod
    def state(self) -> MutableMapping[str, Any]:
        """Return the current state.

        The value must be serializable to a string so it can be sent to the output as a STATE
        AirbyteMessage. A cursor value is a good example::

            {self.cursor_field: "cursor_value"}

        Keep the state as small as possible while still being descriptive enough to resume
        syncing from the point where it stopped.
        """

    @state.setter
    @abstractmethod
    def state(self, value: MutableMapping[str, Any]) -> None:
        """Accept a state previously produced by the ``state`` getter."""

Mixin for a stream that implements reading and writing the internal state used to checkpoint sync progress to the platform.

class CheckpointedStream(Stream, CheckpointMixin):
    @property
    def state(self):
        return self._state

    @state.setter
    def state(self, value):
        self._state[self.cursor_field] = value[self.cursor_field]
state: MutableMapping[str, Any]
@property
@abstractmethod
def state(self) -> MutableMapping[str, Any]:
    """Return the stream's checkpoint state.

    The mapping must be serializable to a string so it can be sent to the output as a STATE
    AirbyteMessage. A cursor value is a typical example::

        {self.cursor_field: "cursor_value"}

    Keep it as small as possible, yet descriptive enough to restore syncing from the point
    where it stopped.
    """

State getter: should return the state in a form that can be serialized to a string and sent to the output as a STATE AirbyteMessage.

A good example of a state is a cursor_value: { self.cursor_field: "cursor_value" }

State should be as small as possible while still being descriptive enough to restore the syncing process from the point where it stopped.

@deprecated('Deprecated as of CDK version 0.87.0. Deprecated in favor of the `CheckpointMixin` which offers similar functionality.')
class IncrementalMixin(CheckpointMixin, abc.ABC):
 # Deprecated mixin kept for backward compatibility; new streams should use CheckpointMixin.
@deprecated(
    "Deprecated as of CDK version 0.87.0. "
    "Deprecated in favor of the `CheckpointMixin` which offers similar functionality."
)
class IncrementalMixin(CheckpointMixin, ABC):
    """Mixin to make stream incremental.

    Example::

        class IncrementalStream(Stream, IncrementalMixin):
            @property
            def state(self):
                return self._state

            @state.setter
            def state(self, value):
                self._state[self.cursor_field] = value[self.cursor_field]
    """

Mixin to make stream incremental.

class IncrementalStream(Stream, IncrementalMixin):
    @property
    def state(self):
        return self._state

    @state.setter
    def state(self, value):
        self._state[self.cursor_field] = value[self.cursor_field]
Inherited Members
class StreamClassification:
@dataclass
class StreamClassification:
    """Result of inspecting the first slices returned by a stream's ``stream_slices()``.

    Must be a dataclass: callers construct it with keyword arguments
    (``StreamClassification(is_legacy_format=..., has_multiple_slices=...)``).
    """

    # True when slices are plain mappings rather than StreamSlice objects.
    is_legacy_format: bool
    # True when the first slice carries values or more than one slice is produced.
    has_multiple_slices: bool
StreamClassification(is_legacy_format: bool, has_multiple_slices: bool)
is_legacy_format: bool
has_multiple_slices: bool
class Stream(abc.ABC):
class Stream(ABC):
    """
    Base abstract class for an Airbyte Stream. Makes no assumption of the Stream's underlying transport protocol.
    """

    _configured_json_schema: Optional[Dict[str, Any]] = None
    _exit_on_rate_limit: bool = False

    # Use self.logger in subclasses to log any messages
    @property
    def logger(self) -> logging.Logger:
        """:return: A logger named ``airbyte.streams.<stream name>``."""
        return logging.getLogger(f"airbyte.streams.{self.name}")

    # TypeTransformer object to perform output data transformation
    transformer: TypeTransformer = TypeTransformer(TransformConfig.NoTransform)

    cursor: Optional[Cursor] = None

    # Subclasses may set this to True; otherwise _get_checkpoint_reader() infers it from the slices.
    has_multiple_slices = False

    @cached_property
    def name(self) -> str:
        """
        :return: Stream name. By default this is the implementing class name converted to snake_case,
            but it can be overridden as needed.
        """
        return casing.camel_to_snake(self.__class__.__name__)

    def get_error_display_message(self, exception: BaseException) -> Optional[str]:
        """
        Retrieves the user-friendly display message that corresponds to an exception.
        This will be called when encountering an exception while reading records from the stream, and used to build the AirbyteTraceMessage.

        The default implementation of this method does not return user-friendly messages for any exception type, but it should be overridden as needed.

        :param exception: The exception that was raised
        :return: A user-friendly message that indicates the cause of the error
        """
        return None

    def read(  # type: ignore  # ignoring typing for ConnectorStateManager because of circular dependencies
        self,
        configured_stream: ConfiguredAirbyteStream,
        logger: logging.Logger,
        slice_logger: SliceLogger,
        stream_state: MutableMapping[str, Any],
        state_manager,
        internal_config: InternalConfig,
    ) -> Iterable[StreamData]:
        """
        Read the stream slice by slice, yielding records/messages and, when enabled, STATE messages.

        :param configured_stream: Catalog entry providing the sync mode, cursor field and configured JSON schema.
        :param logger: Passed to ``_get_checkpoint_reader`` and ``slice_logger.should_log_slice_message``.
        :param slice_logger: Decides whether a log message is emitted for each slice.
        :param stream_state: Incoming state; replaced by ``self.state`` when the stream defines that attribute.
        :param state_manager: ConnectorStateManager; a falsy value disables checkpointing.
        :param internal_config: Its ``is_limit_reached()`` stops the record loop of the current slice.
        :return: Records (mappings or AirbyteMessages), slice log messages and STATE messages.
        """
        sync_mode = configured_stream.sync_mode
        cursor_field = configured_stream.cursor_field
        self.configured_json_schema = configured_stream.stream.json_schema

        # WARNING: When performing a read() that uses incoming stream state, we MUST use the self.state that is defined as
        # opposed to the incoming stream_state value. Because some connectors like ones using the file-based CDK modify
        # state before setting the value on the Stream attribute, the most up-to-date state is derived from Stream.state
        # instead of the stream_state parameter. This does not apply to legacy connectors using get_updated_state().
        try:
            stream_state = self.state  # type: ignore # we know the field might not exist...
        except AttributeError:
            pass

        should_checkpoint = bool(state_manager)
        checkpoint_reader = self._get_checkpoint_reader(
            logger=logger, cursor_field=cursor_field, sync_mode=sync_mode, stream_state=stream_state
        )

        next_slice = checkpoint_reader.next()
        record_counter = 0
        stream_state_tracker = copy.deepcopy(stream_state)
        while next_slice is not None:
            if slice_logger.should_log_slice_message(logger):
                yield slice_logger.create_slice_log_message(next_slice)
            records = self.read_records(
                sync_mode=sync_mode,  # todo: change this interface to no longer rely on sync_mode for behavior
                stream_slice=next_slice,
                stream_state=stream_state,
                cursor_field=cursor_field or None,
            )
            for record_data_or_message in records:
                yield record_data_or_message
                if isinstance(record_data_or_message, Mapping) or (
                    hasattr(record_data_or_message, "type")
                    and record_data_or_message.type == MessageType.RECORD
                ):
                    record_data = (
                        record_data_or_message
                        if isinstance(record_data_or_message, Mapping)
                        else record_data_or_message.record
                    )

                    # Thanks I hate it. RFR fundamentally doesn't fit with the concept of the legacy Stream.get_updated_state()
                    # method because RFR streams rely on pagination as a cursor. Stream.get_updated_state() was designed to make
                    # the CDK manage state using specifically the last seen record. don't @ brian.lai
                    #
                    # Also, because the legacy incremental state case decouples observing incoming records from emitting state, it
                    # requires that we separate CheckpointReader.observe() and CheckpointReader.get_checkpoint() which could
                    # otherwise be combined.
                    if self.cursor_field:
                        # Some connectors have streams that implement get_updated_state(), but do not define a cursor_field. This
                        # should be fixed on the stream implementation, but we should also protect against this in the CDK as well
                        stream_state_tracker = self.get_updated_state(
                            stream_state_tracker,
                            record_data,  # type: ignore [arg-type]
                        )
                        self._observe_state(checkpoint_reader, stream_state_tracker)
                    record_counter += 1

                    checkpoint_interval = self.state_checkpoint_interval
                    if (
                        should_checkpoint
                        and checkpoint_interval
                        and record_counter % checkpoint_interval == 0
                    ):
                        checkpoint = checkpoint_reader.get_checkpoint()
                        if checkpoint:
                            airbyte_state_message = self._checkpoint_state(
                                checkpoint, state_manager=state_manager
                            )
                            yield airbyte_state_message

                    if internal_config.is_limit_reached(record_counter):
                        break
            self._observe_state(checkpoint_reader)
            checkpoint_state = checkpoint_reader.get_checkpoint()
            if should_checkpoint and checkpoint_state is not None:
                airbyte_state_message = self._checkpoint_state(
                    checkpoint_state, state_manager=state_manager
                )
                yield airbyte_state_message

            next_slice = checkpoint_reader.next()

        checkpoint = checkpoint_reader.get_checkpoint()
        if should_checkpoint and checkpoint is not None:
            airbyte_state_message = self._checkpoint_state(checkpoint, state_manager=state_manager)
            yield airbyte_state_message

    def read_only_records(self, state: Optional[Mapping[str, Any]] = None) -> Iterable[StreamData]:
        """
        Helper method that performs a read on a stream with an optional state and emits records. If the parent stream supports
        incremental, this operation does not update the stream's internal state (if it uses the modern state setter/getter)
        or emit state messages.
        """

        configured_stream = ConfiguredAirbyteStream(
            stream=AirbyteStream(
                name=self.name,
                json_schema={},
                supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental],
            ),
            sync_mode=SyncMode.incremental if state else SyncMode.full_refresh,
            destination_sync_mode=DestinationSyncMode.append,
        )

        yield from self.read(
            configured_stream=configured_stream,
            logger=self.logger,
            slice_logger=DebugSliceLogger(),
            stream_state=dict(state)
            if state
            else {},  # read() expects MutableMapping instead of Mapping which is used more often
            state_manager=None,
            internal_config=InternalConfig(),  # type: ignore [call-arg]
        )

    @abstractmethod
    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[StreamData]:
        """
        This method should be overridden by subclasses to read records based on the inputs
        """

    # NOTE: lru_cache on a method keys on `self`, so each instance's schema is cached (and the
    # instance kept referenced) for the lifetime of the cache.
    @lru_cache(maxsize=None)
    def get_json_schema(self) -> Mapping[str, Any]:
        """
        :return: A dict of the JSON schema representing this stream.

        The default implementation of this method looks for a JSONSchema file with the same name as this stream's "name" property.
        Override as needed.
        """
        # TODO show an example of using pydantic to define the JSON schema, or reading an OpenAPI spec
        return ResourceSchemaLoader(package_name_from_class(self.__class__)).get_schema(self.name)

    def as_airbyte_stream(self) -> AirbyteStream:
        """:return: The AirbyteStream catalog entry describing this stream."""
        stream = AirbyteStream(
            name=self.name,
            json_schema=dict(self.get_json_schema()),
            supported_sync_modes=[SyncMode.full_refresh],
            is_resumable=self.is_resumable,
        )

        if self.namespace:
            stream.namespace = self.namespace

        # If we can offer incremental we always should. RFR is always less reliable than incremental which uses a real cursor value
        if self.supports_incremental:
            stream.source_defined_cursor = self.source_defined_cursor
            stream.supported_sync_modes.append(SyncMode.incremental)
            stream.default_cursor_field = self._wrapped_cursor_field()

        keys = Stream._wrapped_primary_key(self.primary_key)
        if keys and len(keys) > 0:
            stream.source_defined_primary_key = keys

        return stream

    @property
    def supports_incremental(self) -> bool:
        """
        :return: True if this stream supports incrementally reading data
        """
        return len(self._wrapped_cursor_field()) > 0

    @property
    def is_resumable(self) -> bool:
        """
        :return: True if this stream allows the checkpointing of sync progress and can resume from it on subsequent attempts.
        This differs from supports_incremental because certain kinds of streams like those supporting resumable full refresh
        can checkpoint progress in between attempts for improved fault tolerance. However, they will start from the beginning
        on the next sync job.
        """
        if self.supports_incremental:
            return True
        if self.has_multiple_slices:
            # We temporarily gate substream to not support RFR because puts a pretty high burden on connector developers
            # to structure stream state in a very specific way. We also can't check for issubclass(HttpSubStream) because
            # not all substreams implement the interface and it would be a circular dependency so we use parent as a surrogate
            return False
        elif hasattr(type(self), "state") and getattr(type(self), "state").fset is not None:
            # Modern case where a stream manages state using getter/setter
            return True
        else:
            # Legacy case where the CDK manages state via the get_updated_state() method. This is determined by checking if
            # the stream's get_updated_state() differs from the Stream class and therefore has been overridden
            return type(self).get_updated_state != Stream.get_updated_state

    def _wrapped_cursor_field(self) -> List[str]:
        """:return: ``cursor_field`` normalized to a list (a single string becomes a one-element list)."""
        return [self.cursor_field] if isinstance(self.cursor_field, str) else self.cursor_field

    @property
    def cursor_field(self) -> Union[str, List[str]]:
        """
        Override to return the default cursor field used by this stream e.g: an API entity might always use created_at as the cursor field.
        :return: The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.
        """
        return []

    @property
    def namespace(self) -> Optional[str]:
        """
        Override to return the namespace of this stream, e.g. the Postgres schema which this stream will emit records for.
        :return: A string containing the name of the namespace.
        """
        return None

    @property
    def source_defined_cursor(self) -> bool:
        """
        Return False if the cursor can be configured by the user.
        """
        return True

    @property
    def exit_on_rate_limit(self) -> bool:
        """Exit on rate limit getter, should return bool value. False if the stream will retry endlessly when rate limited."""
        return self._exit_on_rate_limit

    @exit_on_rate_limit.setter
    def exit_on_rate_limit(self, value: bool) -> None:
        """Exit on rate limit setter, accept bool value."""
        self._exit_on_rate_limit = value

    @property
    @abstractmethod
    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
        """
        :return: string if single primary key, list of strings if composite primary key, list of list of strings if composite primary key consisting of nested fields.
          If the stream has no primary keys, return None.
        """

    def stream_slices(
        self,
        *,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Optional[Mapping[str, Any]]]:
        """
        Override to define the slices for this stream. See the stream slicing section of the docs for more information.

        :param sync_mode:
        :param cursor_field:
        :param stream_state:
        :return: The default implementation yields a single empty StreamSlice.
        """
        yield StreamSlice(partition={}, cursor_slice={})

    @property
    def state_checkpoint_interval(self) -> Optional[int]:
        """
        Decides how often to checkpoint state (i.e: emit a STATE message). E.g: if this returns a value of 100, then state is persisted after reading
        100 records, then 200, 300, etc.. A good default value is 1000 although your mileage may vary depending on the underlying data source.

        Checkpointing a stream avoids re-reading records in the case a sync is failed or cancelled.

        return None if state should not be checkpointed e.g: because records returned from the underlying data source are not returned in
        ascending order with respect to the cursor field. This can happen if the source does not support reading records in ascending order of
        created_at date (or whatever the cursor is). In those cases, state must only be saved once the full stream has been read.
        """
        return None

    # Commented-out to avoid any runtime penalty, since this is used in a hot per-record codepath.
    # To be evaluated for re-introduction here:
    # @deprecated(
    #     "Deprecated method `get_updated_state` as of CDK version 0.1.49. "
    #     "Please use explicit state property instead, see `IncrementalMixin` docs."
    # )
    def get_updated_state(
        self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]
    ) -> MutableMapping[str, Any]:
        """DEPRECATED. Please use explicit state property instead, see `IncrementalMixin` docs.

        Override to extract state from the latest record. Needed to implement incremental sync.

        Inspects the latest record extracted from the data source and the current state object and return an updated state object.

        For example: if the state object is based on created_at timestamp, and the current state is {'created_at': 10}, and the latest_record is
        {'name': 'octavia', 'created_at': 20 } then this method would return {'created_at': 20} to indicate state should be updated to this object.

        :param current_stream_state: The stream's current state object
        :param latest_record: The latest record extracted from the stream
        :return: An updated state object
        """
        return {}

    def get_cursor(self) -> Optional[Cursor]:
        """
        A Cursor is an interface that a stream can implement to manage how its internal state is read and updated while
        reading records. Historically, Python connectors had no concept of a cursor to manage state. Python streams need
        to define a cursor implementation and override this method to manage state through a Cursor.
        """
        return self.cursor

    def _get_checkpoint_reader(
        self,
        logger: logging.Logger,
        cursor_field: Optional[List[str]],
        sync_mode: SyncMode,
        stream_state: MutableMapping[str, Any],
    ) -> CheckpointReader:
        """Pick the CheckpointReader matching this stream's cursor, slice format and checkpoint mode.

        Side effect: may set ``self.has_multiple_slices`` to True based on the inspected slices.
        """
        mappings_or_slices = self.stream_slices(
            cursor_field=cursor_field,
            sync_mode=sync_mode,  # todo: change this interface to no longer rely on sync_mode for behavior
            stream_state=stream_state,
        )

        # Because of poor foresight, we wrote the default Stream.stream_slices() method to return [None] which is confusing and
        # has now normalized this behavior for connector developers. Now some connectors return [None]. This is objectively
        # misleading and a more ideal interface is [{}] to indicate we still want to iterate over one slice, but with no
        # specific slice values. None is bad, and now I feel bad that I have to write this hack.
        if mappings_or_slices == [None]:
            mappings_or_slices = [{}]

        slices_iterable_copy, iterable_for_detecting_format = itertools.tee(mappings_or_slices, 2)
        stream_classification = self._classify_stream(
            mappings_or_slices=iterable_for_detecting_format
        )

        # Streams that override has_multiple_slices are explicitly indicating that they will iterate over
        # multiple partitions. Inspecting slices to automatically apply the correct cursor is only needed as
        # a backup. So if this value was already assigned to True by the stream, we don't need to reassign it
        self.has_multiple_slices = (
            self.has_multiple_slices or stream_classification.has_multiple_slices
        )

        cursor = self.get_cursor()
        if cursor:
            cursor.set_initial_state(stream_state=stream_state)

        checkpoint_mode = self._checkpoint_mode

        if cursor and stream_classification.is_legacy_format:
            return LegacyCursorBasedCheckpointReader(
                stream_slices=slices_iterable_copy, cursor=cursor, read_state_from_cursor=True
            )
        elif cursor:
            return CursorBasedCheckpointReader(
                stream_slices=slices_iterable_copy,
                cursor=cursor,
                read_state_from_cursor=checkpoint_mode == CheckpointMode.RESUMABLE_FULL_REFRESH,
            )
        elif checkpoint_mode == CheckpointMode.RESUMABLE_FULL_REFRESH:
            # Resumable full refresh readers rely on the stream state dynamically being updated during pagination and does
            # not iterate over a static set of slices.
            return ResumableFullRefreshCheckpointReader(stream_state=stream_state)
        elif checkpoint_mode == CheckpointMode.INCREMENTAL:
            return IncrementalCheckpointReader(
                stream_slices=slices_iterable_copy, stream_state=stream_state
            )
        else:
            return FullRefreshCheckpointReader(stream_slices=slices_iterable_copy)

    @property
    def _checkpoint_mode(self) -> CheckpointMode:
        """INCREMENTAL if resumable with a cursor, RESUMABLE_FULL_REFRESH if resumable without one, else FULL_REFRESH."""
        if self.is_resumable and len(self._wrapped_cursor_field()) > 0:
            return CheckpointMode.INCREMENTAL
        elif self.is_resumable:
            return CheckpointMode.RESUMABLE_FULL_REFRESH
        else:
            return CheckpointMode.FULL_REFRESH

    @staticmethod
    def _classify_stream(
        mappings_or_slices: Iterator[Optional[Union[Mapping[str, Any], StreamSlice]]],
    ) -> StreamClassification:
        """
        This is a bit of a crazy solution, but also the only way we can detect certain attributes about the stream since Python
        streams do not follow consistent implementation patterns. We care about the following two attributes:
        - has_multiple_slices: Helps to incrementally release changes since substreams w/ parents are much more complicated. Also
          helps de-risk the release of changes that might impact all connectors
        - is_legacy_format: Since the checkpoint reader must manage a complex state object, we opted to have it always
          use the structured StreamSlice object. However, this requires backwards compatibility with Python sources that only
          support the legacy mapping object

        Both attributes can eventually be deprecated once substreams have been implemented and legacy connectors all
        adhere to the StreamSlice object.
        """
        # NOTE: iterator objects are always truthy, so this only rejects None or an empty sequence.
        if not mappings_or_slices:
            raise ValueError("A stream should always have at least one slice")
        try:
            next_slice = next(mappings_or_slices)
            if isinstance(next_slice, StreamSlice) and next_slice == StreamSlice(
                partition={}, cursor_slice={}
            ):
                is_legacy_format = False
                slice_has_value = False
            elif next_slice == {}:
                is_legacy_format = True
                slice_has_value = False
            elif isinstance(next_slice, StreamSlice):
                is_legacy_format = False
                slice_has_value = True
            else:
                is_legacy_format = True
                slice_has_value = True
        except StopIteration:
            # If the stream has no slices, the format ultimately does not matter since no data will get synced. This is technically
            # a valid case because it is up to the stream to define its slicing behavior
            return StreamClassification(is_legacy_format=False, has_multiple_slices=False)

        if slice_has_value:
            # If the first slice contained a partition value from the result of stream_slices(), this is a substream that might
            # have multiple parent records to iterate over
            return StreamClassification(
                is_legacy_format=is_legacy_format, has_multiple_slices=slice_has_value
            )

        try:
            # If stream_slices() returns multiple slices, this is also a substream that can potentially generate empty slices
            next(mappings_or_slices)
            return StreamClassification(is_legacy_format=is_legacy_format, has_multiple_slices=True)
        except StopIteration:
            # If the result of stream_slices() only returns a single empty stream slice, then we know this is a regular stream
            return StreamClassification(
                is_legacy_format=is_legacy_format, has_multiple_slices=False
            )

    def log_stream_sync_configuration(self) -> None:
        """
        Logs the configuration of this stream.
        """
        self.logger.debug(
            f"Syncing stream instance: {self.name}",
            extra={
                "primary_key": self.primary_key,
                "cursor_field": self.cursor_field,
            },
        )

    @staticmethod
    def _wrapped_primary_key(
        keys: Optional[Union[str, List[str], List[List[str]]]],
    ) -> Optional[List[List[str]]]:
        """
        :return: wrap the primary_key property in a list of list of strings required by the Airbyte Stream object.
        """
        if not keys:
            return None

        if isinstance(keys, str):
            return [[keys]]
        elif isinstance(keys, list):
            wrapped_keys = []
            for component in keys:
                if isinstance(component, str):
                    wrapped_keys.append([component])
                elif isinstance(component, list):
                    wrapped_keys.append(component)
                else:
                    raise ValueError(f"Element must be either list or str. Got: {type(component)}")
            return wrapped_keys
        else:
            raise ValueError(f"Element must be either list or str. Got: {type(keys)}")

    def _observe_state(
        self, checkpoint_reader: CheckpointReader, stream_state: Optional[Mapping[str, Any]] = None
    ) -> None:
        """
        Convenience method that attempts to read the Stream's state using the recommended way of connector's managing their
        own state via state setter/getter. But if we get back an AttributeError, then the legacy Stream.get_updated_state()
        method is used as a fallback method.
        """

        # This is an inversion of the original logic that used to try state getter/setters first. As part of the work to
        # automatically apply resumable full refresh to all streams, all HttpStream classes implement default state
        # getter/setter methods. We only observe the incoming stream_state when it is non-empty; an empty value ({})
        # indicates the stream does not override the default get_updated_state() implementation. When the default method
        # is not overridden, then the stream defers to the self.state getter
        if stream_state:
            checkpoint_reader.observe(stream_state)
        elif type(self).get_updated_state == Stream.get_updated_state:
            # We only default to the state getter/setter if the stream does not use the legacy get_updated_state() method
            try:
                new_state = self.state  # type: ignore # This will always exist on HttpStreams, but may not for Stream
                if new_state:
                    checkpoint_reader.observe(new_state)
            except AttributeError:
                pass

    def _checkpoint_state(  # type: ignore  # ignoring typing for ConnectorStateManager because of circular dependencies
        self,
        stream_state: Mapping[str, Any],
        state_manager,
    ) -> AirbyteMessage:
        """Record ``stream_state`` for this stream in ``state_manager`` and return the resulting STATE message."""
        # todo: This can be consolidated into one ConnectorStateManager.update_and_create_state_message() method, but I want
        #  to reduce changes right now and this would span concurrent as well
        state_manager.update_state_for_stream(self.name, self.namespace, stream_state)
        return state_manager.create_state_message(self.name, self.namespace)  # type: ignore [no-any-return]

    @property
    def configured_json_schema(self) -> Optional[Dict[str, Any]]:
        """
        This property is set from the read method.

        :return Optional[Dict]: JSON schema from configured catalog if provided, otherwise None.
        """
        return self._configured_json_schema

    @configured_json_schema.setter
    def configured_json_schema(self, json_schema: Dict[str, Any]) -> None:
        self._configured_json_schema = self._filter_schema_invalid_properties(json_schema)

    def _filter_schema_invalid_properties(
        self, configured_catalog_json_schema: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Drop properties from the configured catalog schema that the stream schema no longer declares.

        Configured schemas can carry very old fields, so the stream has to clean them up itself.
        Every property that survives is taken from the stream schema's own definition.

        :param configured_catalog_json_schema: JSON schema taken from the configured catalog.
        :return: The input object unchanged when every configured property is still declared by
            ``self.get_json_schema()``; otherwise a shallow copy whose ``properties`` contains only
            the still-valid fields, in the order they appear in the configured schema.
        """
        configured_schema: Any = configured_catalog_json_schema.get("properties", {})
        stream_schema_properties: Any = self.get_json_schema().get("properties", {})

        invalid_properties = configured_schema.keys() - stream_schema_properties.keys()
        if not invalid_properties:
            return configured_catalog_json_schema

        self.logger.warning(
            f"Stream {self.name}: the following fields are deprecated and cannot be synced. {invalid_properties}. Refresh the connection's source schema to resolve this warning."
        )

        # Walk the configured schema instead of a set intersection so the resulting key order is
        # deterministic (set iteration order of strings varies between interpreter runs).
        valid_configured_schema_properties = {
            key: stream_schema_properties[key]
            for key in configured_schema
            if key in stream_schema_properties
        }

        return {**configured_catalog_json_schema, "properties": valid_configured_schema_properties}

Base abstract class for an Airbyte Stream. Makes no assumption of the Stream's underlying transport protocol.

logger: logging.Logger
@property
def logger(self) -> logging.Logger:
    """:return: A logger named ``airbyte.streams.<stream name>``, taken from ``self.name``."""
    return logging.getLogger(f"airbyte.streams.{self.name}")
has_multiple_slices = False
name: str
@cached_property
def name(self) -> str:
    """
    :return: Stream name. Defaults to the implementing class name converted to snake_case;
        override as needed. Computed once per instance and cached.
    """
    class_name = self.__class__.__name__
    return casing.camel_to_snake(class_name)

Stream name. By default this is the implementing class name converted to snake_case, but it can be overridden as needed.

def get_error_display_message(self, exception: BaseException) -> Optional[str]:
def get_error_display_message(self, exception: BaseException) -> Optional[str]:
    """
    Map an exception raised while reading records to a user-friendly message.

    The result is used to build the AirbyteTraceMessage. This base implementation knows no user-friendly message
    for any exception type and always returns None; subclasses should override it as needed.

    :param exception: The exception that was raised
    :return: A user-friendly message that indicates the cause of the error, or None
    """
    user_message: Optional[str] = None
    return user_message

Retrieves the user-friendly display message that corresponds to an exception. This will be called when encountering an exception while reading records from the stream, and used to build the AirbyteTraceMessage.

The default implementation of this method does not return a user-friendly message for any exception type; override it as needed.

  • exception: The exception that was raised

A user-friendly message that indicates the cause of the error

def read( self, configured_stream: airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteStream, logger: logging.Logger, slice_logger: airbyte_cdk.sources.utils.slice_logger.SliceLogger, stream_state: MutableMapping[str, Any], state_manager, internal_config: airbyte_cdk.InternalConfig) -> Iterable[Union[Mapping[str, Any], airbyte_cdk.AirbyteMessage]]:
def read(  # type: ignore  # ignoring typing for ConnectorStateManager because of circular dependencies
    self,
    configured_stream: ConfiguredAirbyteStream,
    logger: logging.Logger,
    slice_logger: SliceLogger,
    stream_state: MutableMapping[str, Any],
    state_manager,
    internal_config: InternalConfig,
) -> Iterable[StreamData]:
    """
    Read this stream for one configured catalog entry.

    Walks the slices handed out by the stream's checkpoint reader and yields every record or message produced by
    ``read_records``. When checkpointing is enabled (``state_manager`` is truthy), state messages are interleaved:
    every ``state_checkpoint_interval`` records (if set), after each slice, and once more after the last slice.

    :param configured_stream: Catalog entry supplying the sync mode, cursor field and configured JSON schema.
    :param logger: Logger used for slice log messages and handed to the checkpoint reader.
    :param slice_logger: Decides whether slice log messages are emitted and builds them.
    :param stream_state: Incoming state. Replaced by ``self.state`` when the stream exposes that attribute.
    :param state_manager: Connector state manager used to build state messages; a falsy value disables checkpointing.
    :param internal_config: Internal configuration; once its record limit is reached, reading of the current slice stops.
    :return: An iterable of records (mappings) and AirbyteMessages, including slice-log and state messages.
    """
    sync_mode = configured_stream.sync_mode
    cursor_field = configured_stream.cursor_field
    self.configured_json_schema = configured_stream.stream.json_schema

    # WARNING: When performing a read() that uses incoming stream state, we MUST use the self.state that is defined as
    # opposed to the incoming stream_state value. Because some connectors like ones using the file-based CDK modify
    # state before setting the value on the Stream attribute, the most up-to-date state is derived from Stream.state
    # instead of the stream_state parameter. This does not apply to legacy connectors using get_updated_state().
    try:
        stream_state = self.state  # type: ignore # we know the field might not exist...
    except AttributeError:
        pass

    should_checkpoint = bool(state_manager)
    checkpoint_reader = self._get_checkpoint_reader(
        logger=logger, cursor_field=cursor_field, sync_mode=sync_mode, stream_state=stream_state
    )

    next_slice = checkpoint_reader.next()
    record_counter = 0
    stream_state_tracker = copy.deepcopy(stream_state)
    while next_slice is not None:
        if slice_logger.should_log_slice_message(logger):
            yield slice_logger.create_slice_log_message(next_slice)
        records = self.read_records(
            sync_mode=sync_mode,  # todo: change this interface to no longer rely on sync_mode for behavior
            stream_slice=next_slice,
            stream_state=stream_state,
            cursor_field=cursor_field or None,
        )
        for record_data_or_message in records:
            yield record_data_or_message
            if isinstance(record_data_or_message, Mapping) or (
                hasattr(record_data_or_message, "type")
                and record_data_or_message.type == MessageType.RECORD
            ):
                record_data = (
                    record_data_or_message
                    if isinstance(record_data_or_message, Mapping)
                    else record_data_or_message.record
                )

                # RFR fundamentally doesn't fit with the concept of the legacy Stream.get_updated_state()
                # method because RFR streams rely on pagination as a cursor. Stream.get_updated_state() was designed to make
                # the CDK manage state using specifically the last seen record.
                #
                # Also, because the legacy incremental state case decouples observing incoming records from emitting state, it
                # requires that we separate CheckpointReader.observe() and CheckpointReader.get_checkpoint() which could
                # otherwise be combined.
                if self.cursor_field:
                    # Some connectors have streams that implement get_updated_state(), but do not define a cursor_field. This
                    # should be fixed on the stream implementation, but we should also protect against this in the CDK as well
                    stream_state_tracker = self.get_updated_state(
                        stream_state_tracker,
                        record_data,  # type: ignore [arg-type]
                    )
                    self._observe_state(checkpoint_reader, stream_state_tracker)
                record_counter += 1

                checkpoint_interval = self.state_checkpoint_interval
                if (
                    should_checkpoint
                    and checkpoint_interval
                    and record_counter % checkpoint_interval == 0
                ):
                    checkpoint = checkpoint_reader.get_checkpoint()
                    if checkpoint:
                        airbyte_state_message = self._checkpoint_state(
                            checkpoint, state_manager=state_manager
                        )
                        yield airbyte_state_message

                # Only leaves the records loop: the slice is still checkpointed below before moving on.
                if internal_config.is_limit_reached(record_counter):
                    break
        self._observe_state(checkpoint_reader)
        checkpoint_state = checkpoint_reader.get_checkpoint()
        if should_checkpoint and checkpoint_state is not None:
            airbyte_state_message = self._checkpoint_state(
                checkpoint_state, state_manager=state_manager
            )
            yield airbyte_state_message

        next_slice = checkpoint_reader.next()

    # Final checkpoint once every slice has been read.
    checkpoint = checkpoint_reader.get_checkpoint()
    if should_checkpoint and checkpoint is not None:
        airbyte_state_message = self._checkpoint_state(checkpoint, state_manager=state_manager)
        yield airbyte_state_message
def read_only_records( self, state: Optional[Mapping[str, Any]] = None) -> Iterable[Union[Mapping[str, Any], airbyte_cdk.AirbyteMessage]]:
def read_only_records(self, state: Optional[Mapping[str, Any]] = None) -> Iterable[StreamData]:
    """
    Helper method that performs a read on a stream with an optional state and emits records. If the parent stream supports
    incremental, this operation does not update the stream's internal state (if it uses the modern state setter/getter)
    or emit state messages.

    :param state: Optional incoming stream state. When truthy the read runs in incremental mode, otherwise in full refresh.
    :return: The records and messages produced by ``read``.
    """
    configured_stream = ConfiguredAirbyteStream(
        stream=AirbyteStream(
            name=self.name,
            json_schema={},
            supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental],
        ),
        sync_mode=SyncMode.incremental if state else SyncMode.full_refresh,
        destination_sync_mode=DestinationSyncMode.append,
    )

    yield from self.read(
        configured_stream=configured_stream,
        logger=self.logger,
        slice_logger=DebugSliceLogger(),
        # read() expects MutableMapping instead of Mapping which is used more often
        stream_state=dict(state) if state else {},
        # Without a state manager read() never emits state messages.
        state_manager=None,
        internal_config=InternalConfig(),  # type: ignore [call-arg]
    )

Helper method that performs a read on a stream with an optional state and emits records. If the parent stream supports incremental, this operation does not update the stream's internal state (if it uses the modern state setter/getter) or emit state messages.

def read_records( self, sync_mode: airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode, cursor_field: Optional[List[str]] = None, stream_slice: Optional[Mapping[str, Any]] = None, stream_state: Optional[Mapping[str, Any]] = None) -> Iterable[Union[Mapping[str, Any], airbyte_cdk.AirbyteMessage]]:
@abstractmethod
def read_records(
    self,
    sync_mode: SyncMode,
    cursor_field: Optional[List[str]] = None,
    stream_slice: Optional[Mapping[str, Any]] = None,
    stream_state: Optional[Mapping[str, Any]] = None,
) -> Iterable[StreamData]:
    """
    Read the records for one slice of this stream. Subclasses must override this method.

    :param sync_mode: The sync mode of the current read.
    :param cursor_field: The cursor field path, if any.
    :param stream_slice: The slice to read, if any.
    :param stream_state: The current stream state, if any.
    :return: The records (mappings) and AirbyteMessages read for the slice.
    """

This method should be overridden by subclasses to read records based on the inputs.

def get_json_schema(self) -> Mapping[str, Any]:
@lru_cache(maxsize=None)
def get_json_schema(self) -> Mapping[str, Any]:
    """
    :return: A dict of the JSON schema representing this stream.

    The default implementation looks for a JSONSchema file with the same name as this stream's "name" property,
    using a ResourceSchemaLoader rooted at the top-level package of the module that defines the stream class.
    Override as needed. The result is memoized by ``lru_cache``.
    """
    # NOTE(review): lru_cache on a method keys on ``self`` and keeps every stream instance alive for the cache's
    # lifetime (ruff B019). Kept as-is to avoid changing caching behavior that callers may depend on.
    # TODO show an example of using pydantic to define the JSON schema, or reading an OpenAPI spec
    return ResourceSchemaLoader(package_name_from_class(self.__class__)).get_schema(self.name)

A dict of the JSON schema representing this stream.

The default implementation of this method looks for a JSONSchema file with the same name as this stream's "name" property. Override as needed.

def as_airbyte_stream( self) -> airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteStream:
def as_airbyte_stream(self) -> AirbyteStream:
    """
    Describe this stream as an ``AirbyteStream`` catalog entry.

    Full refresh is always listed as a supported sync mode. Incremental is added, together with the source-defined
    cursor flag and the default cursor field, when ``supports_incremental`` is true. The namespace and the
    source-defined primary key are only set when present.

    :return: The AirbyteStream describing this stream.
    """
    stream = AirbyteStream(
        name=self.name,
        json_schema=dict(self.get_json_schema()),
        supported_sync_modes=[SyncMode.full_refresh],
        is_resumable=self.is_resumable,
    )

    if self.namespace:
        stream.namespace = self.namespace

    # If we can offer incremental we always should. RFR is always less reliable than incremental which uses a real cursor value
    if self.supports_incremental:
        stream.source_defined_cursor = self.source_defined_cursor
        stream.supported_sync_modes.append(SyncMode.incremental)
        stream.default_cursor_field = self._wrapped_cursor_field()

    keys = Stream._wrapped_primary_key(self.primary_key)
    if keys:  # None and an empty list both mean "no primary key"
        stream.source_defined_primary_key = keys

    return stream
supports_incremental: bool
@property
def supports_incremental(self) -> bool:
    """
    :return: True when the wrapped cursor field is non-empty, i.e. the stream can be read incrementally.
    """
    cursor_path = self._wrapped_cursor_field()
    return len(cursor_path) != 0

True if this stream supports incrementally reading data

is_resumable: bool
@property
def is_resumable(self) -> bool:
    """
    :return: True if this stream can checkpoint sync progress and resume from it on subsequent attempts.

    This differs from ``supports_incremental``: streams supporting resumable full refresh can checkpoint progress
    between attempts for fault tolerance, yet still start from the beginning on the next sync job.

    Resolution order:
      1. streams that support incremental are resumable;
      2. otherwise, streams with multiple slices are not;
      3. otherwise, streams whose class defines ``state`` with a setter (``fset``) are resumable;
      4. otherwise, the stream is resumable only if its class overrides ``get_updated_state``.
    """
    if self.supports_incremental:
        return True
    if self.has_multiple_slices:
        # Substreams are temporarily excluded from RFR because it forces connector developers to structure stream
        # state in a very specific way. has_multiple_slices stands in for "has a parent": not every substream
        # implements HttpSubStream, and checking issubclass(HttpSubStream) would be a circular dependency.
        return False

    stream_class = type(self)
    # Modern case: the stream manages its own state through a getter/setter.
    has_state_setter = hasattr(stream_class, "state") and getattr(stream_class, "state").fset is not None
    if has_state_setter:
        return True
    # Legacy case: the CDK manages state via get_updated_state(), so the stream is resumable only when that
    # method differs from the base Stream implementation (i.e. has been overridden).
    return stream_class.get_updated_state != Stream.get_updated_state

True if this stream allows the checkpointing of sync progress and can resume from it on subsequent attempts. This differs from supports_incremental because certain kinds of streams like those supporting resumable full refresh can checkpoint progress in between attempts for improved fault tolerance. However, they will start from the beginning on the next sync job.

cursor_field: Union[str, List[str]]
@property
def cursor_field(self) -> Union[str, List[str]]:
    """
    Override to return the default cursor field used by this stream, e.g. an API entity that always uses
    ``created_at`` as its cursor.

    :return: The name of the cursor field, or, for a nested cursor, the list of keys forming its path.
        The default implementation returns an empty list.
    """
    return list()

Override to return the default cursor field used by this stream e.g: an API entity might always use created_at as the cursor field.


The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.

namespace: Optional[str]
@property
def namespace(self) -> Optional[str]:
    """
    Override to return this stream's namespace, e.g. the Postgres schema the stream emits records for.

    :return: The namespace name. The default implementation returns None (no namespace).
    """
    no_namespace: Optional[str] = None
    return no_namespace

Override to return the namespace of this stream, e.g. the Postgres schema which this stream will emit records for.


A string containing the name of the namespace.

source_defined_cursor: bool
@property
def source_defined_cursor(self) -> bool:
    """
    Whether the cursor is fixed by the source. Override to return False if users may configure the cursor.

    :return: True by default.
    """
    is_fixed_by_source = True
    return is_fixed_by_source

Return False if the cursor can be configured by the user.

exit_on_rate_limit: bool
@property
def exit_on_rate_limit(self) -> bool:
    """
    Whether the stream exits when rate limited.

    :return: The value stored in ``self._exit_on_rate_limit``. False means the stream retries endlessly when rate limited.
    """
    should_exit: bool = self._exit_on_rate_limit
    return should_exit

Whether the stream exits when rate limited. False if the stream will retry endlessly when rate limited.

primary_key: Union[str, List[str], List[List[str]], NoneType]
@property
@abstractmethod
def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
    """
    The stream's primary key. Subclasses must implement this.

    :return: A string for a single primary key; a list of strings for a composite key; a list of lists of strings
        for a composite key made of nested fields; or None if the stream has no primary key.
    """

string if single primary key, list of strings if composite primary key, list of list of strings if composite primary key consisting of nested fields. If the stream has no primary keys, return None.

def stream_slices( self, *, sync_mode: airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode, cursor_field: Optional[List[str]] = None, stream_state: Optional[Mapping[str, Any]] = None) -> Iterable[Optional[Mapping[str, Any]]]:
def stream_slices(
    self,
    *,
    sync_mode: SyncMode,
    cursor_field: Optional[List[str]] = None,
    stream_state: Optional[Mapping[str, Any]] = None,
) -> Iterable[Optional[Mapping[str, Any]]]:
    """
    Override to define the slices for this stream. See the stream slicing section of the docs for more information.

    The default implementation yields exactly one slice, with an empty partition and an empty cursor slice.

    :param sync_mode: The sync mode of the current read.
    :param cursor_field: The cursor field path, if any.
    :param stream_state: The current stream state, if any.
    :return: An iterable of stream slices.
    """
    single_empty_slice = StreamSlice(partition={}, cursor_slice={})
    yield single_empty_slice

Override to define the slices for this stream. See the stream slicing section of the docs for more information.

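  • sync_mode: The sync mode of the current read (e.g. full refresh or incremental).
  • cursor_field: The cursor field path, or None.
  • stream_state: The current stream state, or None.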
state_checkpoint_interval: Optional[int]
@property
def state_checkpoint_interval(self) -> Optional[int]:
    """
    How often to checkpoint state (i.e. emit a STATE message), counted in records read.

    For example, a value of 100 persists state after 100 records, then 200, 300, and so on. A good default is
    1000, although your mileage may vary depending on the underlying data source. Checkpointing a stream avoids
    re-reading records if a sync fails or is cancelled.

    Return None if state should not be checkpointed mid-stream, e.g. because the underlying data source does not
    return records in ascending order of the cursor field (such as ``created_at``). In that case state must only be
    saved once the full stream has been read.

    :return: The checkpoint interval, or None (the default) to disable interval checkpointing.
    """
    interval: Optional[int] = None
    return interval

Decides how often to checkpoint state (i.e. emit a STATE message). For example, if this returns a value of 100, then state is persisted after reading 100 records, then 200, 300, etc. A good default value is 1000, although your mileage may vary depending on the underlying data source.

Checkpointing a stream avoids re-reading records in the case a sync is failed or cancelled.

Return None if state should not be checkpointed, e.g. because records returned from the underlying data source are not returned in ascending order with respect to the cursor field. This can happen if the source does not support reading records in ascending order of created_at date (or whatever the cursor is). In those cases, state must only be saved once the full stream has been read.

def get_updated_state( self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> MutableMapping[str, Any]:
def get_updated_state(
    self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]
) -> MutableMapping[str, Any]:
    """DEPRECATED. Please use explicit state property instead, see `IncrementalMixin` docs.

    Override to derive an updated state object from the current state and the latest record read. Needed to
    implement incremental sync with CDK-managed state.

    Example: with state based on a ``created_at`` timestamp, a current state of {'created_at': 10} and a latest
    record of {'name': 'octavia', 'created_at': 20}, an override would return {'created_at': 20}.

    :param current_stream_state: The stream's current state object
    :param latest_record: The latest record extracted from the stream
    :return: An updated state object. The default implementation returns a new, empty dict.
    """
    return dict()

DEPRECATED. Please use explicit state property instead, see IncrementalMixin docs.

Override to extract state from the latest record. Needed to implement incremental sync.

Inspects the latest record extracted from the data source and the current state object, and returns an updated state object.

For example: if the state object is based on created_at timestamp, and the current state is {'created_at': 10}, and the latest_record is {'name': 'octavia', 'created_at': 20 } then this method would return {'created_at': 20} to indicate state should be updated to this object.

  • current_stream_state: The stream's current state object
  • latest_record: The latest record extracted from the stream

An updated state object

def get_cursor(self) -> Optional[airbyte_cdk.sources.streams.checkpoint.Cursor]:
def get_cursor(self) -> Optional[Cursor]:
    """
    Return the Cursor this stream uses to manage its internal state while reading records.

    Historically, Python connectors had no concept of a cursor to manage state. Python streams that want to manage
    state through a Cursor need to define a cursor implementation and override this method.

    :return: The value of ``self.cursor``.
    """
    stream_cursor = self.cursor
    return stream_cursor

A Cursor is an interface that a stream can implement to manage how its internal state is read and updated while reading records. Historically, Python connectors had no concept of a cursor to manage state. Python streams need to define a cursor implementation and override this method to manage state through a Cursor.

def log_stream_sync_configuration(self) -> None:
def log_stream_sync_configuration(self) -> None:
    """
    Log this stream's sync configuration at DEBUG level.

    The message names the stream; the primary key and cursor field are attached to the log record via ``extra``.
    """
    self.logger.debug(
        f"Syncing stream instance: {self.name}",
        extra={
            "primary_key": self.primary_key,
            "cursor_field": self.cursor_field,
        },
    )

Logs the configuration of this stream.

configured_json_schema: Optional[Dict[str, Any]]
@property
def configured_json_schema(self) -> Optional[Dict[str, Any]]:
    """
    The JSON schema taken from the configured catalog; assigned by the read method.

    :return Optional[Dict]: The value stored in ``self._configured_json_schema``, or None if the catalog provided none.
    """
    schema = self._configured_json_schema
    return schema

This property is set from the read method.


JSON schema from configured catalog if provided, otherwise None.