airbyte_cdk.sources.declarative.declarative_stream

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4import logging
  5from dataclasses import InitVar, dataclass, field
  6from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
  7
  8from airbyte_cdk.models import SyncMode
  9from airbyte_cdk.sources.declarative.incremental import (
 10    GlobalSubstreamCursor,
 11    PerPartitionCursor,
 12    PerPartitionWithGlobalCursor,
 13)
 14from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
 15from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration
 16from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever
 17from airbyte_cdk.sources.declarative.retrievers.async_retriever import AsyncRetriever
 18from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
 19from airbyte_cdk.sources.declarative.schema import DefaultSchemaLoader
 20from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
 21from airbyte_cdk.sources.streams.checkpoint import (
 22    CheckpointMode,
 23    CheckpointReader,
 24    Cursor,
 25    CursorBasedCheckpointReader,
 26)
 27from airbyte_cdk.sources.streams.core import Stream
 28from airbyte_cdk.sources.types import Config, StreamSlice
 29
 30
 31@dataclass
 32class DeclarativeStream(Stream):
 33    """
 34    DeclarativeStream is a Stream that delegates most of its logic to its schema_load and retriever
 35
 36    Attributes:
 37        name (str): stream name
 38        primary_key (Optional[Union[str, List[str], List[List[str]]]]): the primary key of the stream
 39        schema_loader (SchemaLoader): The schema loader
 40        retriever (Retriever): The retriever
 41        config (Config): The user-provided configuration as specified by the source's spec
 42        stream_cursor_field (Optional[Union[InterpolatedString, str]]): The cursor field
 43        stream. Transformations are applied in the order in which they are defined.
 44    """
 45
 46    retriever: Retriever
 47    config: Config
 48    parameters: InitVar[Mapping[str, Any]]
 49    name: str
 50    primary_key: Optional[Union[str, List[str], List[List[str]]]]
 51    state_migrations: List[StateMigration] = field(repr=True, default_factory=list)
 52    schema_loader: Optional[SchemaLoader] = None
 53    _name: str = field(init=False, repr=False, default="")
 54    _primary_key: str = field(init=False, repr=False, default="")
 55    stream_cursor_field: Optional[Union[InterpolatedString, str]] = None
 56
 57    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 58        self._stream_cursor_field = (
 59            InterpolatedString.create(self.stream_cursor_field, parameters=parameters)
 60            if isinstance(self.stream_cursor_field, str)
 61            else self.stream_cursor_field
 62        )
 63        self._schema_loader = (
 64            self.schema_loader
 65            if self.schema_loader
 66            else DefaultSchemaLoader(config=self.config, parameters=parameters)
 67        )
 68
 69    @property  # type: ignore
 70    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
 71        return self._primary_key
 72
 73    @primary_key.setter
 74    def primary_key(self, value: str) -> None:
 75        if not isinstance(value, property):
 76            self._primary_key = value
 77
 78    @property
 79    def exit_on_rate_limit(self) -> bool:
 80        if isinstance(self.retriever, AsyncRetriever):
 81            return self.retriever.exit_on_rate_limit
 82
 83        return self.retriever.requester.exit_on_rate_limit  # type: ignore # abstract Retriever class has not requester attribute
 84
 85    @exit_on_rate_limit.setter
 86    def exit_on_rate_limit(self, value: bool) -> None:
 87        if isinstance(self.retriever, AsyncRetriever):
 88            self.retriever.exit_on_rate_limit = value
 89        else:
 90            self.retriever.requester.exit_on_rate_limit = value  # type: ignore[attr-defined]
 91
 92    @property  # type: ignore
 93    def name(self) -> str:
 94        """
 95        :return: Stream name. By default this is the implementing class name, but it can be overridden as needed.
 96        """
 97        return self._name
 98
 99    @name.setter
