airbyte_cdk.legacy.sources.declarative.declarative_stream

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4import logging
  5from dataclasses import InitVar, dataclass, field
  6from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
  7
  8from typing_extensions import deprecated
  9
 10from airbyte_cdk.legacy.sources.declarative.incremental import (
 11    GlobalSubstreamCursor,
 12    PerPartitionCursor,
 13    PerPartitionWithGlobalCursor,
 14)
 15from airbyte_cdk.models import SyncMode
 16from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
 17from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration
 18from airbyte_cdk.sources.declarative.retrievers.async_retriever import AsyncRetriever
 19from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
 20from airbyte_cdk.sources.declarative.schema import DefaultSchemaLoader
 21from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
 22from airbyte_cdk.sources.streams.checkpoint import (
 23    CheckpointMode,
 24    CheckpointReader,
 25    Cursor,
 26    CursorBasedCheckpointReader,
 27)
 28from airbyte_cdk.sources.streams.core import Stream
 29from airbyte_cdk.sources.types import Config, StreamSlice
 30
 31
 32@deprecated("DeclarativeStream has been deprecated in favor of the concurrent DefaultStream")
 33@dataclass
 34class DeclarativeStream(Stream):
 35    """
 36    DeclarativeStream is a Stream that delegates most of its logic to its schema_load and retriever
 37
 38    Attributes:
 39        name (str): stream name
 40        primary_key (Optional[Union[str, List[str], List[List[str]]]]): the primary key of the stream
 41        schema_loader (SchemaLoader): The schema loader
 42        retriever (Retriever): The retriever
 43        config (Config): The user-provided configuration as specified by the source's spec
 44        stream_cursor_field (Optional[Union[InterpolatedString, str]]): The cursor field
 45        stream. Transformations are applied in the order in which they are defined.
 46    """
 47
 48    retriever: Retriever
 49    config: Config
 50    parameters: InitVar[Mapping[str, Any]]
 51    name: str
 52    primary_key: Optional[Union[str, List[str], List[List[str]]]]
 53    state_migrations: List[StateMigration] = field(repr=True, default_factory=list)
 54    schema_loader: Optional[SchemaLoader] = None
 55    _name: str = field(init=False, repr=False, default="")
 56    _primary_key: str = field(init=False, repr=False, default="")
 57    stream_cursor_field: Optional[Union[InterpolatedString, str]] = None
 58
 59    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 60        self._stream_cursor_field = (
 61            InterpolatedString.create(self.stream_cursor_field, parameters=parameters)
 62            if isinstance(self.stream_cursor_field, str)
 63            else self.stream_cursor_field
 64        )
 65        self._schema_loader = (
 66            self.schema_loader
 67            if self.schema_loader
 68            else DefaultSchemaLoader(config=self.config, parameters=parameters)
 69        )
 70
 71    @property  # type: ignore
 72    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
 73        return self._primary_key
 74
 75    @primary_key.setter
 76    def primary_key(self, value: str) -> None:
 77        if not isinstance(value, property):
 78            self._primary_key = value
 79
 80    @property
 81    def exit_on_rate_limit(self) -> bool:
 82        if isinstance(self.retriever, AsyncRetriever):
 83            return self.retriever.exit_on_rate_limit
 84
 85        return self.retriever.requester.exit_on_rate_limit  # type: ignore # abstract Retriever class has not requester attribute
 86
 87    @exit_on_rate_limit.setter
 88    def exit_on_rate_limit(self, value: bool) -> None:
 89        if isinstance(self.retriever, AsyncRetriever):
 90            self.retriever.exit_on_rate_limit = value
 91        else:
 92            self.retriever.requester.exit_on_rate_limit = value  # type: ignore[attr-defined]
 93
 94    @property  # type: ignore
 95    def name(self) -> str:
 96        """
 97        :return: Stream name. By default this is the implementing class name, but it can be overridden as needed.
 98        """
 99        return self._name