100    def name(self, value: str) -> None:
101        if not isinstance(value, property):
102            self._name = value
103
104    @property
105    def state(self) -> MutableMapping[str, Any]:
106        return self.retriever.state  # type: ignore
107
108    @state.setter
109    def state(self, value: MutableMapping[str, Any]) -> None:
110        """State setter, accept state serialized by state getter."""
111        state: Mapping[str, Any] = value
112        if self.state_migrations:
113            for migration in self.state_migrations:
114                if migration.should_migrate(state):
115                    state = migration.migrate(state)
116        self.retriever.state = state
117
118    def get_updated_state(
119        self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]
120    ) -> MutableMapping[str, Any]:
121        return self.state
122
123    @property
124    def cursor_field(self) -> Union[str, List[str]]:
125        """
126        Override to return the default cursor field used by this stream e.g: an API entity might always use created_at as the cursor field.
127        :return: The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.
128        """
129        cursor = self._stream_cursor_field.eval(self.config)  # type: ignore # _stream_cursor_field is always cast to interpolated string
130        return cursor if cursor else []
131
132    @property
133    def is_resumable(self) -> bool:
134        # Declarative sources always implement state getter/setter, but whether it supports checkpointing is based on
135        # if the retriever has a cursor defined.
136        return self.retriever.cursor is not None if hasattr(self.retriever, "cursor") else False
137
138    def read_records(
139        self,
140        sync_mode: SyncMode,
141        cursor_field: Optional[List[str]] = None,
142        stream_slice: Optional[Mapping[str, Any]] = None,
143        stream_state: Optional[Mapping[str, Any]] = None,
144    ) -> Iterable[Mapping[str, Any]]:
145        """
146        :param: stream_state We knowingly avoid using stream_state as we want cursors to manage their own state.
147        """
148        if stream_slice is None or (
149            not isinstance(stream_slice, StreamSlice) and stream_slice == {}
150        ):
151            # As the parameter is Optional, many would just call `read_records(sync_mode)` during testing without specifying the field
152            # As part of the declarative model without custom components, this should never happen as the CDK would wire up a
153            # SinglePartitionRouter that would create this StreamSlice properly
154            # As part of the declarative model with custom components, a user that would return a `None` slice would now have the default
155            # empty slice which seems to make sense.
156            stream_slice = StreamSlice(partition={}, cursor_slice={})
157        if not isinstance(stream_slice, StreamSlice):
158            raise ValueError(
159                f"DeclarativeStream does not support stream_slices that are not StreamSlice. Got {stream_slice}"
160            )
161        yield from self.retriever.read_records(self.get_json_schema(), stream_slice)  # type: ignore # records are of the correct type
162
163    def get_json_schema(self) -> Mapping[str, Any]:  # type: ignore
164        """
165        :return: A dict of the JSON schema representing this stream.
166
167        The default implementation of this method looks for a JSONSchema file with the same name as this stream's "name" property.
168        Override as needed.
169        """
170        return self._schema_loader.get_json_schema()
171
172    def stream_slices(
173        self,
174        *,
175        sync_mode: SyncMode,
176        cursor_field: Optional[List[str]] = None,
177        stream_state: Optional[Mapping[str, Any]] = None,
178    ) -> Iterable[Optional[StreamSlice]]:
179        """
180        Override to define the slices for this stream. See the stream slicing section of the docs for more information.
181
182        :param sync_mode:
183        :param cursor_field:
184        :param stream_state: we knowingly avoid using stream_state as we want cursors to manage their own state
185        :return:
186        """
187        return self.retriever.stream_slices()
188
189    @property
190    def state_checkpoint_interval(self) -> Optional[int]:
191        """
192        We explicitly disable checkpointing here. There are a couple reasons for that and not all are documented here but:
193        * In the case where records are not ordered, the granularity of what is ordered is the slice. Therefore, we will only update the
194            cursor value once at the end of every slice.
195        * Updating the state once every record would generate issues for data feed stop conditions or semi-incremental syncs where the
196            important state is the one at the beginning of the slice
197        """
198        return None
199
200    def get_cursor(self) -> Optional[Cursor]:
201        if self.retriever and isinstance(self.retriever, SimpleRetriever):
202            return self.retriever.cursor
203        return None
204
205    def _get_checkpoint_reader(
206        self,
207        logger: logging.Logger,
208        cursor_field: Optional[List[str]],
209        sync_mode: SyncMode,
210        stream_state: MutableMapping[str, Any],
211    ) -> CheckpointReader:
212        """
213        This method is overridden to prevent issues with stream slice classification for incremental streams that have parent streams.
214
215        The classification logic, when used with `itertools.tee`, creates a copy of the stream slices. When `stream_slices` is called
216        the second time, the parent records generated during the classification phase are lost. This occurs because `itertools.tee`
217        only buffers the results, meaning the logic in `simple_retriever` that observes and updates the cursor isn't executed again.
218
219        By overriding this method, we ensure that the stream slices are processed correctly and parent records are not lost,
220        allowing the cursor to function as expected.
221        """
222        mappings_or_slices = self.stream_slices(
223            cursor_field=cursor_field,
224            sync_mode=sync_mode,  # todo: change this interface to no longer rely on sync_mode for behavior
225            stream_state=stream_state,
226        )
227
228        cursor = self.get_cursor()
229        checkpoint_mode = self._checkpoint_mode
230
231        if isinstance(
232            cursor, (GlobalSubstreamCursor, PerPartitionCursor, PerPartitionWithGlobalCursor)
233        ):
234            self.has_multiple_slices = True
235            return CursorBasedCheckpointReader(
236                stream_slices=mappings_or_slices,
237                cursor=cursor,
238                read_state_from_cursor=checkpoint_mode == CheckpointMode.RESUMABLE_FULL_REFRESH,
239            )
240
241        return super()._get_checkpoint_reader(logger, cursor_field, sync_mode, stream_state)
@dataclass
class DeclarativeStream(airbyte_cdk.sources.streams.core.Stream):
 32@dataclass
 33class DeclarativeStream(Stream):
 34    """
 35    DeclarativeStream is a Stream that delegates most of its logic to its schema_load and retriever
 36
 37    Attributes:
 38        name (str): stream name
 39        primary_key (Optional[Union[str, List[str], List[List[str]]]]): the primary key of the stream
 40        schema_loader (SchemaLoader): The schema loader
 41        retriever (Retriever): The retriever
 42        config (Config): The user-provided configuration as specified by the source's spec
 43        stream_cursor_field (Optional[Union[InterpolatedString, str]]): The cursor field
 44        stream. Transformations are applied in the order in which they are defined.
 45    """
 46
 47    retriever: Retriever
 48    config: Config
 49    parameters: InitVar[Mapping[str, Any]]
 50    name: str
 51    primary_key: Optional[Union[str, List[str], List[List[str]]]]
 52    state_migrations: List[StateMigration] = field(repr=True, default_factory=list)
 53    schema_loader: Optional[SchemaLoader] = None
 54    _name: str = field(init=False, repr=False, default="")
 55    _primary_key: str = field(init=False, repr=False, default="")
 56    stream_cursor_field: Optional[Union[InterpolatedString, str]] = None
 57
 58    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 59        self._stream_cursor_field = (
 60            InterpolatedString.create(self.stream_cursor_field, parameters=parameters)
 61            if isinstance(self.stream_cursor_field, str)
 62            else self.stream_cursor_field
 63        )
 64        self._schema_loader = (
 65            self.schema_loader
 66            if self.schema_loader
 67            else DefaultSchemaLoader(config=self.config, parameters=parameters)
 68        )
 69
 70    @property  # type: ignore
 71    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
 72        return self._primary_key
 73
 74    @primary_key.setter
 75    def primary_key(self, value: str) -> None:
 76        if not isinstance(value, property):
 77            self._primary_key = value
 78
 79    @property
 80    def exit_on_rate_limit(self) -> bool:
 81        if isinstance(self.retriever, AsyncRetriever):
 82            return self.retriever.exit_on_rate_limit
 83
 84        return self.retriever.requester.exit_on_rate_limit  # type: ignore # abstract Retriever class has not requester attribute
 85
 86    @exit_on_rate_limit.setter
 87    def exit_on_rate_limit(self, value: bool) -> None:
 88        if isinstance(self.retriever, AsyncRetriever):
 89            self.retriever.exit_on_rate_limit = value
 90        else:
 91            self.retriever.requester.exit_on_rate_limit = value  # type: ignore[attr-defined]
 92
 93    @property  # type: ignore
 94    def name(self) -> str:
 95        """
 96        :return: Stream name. By default this is the implementing class name, but it can be overridden as needed.
 97        """
 98        return self._name
 99