100
101    @name.setter
102    def name(self, value: str) -> None:
103        if not isinstance(value, property):
104            self._name = value
105
106    @property
107    def state(self) -> MutableMapping[str, Any]:
108        return self.retriever.state  # type: ignore
109
110    @state.setter
111    def state(self, value: MutableMapping[str, Any]) -> None:
112        """State setter, accept state serialized by state getter."""
113        state: Mapping[str, Any] = value
114        if self.state_migrations:
115            for migration in self.state_migrations:
116                if migration.should_migrate(state):
117                    state = migration.migrate(state)
118        self.retriever.state = state
119
120    def get_updated_state(
121        self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]
122    ) -> MutableMapping[str, Any]:
123        return self.state
124
125    @property
126    def cursor_field(self) -> Union[str, List[str]]:
127        """
128        Override to return the default cursor field used by this stream e.g: an API entity might always use created_at as the cursor field.
129        :return: The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.
130        """
131        cursor = self._stream_cursor_field.eval(self.config)  # type: ignore # _stream_cursor_field is always cast to interpolated string
132        return cursor if cursor else []
133
134    @property
135    def is_resumable(self) -> bool:
136        # Declarative sources always implement state getter/setter, but whether it supports checkpointing is based on
137        # if the retriever has a cursor defined.
138        return self.retriever.cursor is not None if hasattr(self.retriever, "cursor") else False
139
140    def read_records(
141        self,
142        sync_mode: SyncMode,
143        cursor_field: Optional[List[str]] = None,
144        stream_slice: Optional[Mapping[str, Any]] = None,
145        stream_state: Optional[Mapping[str, Any]] = None,
146    ) -> Iterable[Mapping[str, Any]]:
147        """
148        :param: stream_state We knowingly avoid using stream_state as we want cursors to manage their own state.
149        """
150        if stream_slice is None or (
151            not isinstance(stream_slice, StreamSlice) and stream_slice == {}
152        ):
153            # As the parameter is Optional, many would just call `read_records(sync_mode)` during testing without specifying the field
154            # As part of the declarative model without custom components, this should never happen as the CDK would wire up a
155            # SinglePartitionRouter that would create this StreamSlice properly
156            # As part of the declarative model with custom components, a user that would return a `None` slice would now have the default
157            # empty slice which seems to make sense.
158            stream_slice = StreamSlice(partition={}, cursor_slice={})
159        if not isinstance(stream_slice, StreamSlice):
160            raise ValueError(
161                f"DeclarativeStream does not support stream_slices that are not StreamSlice. Got {stream_slice}"
162            )
163        yield from self.retriever.read_records(self.get_json_schema(), stream_slice)  # type: ignore # records are of the correct type
164
165    def get_json_schema(self) -> Mapping[str, Any]:  # type: ignore
166        """
167        :return: A dict of the JSON schema representing this stream.
168
169        The default implementation of this method looks for a JSONSchema file with the same name as this stream's "name" property.
170        Override as needed.
171        """
172        return self._schema_loader.get_json_schema()
173
174    def stream_slices(
175        self,
176        *,
177        sync_mode: SyncMode,
178        cursor_field: Optional[List[str]] = None,
179        stream_state: Optional[Mapping[str, Any]] = None,
180    ) -> Iterable[Optional[StreamSlice]]:
181        """
182        Override to define the slices for this stream. See the stream slicing section of the docs for more information.
183
184        :param sync_mode:
185        :param cursor_field:
186        :param stream_state: we knowingly avoid using stream_state as we want cursors to manage their own state
187        :return:
188        """
189        return self.retriever.stream_slices()
190
191    @property
192    def state_checkpoint_interval(self) -> Optional[int]:
193        """
194        We explicitly disable checkpointing here. There are a couple reasons for that and not all are documented here but:
195        * In the case where records are not ordered, the granularity of what is ordered is the slice. Therefore, we will only update the
196            cursor value once at the end of every slice.
197        * Updating the state once every record would generate issues for data feed stop conditions or semi-incremental syncs where the
198            important state is the one at the beginning of the slice
199        """
200        return None
201
202    def get_cursor(self) -> Optional[Cursor]:
203        return None
204
205    def _get_checkpoint_reader(
206        self,
207        logger: logging.Logger,
208        cursor_field: Optional[List[str]],
209        sync_mode: SyncMode,
210        stream_state: MutableMapping[str, Any],
211    ) -> CheckpointReader:
212        """
213        This method is overridden to prevent issues with stream slice classification for incremental streams that have parent streams.
214
215        The classification logic, when used with `itertools.tee`, creates a copy of the stream slices. When `stream_slices` is called
216        the second time, the parent records generated during the classification phase are lost. This occurs because `itertools.tee`
217        only buffers the results, meaning the logic in `simple_retriever` that observes and updates the cursor isn't executed again.
218
219        By overriding this method, we ensure that the stream slices are processed correctly and parent records are not lost,
220        allowing the cursor to function as expected.
221        """
222        mappings_or_slices = self.stream_slices(
223            cursor_field=cursor_field,
224            sync_mode=sync_mode,  # todo: change this interface to no longer rely on sync_mode for behavior
225            stream_state=stream_state,
226        )
227
228        cursor = self.get_cursor()
229        checkpoint_mode = self._checkpoint_mode
230
231        if isinstance(
232            cursor, (GlobalSubstreamCursor, PerPartitionCursor, PerPartitionWithGlobalCursor)
233        ):
234            self.has_multiple_slices = True
235            return CursorBasedCheckpointReader(
236                stream_slices=mappings_or_slices,
237                cursor=cursor,
238                read_state_from_cursor=checkpoint_mode == CheckpointMode.RESUMABLE_FULL_REFRESH,
239            )
240
241        return super()._get_checkpoint_reader(logger, cursor_field, sync_mode, stream_state)
@deprecated('DeclarativeStream has been deprecated in favor of the concurrent DefaultStream')
@dataclass
class DeclarativeStream(airbyte_cdk.sources.streams.core.Stream):
 33@deprecated("DeclarativeStream has been deprecated in favor of the concurrent DefaultStream")
 34@dataclass
 35class DeclarativeStream(Stream):
 36    """
  37    DeclarativeStream is a Stream that delegates most of its logic to its schema loader and retriever
 38
 39    Attributes:
 40        name (str): stream name
 41        primary_key (Optional[Union[str, List[str], List[List[str]]]]): the primary key of the stream
 42        schema_loader (SchemaLoader): The schema loader
 43        retriever (Retriever): The retriever
 44        config (Config): The user-provided configuration as specified by the source's spec
 45        stream_cursor_field (Optional[Union[InterpolatedString, str]]): The cursor field
 46        stream. Transformations are applied in the order in which they are defined.
 47    """
 48
 49    retriever: Retriever
 50    config: Config
 51    parameters: InitVar[Mapping[str, Any]]
 52    name: str
 53    primary_key: Optional[Union[str, List[str], List[List[str]]]]
 54    state_migrations: List[StateMigration] = field(repr=True, default_factory=list)
 55    schema_loader: Optional[SchemaLoader] = None
 56    _name: str = field(init=False, repr=False, default="")
 57    _primary_key: str = field(init=False, repr=False, default="")
 58    stream_cursor_field: Optional[Union[InterpolatedString, str]] = None
 59
 60    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 61        self._stream_cursor_field = (
 62            InterpolatedString.create(self.stream_cursor_field, parameters=parameters)
 63            if isinstance(self.stream_cursor_field, str)
 64            else self.stream_cursor_field
 65        )
 66        self._schema_loader = (
 67            self.schema_loader
 68            if self.schema_loader
 69            else DefaultSchemaLoader(config=self.config, parameters=parameters)
 70        )
 71
 72    @property  # type: ignore
 73    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
 74        return self._primary_key
 75
 76    @primary_key.setter
 77    def primary_key(self, value: str) -> None:
 78        if not isinstance(value, property):
 79            self._primary_key = value
 80
 81    @property
 82    def exit_on_rate_limit(self) -> bool:
 83        if isinstance(self.retriever, AsyncRetriever):
 84            return self.retriever.exit_on_rate_limit
 85
 86        return self.retriever.requester.exit_on_rate_limit  # type: ignore # abstract Retriever class has not requester attribute
 87
 88    @exit_on_rate_limit.setter
 89    def exit_on_rate_limit(self, value: bool) -> None:
 90        if isinstance(self.retriever, AsyncRetriever):
 91            self.retriever.exit_on_rate_limit = value
 92        else:
 93            self.retriever.requester.exit_on_rate_limit = value  # type: ignore[attr-defined]
 94
 95    @property  # type: ignore
 96    def name(self) -> str:
 97        """
 98        :return: Stream name. By default this is the implementing class name, but it can be overridden as needed.
 99        """