100    @name.setter
101    def name(self, value: str) -> None:
102        if not isinstance(value, property):
103            self._name = value
104
105    @property
106    def state(self) -> MutableMapping[str, Any]:
107        return self.retriever.state  # type: ignore
108
109    @state.setter
110    def state(self, value: MutableMapping[str, Any]) -> None:
111        """State setter, accept state serialized by state getter."""
112        state: Mapping[str, Any] = value
113        if self.state_migrations:
114            for migration in self.state_migrations:
115                if migration.should_migrate(state):
116                    state = migration.migrate(state)
117        self.retriever.state = state
118
119    def get_updated_state(
120        self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]
121    ) -> MutableMapping[str, Any]:
122        return self.state
123
124    @property
125    def cursor_field(self) -> Union[str, List[str]]:
126        """
127        Override to return the default cursor field used by this stream e.g: an API entity might always use created_at as the cursor field.
128        :return: The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.
129        """
130        cursor = self._stream_cursor_field.eval(self.config)  # type: ignore # _stream_cursor_field is always cast to interpolated string
131        return cursor if cursor else []
132
133    @property
134    def is_resumable(self) -> bool:
135        # Declarative sources always implement state getter/setter, but whether it supports checkpointing is based on
136        # if the retriever has a cursor defined.
137        return self.retriever.cursor is not None if hasattr(self.retriever, "cursor") else False
138
139    def read_records(
140        self,
141        sync_mode: SyncMode,
142        cursor_field: Optional[List[str]] = None,
143        stream_slice: Optional[Mapping[str, Any]] = None,
144        stream_state: Optional[Mapping[str, Any]] = None,
145    ) -> Iterable[Mapping[str, Any]]:
146        """
147        :param: stream_state We knowingly avoid using stream_state as we want cursors to manage their own state.
148        """
149        if stream_slice is None or (
150            not isinstance(stream_slice, StreamSlice) and stream_slice == {}
151        ):
152            # As the parameter is Optional, many would just call `read_records(sync_mode)` during testing without specifying the field
153            # As part of the declarative model without custom components, this should never happen as the CDK would wire up a
154            # SinglePartitionRouter that would create this StreamSlice properly
155            # As part of the declarative model with custom components, a user that would return a `None` slice would now have the default
156            # empty slice which seems to make sense.
157            stream_slice = StreamSlice(partition={}, cursor_slice={})
158        if not isinstance(stream_slice, StreamSlice):
159            raise ValueError(
160                f"DeclarativeStream does not support stream_slices that are not StreamSlice. Got {stream_slice}"
161            )
162        yield from self.retriever.read_records(self.get_json_schema(), stream_slice)  # type: ignore # records are of the correct type
163
164    def get_json_schema(self) -> Mapping[str, Any]:  # type: ignore
165        """
166        :return: A dict of the JSON schema representing this stream.
167
168        The default implementation of this method looks for a JSONSchema file with the same name as this stream's "name" property.
169        Override as needed.
170        """
171        return self._schema_loader.get_json_schema()
172
173    def stream_slices(
174        self,
175        *,
176        sync_mode: SyncMode,
177        cursor_field: Optional[List[str]] = None,
178        stream_state: Optional[Mapping[str, Any]] = None,
179    ) -> Iterable[Optional[StreamSlice]]:
180        """
181        Override to define the slices for this stream. See the stream slicing section of the docs for more information.
182
183        :param sync_mode:
184        :param cursor_field:
185        :param stream_state: we knowingly avoid using stream_state as we want cursors to manage their own state
186        :return:
187        """
188        return self.retriever.stream_slices()
189
190    @property
191    def state_checkpoint_interval(self) -> Optional[int]:
192        """
193        We explicitly disable checkpointing here. There are a couple reasons for that and not all are documented here but:
194        * In the case where records are not ordered, the granularity of what is ordered is the slice. Therefore, we will only update the
195            cursor value once at the end of every slice.
196        * Updating the state once every record would generate issues for data feed stop conditions or semi-incremental syncs where the
197            important state is the one at the beginning of the slice
198        """
199        return None
200
201    def get_cursor(self) -> Optional[Cursor]:
202        if self.retriever and isinstance(self.retriever, SimpleRetriever):
203            return self.retriever.cursor
204        return None
205
206    def _get_checkpoint_reader(
207        self,
208        logger: logging.Logger,
209        cursor_field: Optional[List[str]],
210        sync_mode: SyncMode,
211        stream_state: MutableMapping[str, Any],
212    ) -> CheckpointReader:
213        """
214        This method is overridden to prevent issues with stream slice classification for incremental streams that have parent streams.
215
216        The classification logic, when used with `itertools.tee`, creates a copy of the stream slices. When `stream_slices` is called
217        the second time, the parent records generated during the classification phase are lost. This occurs because `itertools.tee`
218        only buffers the results, meaning the logic in `simple_retriever` that observes and updates the cursor isn't executed again.
219
220        By overriding this method, we ensure that the stream slices are processed correctly and parent records are not lost,
221        allowing the cursor to function as expected.
222        """
223        mappings_or_slices = self.stream_slices(
224            cursor_field=cursor_field,
225            sync_mode=sync_mode,  # todo: change this interface to no longer rely on sync_mode for behavior
226            stream_state=stream_state,
227        )
228
229        cursor = self.get_cursor()
230        checkpoint_mode = self._checkpoint_mode
231
232        if isinstance(
233            cursor, (GlobalSubstreamCursor, PerPartitionCursor, PerPartitionWithGlobalCursor)
234        ):
235            self.has_multiple_slices = True
236            return CursorBasedCheckpointReader(
237                stream_slices=mappings_or_slices,
238                cursor=cursor,
239                read_state_from_cursor=checkpoint_mode == CheckpointMode.RESUMABLE_FULL_REFRESH,
240            )
241
242        return super()._get_checkpoint_reader(logger, cursor_field, sync_mode, stream_state)

DeclarativeStream is a Stream that delegates most of its logic to its schema_loader and retriever.

Attributes:
  • name (str): stream name
  • primary_key (Optional[Union[str, List[str], List[List[str]]]]): the primary key of the stream
  • schema_loader (SchemaLoader): The schema loader
  • retriever (Retriever): The retriever
  • config (Config): The user-provided configuration as specified by the source's spec
  • stream_cursor_field (Optional[Union[InterpolatedString, str]]): The cursor field of the stream.
Transformations are applied in the order in which they are defined.
DeclarativeStream( retriever: airbyte_cdk.sources.declarative.retrievers.Retriever, config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], name: str = <property object>, primary_key: Union[str, List[str], List[List[str]], NoneType] = <property object>, state_migrations: List[airbyte_cdk.sources.declarative.migrations.state_migration.StateMigration] = <factory>, schema_loader: Optional[airbyte_cdk.sources.declarative.schema.SchemaLoader] = None, stream_cursor_field: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None)
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
name: str
93    @property  # type: ignore
94    def name(self) -> str:
95        """
96        :return: Stream name. By default this is the implementing class name, but it can be overridden as needed.
97        """
98        return self._name
Returns

Stream name. By default this is the implementing class name, but it can be overridden as needed.

primary_key: Union[str, List[str], List[List[str]], NoneType]
70    @property  # type: ignore
71    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
72        return self._primary_key
Returns