100        return self._name
101
102    @name.setter
103    def name(self, value: str) -> None:
104        if not isinstance(value, property):
105            self._name = value
106
107    @property
108    def state(self) -> MutableMapping[str, Any]:
109        return self.retriever.state  # type: ignore
110
111    @state.setter
112    def state(self, value: MutableMapping[str, Any]) -> None:
113        """State setter, accept state serialized by state getter."""
114        state: Mapping[str, Any] = value
115        if self.state_migrations:
116            for migration in self.state_migrations:
117                if migration.should_migrate(state):
118                    state = migration.migrate(state)
119        self.retriever.state = state
120
121    def get_updated_state(
122        self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]
123    ) -> MutableMapping[str, Any]:
124        return self.state
125
126    @property
127    def cursor_field(self) -> Union[str, List[str]]:
128        """
129        Override to return the default cursor field used by this stream e.g: an API entity might always use created_at as the cursor field.
130        :return: The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.
131        """
132        cursor = self._stream_cursor_field.eval(self.config)  # type: ignore # _stream_cursor_field is always cast to interpolated string
133        return cursor if cursor else []
134
135    @property
136    def is_resumable(self) -> bool:
137        # Declarative sources always implement state getter/setter, but whether it supports checkpointing is based on
138        # if the retriever has a cursor defined.
139        return self.retriever.cursor is not None if hasattr(self.retriever, "cursor") else False
140
141    def read_records(
142        self,
143        sync_mode: SyncMode,
144        cursor_field: Optional[List[str]] = None,
145        stream_slice: Optional[Mapping[str, Any]] = None,
146        stream_state: Optional[Mapping[str, Any]] = None,
147    ) -> Iterable[Mapping[str, Any]]:
148        """
149        :param: stream_state We knowingly avoid using stream_state as we want cursors to manage their own state.
150        """
151        if stream_slice is None or (
152            not isinstance(stream_slice, StreamSlice) and stream_slice == {}
153        ):
154            # As the parameter is Optional, many would just call `read_records(sync_mode)` during testing without specifying the field
155            # As part of the declarative model without custom components, this should never happen as the CDK would wire up a
156            # SinglePartitionRouter that would create this StreamSlice properly
157            # As part of the declarative model with custom components, a user that would return a `None` slice would now have the default
158            # empty slice which seems to make sense.
159            stream_slice = StreamSlice(partition={}, cursor_slice={})
160        if not isinstance(stream_slice, StreamSlice):
161            raise ValueError(
162                f"DeclarativeStream does not support stream_slices that are not StreamSlice. Got {stream_slice}"
163            )
164        yield from self.retriever.read_records(self.get_json_schema(), stream_slice)  # type: ignore # records are of the correct type
165
166    def get_json_schema(self) -> Mapping[str, Any]:  # type: ignore
167        """
168        :return: A dict of the JSON schema representing this stream.
169
170        The default implementation of this method looks for a JSONSchema file with the same name as this stream's "name" property.
171        Override as needed.
172        """
173        return self._schema_loader.get_json_schema()
174
175    def stream_slices(
176        self,
177        *,
178        sync_mode: SyncMode,
179        cursor_field: Optional[List[str]] = None,
180        stream_state: Optional[Mapping[str, Any]] = None,
181    ) -> Iterable[Optional[StreamSlice]]:
182        """
183        Override to define the slices for this stream. See the stream slicing section of the docs for more information.
184
185        :param sync_mode:
186        :param cursor_field:
187        :param stream_state: we knowingly avoid using stream_state as we want cursors to manage their own state
188        :return:
189        """
190        return self.retriever.stream_slices()
191
192    @property
193    def state_checkpoint_interval(self) -> Optional[int]:
194        """
195        We explicitly disable checkpointing here. There are a couple reasons for that and not all are documented here but:
196        * In the case where records are not ordered, the granularity of what is ordered is the slice. Therefore, we will only update the
197            cursor value once at the end of every slice.
198        * Updating the state once every record would generate issues for data feed stop conditions or semi-incremental syncs where the
199            important state is the one at the beginning of the slice
200        """
201        return None
202
203    def get_cursor(self) -> Optional[Cursor]:
204        return None
205
206    def _get_checkpoint_reader(
207        self,
208        logger: logging.Logger,
209        cursor_field: Optional[List[str]],
210        sync_mode: SyncMode,
211        stream_state: MutableMapping[str, Any],
212    ) -> CheckpointReader:
213        """
214        This method is overridden to prevent issues with stream slice classification for incremental streams that have parent streams.
215
216        The classification logic, when used with `itertools.tee`, creates a copy of the stream slices. When `stream_slices` is called
217        the second time, the parent records generated during the classification phase are lost. This occurs because `itertools.tee`
218        only buffers the results, meaning the logic in `simple_retriever` that observes and updates the cursor isn't executed again.
219
220        By overriding this method, we ensure that the stream slices are processed correctly and parent records are not lost,
221        allowing the cursor to function as expected.
222        """
223        mappings_or_slices = self.stream_slices(
224            cursor_field=cursor_field,
225            sync_mode=sync_mode,  # todo: change this interface to no longer rely on sync_mode for behavior
226            stream_state=stream_state,
227        )
228
229        cursor = self.get_cursor()
230        checkpoint_mode = self._checkpoint_mode
231
232        if isinstance(
233            cursor, (GlobalSubstreamCursor, PerPartitionCursor, PerPartitionWithGlobalCursor)
234        ):
235            self.has_multiple_slices = True
236            return CursorBasedCheckpointReader(
237                stream_slices=mappings_or_slices,
238                cursor=cursor,
239                read_state_from_cursor=checkpoint_mode == CheckpointMode.RESUMABLE_FULL_REFRESH,
240            )
241
242        return super()._get_checkpoint_reader(logger, cursor_field, sync_mode, stream_state)