string if single primary key, list of strings if composite primary key, list of list of strings if composite primary key consisting of nested fields. If the stream has no primary keys, return None.

schema_loader: Optional[airbyte_cdk.sources.declarative.schema.SchemaLoader] = None
stream_cursor_field: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
exit_on_rate_limit: bool
79    @property
80    def exit_on_rate_limit(self) -> bool:
81        if isinstance(self.retriever, AsyncRetriever):
82            return self.retriever.exit_on_rate_limit
83
84        return self.retriever.requester.exit_on_rate_limit  # type: ignore # abstract Retriever class has not requester attribute

Getter for the exit-on-rate-limit flag; should return a bool value. False if the stream will retry endlessly when rate limited.

state: MutableMapping[str, Any]
105    @property
106    def state(self) -> MutableMapping[str, Any]:
107        return self.retriever.state  # type: ignore

State setter; accepts state as serialized by the state getter.

def get_updated_state( self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> MutableMapping[str, Any]:
119    def get_updated_state(
120        self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]
121    ) -> MutableMapping[str, Any]:
122        return self.state

DEPRECATED. Please use explicit state property instead, see IncrementalMixin docs.

Override to extract state from the latest record. Needed to implement incremental sync.

Inspects the latest record extracted from the data source and the current state object, and returns an updated state object.