DeclarativeStream is a Stream that delegates most of its logic to its schema loader and retriever.

Attributes:
  • name (str): stream name
  • primary_key (Optional[Union[str, List[str], List[List[str]]]]): the primary key of the stream
  • schema_loader (SchemaLoader): The schema loader
  • retriever (Retriever): The retriever
  • config (Config): The user-provided configuration as specified by the source's spec
  • stream_cursor_field (Optional[Union[InterpolatedString, str]]): The cursor field of the stream. Transformations are applied in the order in which they are defined.
DeclarativeStream( retriever: airbyte_cdk.sources.declarative.retrievers.Retriever, config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], name: str = <property object>, primary_key: Union[str, List[str], List[List[str]], NoneType] = <property object>, state_migrations: List[airbyte_cdk.sources.declarative.migrations.state_migration.StateMigration] = <factory>, schema_loader: Optional[airbyte_cdk.sources.declarative.schema.SchemaLoader] = None, stream_cursor_field: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None)
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
name: str
 95    @property  # type: ignore
 96    def name(self) -> str:
 97        """
 98        :return: Stream name. By default this is the implementing class name, but it can be overridden as needed.
 99        """
100        return self._name
Returns

Stream name. By default this is the implementing class name, but it can be overridden as needed.

primary_key: Union[str, List[str], List[List[str]], NoneType]
72    @property  # type: ignore
73    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
74        return self._primary_key
Returns