For example: if the state object is based on the created_at timestamp, the current state is {'created_at': 10}, and the latest_record is {'name': 'octavia', 'created_at': 20}, then this method would return {'created_at': 20} to indicate that the state should be updated to this object.

Parameters
  • current_stream_state: The stream's current state object
  • latest_record: The latest record extracted from the stream
Returns

An updated state object

cursor_field: Union[str, List[str]]
124    @property
125    def cursor_field(self) -> Union[str, List[str]]:
126        """
127        Override to return the default cursor field used by this stream e.g: an API entity might always use created_at as the cursor field.
128        :return: The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.
129        """
130        cursor = self._stream_cursor_field.eval(self.config)  # type: ignore # _stream_cursor_field is always cast to interpolated string
131        return cursor if cursor else []

Override to return the default cursor field used by this stream e.g: an API entity might always use created_at as the cursor field.

Returns

The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.

is_resumable: bool
133    @property
134    def is_resumable(self) -> bool:
135        # Declarative sources always implement state getter/setter, but whether it supports checkpointing is based on
136        # if the retriever has a cursor defined.
137        return self.retriever.cursor is not None if hasattr(self.retriever, "cursor") else False
Returns

True if this stream allows the checkpointing of sync progress and can resume from it on subsequent attempts. This differs from supports_incremental because certain kinds of streams like those supporting resumable full refresh can checkpoint progress in between attempts for improved fault tolerance. However, they will start from the beginning on the next sync job.

def read_records( self, sync_mode: airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode, cursor_field: Optional[List[str]] = None, stream_slice: Optional[Mapping[str, Any]] = None, stream_state: Optional[Mapping[str, Any]] = None) -> Iterable[Mapping[str, Any]]:
139    def read_records(
140        self,
141        sync_mode: SyncMode,
142        cursor_field: Optional[List[str]] = None,
143        stream_slice: Optional[Mapping[str, Any]] = None,
144        stream_state: Optional[Mapping[str, Any]] = None,
145    ) -> Iterable[Mapping[str, Any]]:
146        """
147        :param: stream_state We knowingly avoid using stream_state as we want cursors to manage their own state.
148        """
149        if stream_slice is None or (
150            not isinstance(stream_slice, StreamSlice) and stream_slice == {}
151        ):
152            # As the parameter is Optional, many would just call `read_records(sync_mode)` during testing without specifying the field
153            # As part of the declarative model without custom components, this should never happen as the CDK would wire up a
154            # SinglePartitionRouter that would create this StreamSlice properly
155            # As part of the declarative model with custom components, a user that would return a `None` slice would now have the default
156            # empty slice which seems to make sense.
157            stream_slice = StreamSlice(partition={}, cursor_slice={})
158        if not isinstance(stream_slice, StreamSlice):
159            raise ValueError(
160                f"DeclarativeStream does not support stream_slices that are not StreamSlice. Got {stream_slice}"
161            )
162        yield from self.retriever.read_records(self.get_json_schema(), stream_slice)  # type: ignore # records are of the correct type
Parameters
  • stream_state: We knowingly avoid using stream_state as we want cursors to manage their own state.
def get_json_schema(self) -> Mapping[str, Any]:
164    def get_json_schema(self) -> Mapping[str, Any]:  # type: ignore
165        """
166        :return: A dict of the JSON schema representing this stream.
167
168        The default implementation of this method looks for a JSONSchema file with the same name as this stream's "name" property.
169        Override as needed.
170        """
171        return self._schema_loader.get_json_schema()
Returns

A dict of the JSON schema representing this stream.

The default implementation of this method looks for a JSONSchema file with the same name as this stream's "name" property. Override as needed.

def stream_slices( self, *, sync_mode: airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode, cursor_field: Optional[List[str]] = None, stream_state: Optional[Mapping[str, Any]] = None) -> Iterable[Optional[airbyte_cdk.StreamSlice]]:
173    def stream_slices(
174        self,
175        *,
176        sync_mode: SyncMode,
177        cursor_field: Optional[List[str]] = None,
178        stream_state: Optional[Mapping[str, Any]] = None,
179    ) -> Iterable[Optional[StreamSlice]]:
180        """
181        Override to define the slices for this stream. See the stream slicing section of the docs for more information.
182
183        :param sync_mode:
184        :param cursor_field:
185        :param stream_state: we knowingly avoid using stream_state as we want cursors to manage their own state
186        :return:
187        """
188        return self.retriever.stream_slices()

Override to define the slices for this stream. See the stream slicing section of the docs for more information.

Parameters
  • sync_mode:
  • cursor_field:
  • stream_state: we knowingly avoid using stream_state as we want cursors to manage their own state
Returns

The stream slices produced by the retriever (the result of self.retriever.stream_slices()).

state_checkpoint_interval: Optional[int]
190    @property
191    def state_checkpoint_interval(self) -> Optional[int]:
192        """
193        We explicitly disable checkpointing here. There are a couple reasons for that and not all are documented here but:
194        * In the case where records are not ordered, the granularity of what is ordered is the slice. Therefore, we will only update the
195            cursor value once at the end of every slice.
196        * Updating the state once every record would generate issues for data feed stop conditions or semi-incremental syncs where the
197            important state is the one at the beginning of the slice
198        """
199        return None

We explicitly disable checkpointing here. There are a couple of reasons for that, not all of which are documented here:

  • In the case where records are not ordered, the granularity of what is ordered is the slice. Therefore, we will only update the cursor value once at the end of every slice.
  • Updating the state once every record would generate issues for data feed stop conditions or semi-incremental syncs where the important state is the one at the beginning of the slice
def get_cursor(self) -> Optional[airbyte_cdk.sources.streams.checkpoint.Cursor]:
201    def get_cursor(self) -> Optional[Cursor]:
202        if self.retriever and isinstance(self.retriever, SimpleRetriever):
203            return self.retriever.cursor
204        return None

A Cursor is an interface that a stream can implement to manage how its internal state is read and updated while reading records. Historically, Python connectors had no concept of a cursor to manage state. Python streams need to define a cursor implementation and override this method to manage state through a Cursor.