string if single primary key, list of strings if composite primary key, list of list of strings if composite primary key consisting of nested fields. If the stream has no primary keys, return None.

schema_loader: Optional[airbyte_cdk.sources.declarative.schema.SchemaLoader] = None
stream_cursor_field: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
exit_on_rate_limit: bool
81    @property
82    def exit_on_rate_limit(self) -> bool:
83        if isinstance(self.retriever, AsyncRetriever):
84            return self.retriever.exit_on_rate_limit
85
86        return self.retriever.requester.exit_on_rate_limit  # type: ignore # abstract Retriever class has not requester attribute

Exit-on-rate-limit getter; returns a bool. False means the stream will retry endlessly when rate limited.

state: MutableMapping[str, Any]
107    @property
108    def state(self) -> MutableMapping[str, Any]:
109        return self.retriever.state  # type: ignore

State setter, accept state serialized by state getter.

def get_updated_state( self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> MutableMapping[str, Any]:
121    def get_updated_state(
122        self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]
123    ) -> MutableMapping[str, Any]:
124        return self.state

DEPRECATED. Please use explicit state property instead, see IncrementalMixin docs.

Override to extract state from the latest record. Needed to implement incremental sync.

Inspects the latest record extracted from the data source and the current state object and return an updated state object.

For example: if the state object is based on created_at timestamp, and the current state is {'created_at': 10}, and the latest_record is {'name': 'octavia', 'created_at': 20 } then this method would return {'created_at': 20} to indicate state should be updated to this object.

Parameters
  • current_stream_state: The stream's current state object
  • latest_record: The latest record extracted from the stream
Returns

An updated state object

cursor_field: Union[str, List[str]]
126    @property
127    def cursor_field(self) -> Union[str, List[str]]:
128        """
129        Override to return the default cursor field used by this stream e.g: an API entity might always use created_at as the cursor field.
130        :return: The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.
131        """
132        cursor = self._stream_cursor_field.eval(self.config)  # type: ignore # _stream_cursor_field is always cast to interpolated string
133        return cursor if cursor else []

Override to return the default cursor field used by this stream e.g: an API entity might always use created_at as the cursor field.

Returns

The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.

is_resumable: bool
135    @property
136    def is_resumable(self) -> bool:
137        # Declarative sources always implement state getter/setter, but whether it supports checkpointing is based on
138        # if the retriever has a cursor defined.
139        return self.retriever.cursor is not None if hasattr(self.retriever, "cursor") else False
Returns

True if this stream allows the checkpointing of sync progress and can resume from it on subsequent attempts. This differs from supports_incremental because certain kinds of streams like those supporting resumable full refresh can checkpoint progress in between attempts for improved fault tolerance. However, they will start from the beginning on the next sync job.

def read_records( self, sync_mode: airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode, cursor_field: Optional[List[str]] = None, stream_slice: Optional[Mapping[str, Any]] = None, stream_state: Optional[Mapping[str, Any]] = None) -> Iterable[Mapping[str, Any]]:
141    def read_records(
142        self,
143        sync_mode: SyncMode,
144        cursor_field: Optional[List[str]] = None,
145        stream_slice: Optional[Mapping[str, Any]] = None,
146        stream_state: Optional[Mapping[str, Any]] = None,
147    ) -> Iterable[Mapping[str, Any]]:
148        """
149        :param: stream_state We knowingly avoid using stream_state as we want cursors to manage their own state.
150        """
151        if stream_slice is None or (
152            not isinstance(stream_slice, StreamSlice) and stream_slice == {}
153        ):
154            # As the parameter is Optional, many would just call `read_records(sync_mode)` during testing without specifying the field
155            # As part of the declarative model without custom components, this should never happen as the CDK would wire up a
156            # SinglePartitionRouter that would create this StreamSlice properly
157            # As part of the declarative model with custom components, a user that would return a `None` slice would now have the default
158            # empty slice which seems to make sense.
159            stream_slice = StreamSlice(partition={}, cursor_slice={})
160        if not isinstance(stream_slice, StreamSlice):
161            raise ValueError(
162                f"DeclarativeStream does not support stream_slices that are not StreamSlice. Got {stream_slice}"
163            )
164        yield from self.retriever.read_records(self.get_json_schema(), stream_slice)  # type: ignore # records are of the correct type
Parameters
  • stream_state We knowingly avoid using stream_state as we want cursors to manage their own state.
def get_json_schema(self) -> Mapping[str, Any]:
166    def get_json_schema(self) -> Mapping[str, Any]:  # type: ignore
167        """
168        :return: A dict of the JSON schema representing this stream.
169
170        The default implementation of this method looks for a JSONSchema file with the same name as this stream's "name" property.
171        Override as needed.
172        """
173        return self._schema_loader.get_json_schema()
Returns

A dict of the JSON schema representing this stream.

The default implementation of this method looks for a JSONSchema file with the same name as this stream's "name" property. Override as needed.

def stream_slices( self, *, sync_mode: airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode, cursor_field: Optional[List[str]] = None, stream_state: Optional[Mapping[str, Any]] = None) -> Iterable[Optional[airbyte_cdk.StreamSlice]]:
175    def stream_slices(
176        self,
177        *,
178        sync_mode: SyncMode,
179        cursor_field: Optional[List[str]] = None,
180        stream_state: Optional[Mapping[str, Any]] = None,
181    ) -> Iterable[Optional[StreamSlice]]:
182        """
183        Override to define the slices for this stream. See the stream slicing section of the docs for more information.
184
185        :param sync_mode:
186        :param cursor_field:
187        :param stream_state: we knowingly avoid using stream_state as we want cursors to manage their own state
188        :return:
189        """
190        return self.retriever.stream_slices()

Override to define the slices for this stream. See the stream slicing section of the docs for more information.

Parameters
  • sync_mode:
  • cursor_field:
  • stream_state: we knowingly avoid using stream_state as we want cursors to manage their own state
Returns
state_checkpoint_interval: Optional[int]
192    @property
193    def state_checkpoint_interval(self) -> Optional[int]:
194        """
195        We explicitly disable checkpointing here. There are a couple reasons for that and not all are documented here but:
196        * In the case where records are not ordered, the granularity of what is ordered is the slice. Therefore, we will only update the
197            cursor value once at the end of every slice.
198        * Updating the state once every record would generate issues for data feed stop conditions or semi-incremental syncs where the
199            important state is the one at the beginning of the slice
200        """
201        return None

We explicitly disable checkpointing here. There are a couple reasons for that and not all are documented here but:

  • In the case where records are not ordered, the granularity of what is ordered is the slice. Therefore, we will only update the cursor value once at the end of every slice.
  • Updating the state once every record would generate issues for data feed stop conditions or semi-incremental syncs where the important state is the one at the beginning of the slice
def get_cursor(self) -> Optional[airbyte_cdk.sources.streams.checkpoint.Cursor]:
203    def get_cursor(self) -> Optional[Cursor]:
204        return None

A Cursor is an interface that a stream can implement to manage how its internal state is read and updated while reading records. Historically, Python connectors had no concept of a cursor to manage state. Python streams need to define a cursor implementation and override this method to manage state through a Cursor.