airbyte_cdk.sources.declarative.declarative_stream
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import logging
from dataclasses import InitVar, dataclass, field
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.incremental import (
    GlobalSubstreamCursor,
    PerPartitionCursor,
    PerPartitionWithGlobalCursor,
)
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration
from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever
from airbyte_cdk.sources.declarative.retrievers.async_retriever import AsyncRetriever
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.schema import DefaultSchemaLoader
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
from airbyte_cdk.sources.streams.checkpoint import (
    CheckpointMode,
    CheckpointReader,
    Cursor,
    CursorBasedCheckpointReader,
)
from airbyte_cdk.sources.streams.core import Stream
from airbyte_cdk.sources.types import Config, StreamSlice


@dataclass
class DeclarativeStream(Stream):
    """
    DeclarativeStream is a Stream that delegates most of its logic to its schema_loader and retriever

    Attributes:
        retriever (Retriever): The retriever
        config (Config): The user-provided configuration as specified by the source's spec
        name (str): stream name
        primary_key (Optional[Union[str, List[str], List[List[str]]]]): the primary key of the stream
        state_migrations (List[StateMigration]): Migrations applied, in the order in which they are defined, to any state
            assigned through the `state` setter
        schema_loader (Optional[SchemaLoader]): The schema loader. A DefaultSchemaLoader is created when none is provided
        stream_cursor_field (Optional[Union[InterpolatedString, str]]): The cursor field
    """

    retriever: Retriever
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    # `name` and `primary_key` are re-bound as properties further down in the class body, so the dataclass
    # machinery sees a `property` object as their default value. The setters below ignore that placeholder.
    name: str
    primary_key: Optional[Union[str, List[str], List[List[str]]]]
    state_migrations: List[StateMigration] = field(repr=True, default_factory=list)
    schema_loader: Optional[SchemaLoader] = None
    _name: str = field(init=False, repr=False, default="")
    _primary_key: Optional[Union[str, List[str], List[List[str]]]] = field(
        init=False, repr=False, default=""
    )
    stream_cursor_field: Optional[Union[InterpolatedString, str]] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """
        Derive the private cursor field and schema loader from the values passed to the constructor.

        :param parameters: init-only parameters forwarded to the InterpolatedString and DefaultSchemaLoader created here
        """
        # Only plain strings are wrapped. An InterpolatedString (or None) is stored as given.
        self._stream_cursor_field = (
            InterpolatedString.create(self.stream_cursor_field, parameters=parameters)
            if isinstance(self.stream_cursor_field, str)
            else self.stream_cursor_field
        )
        self._schema_loader = (
            self.schema_loader
            if self.schema_loader
            else DefaultSchemaLoader(config=self.config, parameters=parameters)
        )

    @property  # type: ignore
    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
        """
        :return: The primary key stored by the `primary_key` setter.
        """
        return self._primary_key

    @primary_key.setter
    def primary_key(self, value: Optional[Union[str, List[str], List[List[str]]]]) -> None:
        # A `property` object is what the dataclass-generated __init__ passes when no primary key was supplied.
        if not isinstance(value, property):
            self._primary_key = value

    @property
    def exit_on_rate_limit(self) -> bool:
        """
        :return: The `exit_on_rate_limit` flag of the retriever when it is an AsyncRetriever, otherwise the flag of the
            retriever's requester.
        """
        if isinstance(self.retriever, AsyncRetriever):
            return self.retriever.exit_on_rate_limit

        return self.retriever.requester.exit_on_rate_limit  # type: ignore # abstract Retriever class has not requester attribute

    @exit_on_rate_limit.setter
    def exit_on_rate_limit(self, value: bool) -> None:
        if isinstance(self.retriever, AsyncRetriever):
            self.retriever.exit_on_rate_limit = value
        else:
            self.retriever.requester.exit_on_rate_limit = value  # type: ignore[attr-defined]

    @property  # type: ignore
    def name(self) -> str:
        """
        :return: Stream name, as stored by the `name` setter.
        """
        return self._name

    @name.setter
    def name(self, value: str) -> None:
        # A `property` object is what the dataclass-generated __init__ passes when no name was supplied.
        if not isinstance(value, property):
            self._name = value

    @property
    def state(self) -> MutableMapping[str, Any]:
        """
        :return: The state held by the retriever.
        """
        return self.retriever.state  # type: ignore

    @state.setter
    def state(self, value: MutableMapping[str, Any]) -> None:
        """State setter, accept state serialized by state getter."""
        state: Mapping[str, Any] = value
        # Each migration sees the output of the previous one. `or []` keeps a falsy value a no-op.
        for migration in self.state_migrations or []:
            if migration.should_migrate(state):
                state = migration.migrate(state)
        self.retriever.state = state

    def get_updated_state(
        self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]
    ) -> MutableMapping[str, Any]:
        """
        Return the current state; both arguments are ignored.

        :param current_stream_state: unused
        :param latest_record: unused
        :return: The value of the `state` property
        """
        return self.state

    @property
    def cursor_field(self) -> Union[str, List[str]]:
        """
        Override to return the default cursor field used by this stream e.g: an API entity might always use created_at as the cursor field.
        :return: The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.
            An empty list is returned when no cursor field is configured or when it evaluates to a falsy value.
        """
        # `stream_cursor_field` defaults to None and __post_init__ only wraps str values, so the private
        # attribute can legitimately be None. Treat that as "no cursor field" instead of raising.
        if self._stream_cursor_field is None:
            return []
        cursor = self._stream_cursor_field.eval(self.config)  # type: ignore # str values were cast to InterpolatedString in __post_init__
        return cursor if cursor else []

    @property
    def is_resumable(self) -> bool:
        # Declarative sources always implement state getter/setter, but whether it supports checkpointing is based on
        # if the retriever has a cursor defined.
        return self.retriever.cursor is not None if hasattr(self.retriever, "cursor") else False

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Mapping[str, Any]]:
        """
        Yield the records the retriever reads for `stream_slice`.

        :param sync_mode: not used by this implementation
        :param cursor_field: not used by this implementation
        :param stream_slice: the slice to read; `None` or an empty non-StreamSlice mapping is replaced by an empty StreamSlice
        :param: stream_state We knowingly avoid using stream_state as we want cursors to manage their own state.
        :raises ValueError: if `stream_slice` is a non-empty value that is not a StreamSlice
        """
        if stream_slice is None or (
            not isinstance(stream_slice, StreamSlice) and stream_slice == {}
        ):
            # As the parameter is Optional, many would just call `read_records(sync_mode)` during testing without specifying the field
            # As part of the declarative model without custom components, this should never happen as the CDK would wire up a
            # SinglePartitionRouter that would create this StreamSlice properly
            # As part of the declarative model with custom components, a user that would return a `None` slice would now have the default
            # empty slice which seems to make sense.
            stream_slice = StreamSlice(partition={}, cursor_slice={})
        if not isinstance(stream_slice, StreamSlice):
            raise ValueError(
                f"DeclarativeStream does not support stream_slices that are not StreamSlice. Got {stream_slice}"
            )
        yield from self.retriever.read_records(self.get_json_schema(), stream_slice)  # type: ignore # records are of the correct type

    def get_json_schema(self) -> Mapping[str, Any]:  # type: ignore
        """
        :return: A dict of the JSON schema representing this stream, as produced by the schema loader selected in
            `__post_init__`.
        """
        return self._schema_loader.get_json_schema()

    def stream_slices(
        self,
        *,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Optional[StreamSlice]]:
        """
        Return the slices produced by the retriever.

        :param sync_mode: not used by this implementation
        :param cursor_field: not used by this implementation
        :param stream_state: we knowingly avoid using stream_state as we want cursors to manage their own state
        :return: The result of `retriever.stream_slices()`
        """
        return self.retriever.stream_slices()

    @property
    def state_checkpoint_interval(self) -> Optional[int]:
        """
        We explicitly disable checkpointing here. There are a couple reasons for that and not all are documented here but:
        * In the case where records are not ordered, the granularity of what is ordered is the slice. Therefore, we will only update the
            cursor value once at the end of every slice.
        * Updating the state once every record would generate issues for data feed stop conditions or semi-incremental syncs where the
            important state is the one at the beginning of the slice
        """
        return None

    def get_cursor(self) -> Optional[Cursor]:
        """
        :return: The retriever's cursor when the retriever is a SimpleRetriever, otherwise None.
        """
        if self.retriever and isinstance(self.retriever, SimpleRetriever):
            return self.retriever.cursor
        return None

    def _get_checkpoint_reader(
        self,
        logger: logging.Logger,
        cursor_field: Optional[List[str]],
        sync_mode: SyncMode,
        stream_state: MutableMapping[str, Any],
    ) -> CheckpointReader:
        """
        This method is overridden to prevent issues with stream slice classification for incremental streams that have parent streams.

        The classification logic, when used with `itertools.tee`, creates a copy of the stream slices. When `stream_slices` is called
        the second time, the parent records generated during the classification phase are lost. This occurs because `itertools.tee`
        only buffers the results, meaning the logic in `simple_retriever` that observes and updates the cursor isn't executed again.

        By overriding this method, we ensure that the stream slices are processed correctly and parent records are not lost,
        allowing the cursor to function as expected.
        """
        mappings_or_slices = self.stream_slices(
            cursor_field=cursor_field,
            sync_mode=sync_mode,  # todo: change this interface to no longer rely on sync_mode for behavior
            stream_state=stream_state,
        )

        cursor = self.get_cursor()
        checkpoint_mode = self._checkpoint_mode

        # Partition-aware cursors get a cursor-based reader built directly from the slices; everything else
        # falls through to the parent implementation.
        if isinstance(
            cursor, (GlobalSubstreamCursor, PerPartitionCursor, PerPartitionWithGlobalCursor)
        ):
            self.has_multiple_slices = True
            return CursorBasedCheckpointReader(
                stream_slices=mappings_or_slices,
                cursor=cursor,
                read_state_from_cursor=checkpoint_mode == CheckpointMode.RESUMABLE_FULL_REFRESH,
            )

        return super()._get_checkpoint_reader(logger, cursor_field, sync_mode, stream_state)
@dataclass
class DeclarativeStream(Stream):
    """
    DeclarativeStream is a Stream that delegates most of its logic to its schema_loader and retriever

    Attributes:
        retriever (Retriever): The retriever
        config (Config): The user-provided configuration as specified by the source's spec
        name (str): stream name
        primary_key (Optional[Union[str, List[str], List[List[str]]]]): the primary key of the stream
        state_migrations (List[StateMigration]): Migrations applied, in the order in which they are defined, to any state
            assigned through the `state` setter
        schema_loader (Optional[SchemaLoader]): The schema loader. A DefaultSchemaLoader is created when none is provided
        stream_cursor_field (Optional[Union[InterpolatedString, str]]): The cursor field
    """

    retriever: Retriever
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    # `name` and `primary_key` are re-bound as properties further down in the class body, so the dataclass
    # machinery sees a `property` object as their default value. The setters below ignore that placeholder.
    name: str
    primary_key: Optional[Union[str, List[str], List[List[str]]]]
    state_migrations: List[StateMigration] = field(repr=True, default_factory=list)
    schema_loader: Optional[SchemaLoader] = None
    _name: str = field(init=False, repr=False, default="")
    _primary_key: Optional[Union[str, List[str], List[List[str]]]] = field(
        init=False, repr=False, default=""
    )
    stream_cursor_field: Optional[Union[InterpolatedString, str]] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """
        Derive the private cursor field and schema loader from the values passed to the constructor.

        :param parameters: init-only parameters forwarded to the InterpolatedString and DefaultSchemaLoader created here
        """
        # Only plain strings are wrapped. An InterpolatedString (or None) is stored as given.
        self._stream_cursor_field = (
            InterpolatedString.create(self.stream_cursor_field, parameters=parameters)
            if isinstance(self.stream_cursor_field, str)
            else self.stream_cursor_field
        )
        self._schema_loader = (
            self.schema_loader
            if self.schema_loader
            else DefaultSchemaLoader(config=self.config, parameters=parameters)
        )

    @property  # type: ignore
    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
        """
        :return: The primary key stored by the `primary_key` setter.
        """
        return self._primary_key

    @primary_key.setter
    def primary_key(self, value: Optional[Union[str, List[str], List[List[str]]]]) -> None:
        # A `property` object is what the dataclass-generated __init__ passes when no primary key was supplied.
        if not isinstance(value, property):
            self._primary_key = value

    @property
    def exit_on_rate_limit(self) -> bool:
        """
        :return: The `exit_on_rate_limit` flag of the retriever when it is an AsyncRetriever, otherwise the flag of the
            retriever's requester.
        """
        if isinstance(self.retriever, AsyncRetriever):
            return self.retriever.exit_on_rate_limit

        return self.retriever.requester.exit_on_rate_limit  # type: ignore # abstract Retriever class has not requester attribute

    @exit_on_rate_limit.setter
    def exit_on_rate_limit(self, value: bool) -> None:
        if isinstance(self.retriever, AsyncRetriever):
            self.retriever.exit_on_rate_limit = value
        else:
            self.retriever.requester.exit_on_rate_limit = value  # type: ignore[attr-defined]

    @property  # type: ignore
    def name(self) -> str:
        """
        :return: Stream name, as stored by the `name` setter.
        """
        return self._name

    @name.setter
    def name(self, value: str) -> None:
        # A `property` object is what the dataclass-generated __init__ passes when no name was supplied.
        if not isinstance(value, property):
            self._name = value

    @property
    def state(self) -> MutableMapping[str, Any]:
        """
        :return: The state held by the retriever.
        """
        return self.retriever.state  # type: ignore

    @state.setter
    def state(self, value: MutableMapping[str, Any]) -> None:
        """State setter, accept state serialized by state getter."""
        state: Mapping[str, Any] = value
        # Each migration sees the output of the previous one. `or []` keeps a falsy value a no-op.
        for migration in self.state_migrations or []:
            if migration.should_migrate(state):
                state = migration.migrate(state)
        self.retriever.state = state

    def get_updated_state(
        self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]
    ) -> MutableMapping[str, Any]:
        """
        Return the current state; both arguments are ignored.

        :param current_stream_state: unused
        :param latest_record: unused
        :return: The value of the `state` property
        """
        return self.state

    @property
    def cursor_field(self) -> Union[str, List[str]]:
        """
        Override to return the default cursor field used by this stream e.g: an API entity might always use created_at as the cursor field.
        :return: The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.
            An empty list is returned when no cursor field is configured or when it evaluates to a falsy value.
        """
        # `stream_cursor_field` defaults to None and __post_init__ only wraps str values, so the private
        # attribute can legitimately be None. Treat that as "no cursor field" instead of raising.
        if self._stream_cursor_field is None:
            return []
        cursor = self._stream_cursor_field.eval(self.config)  # type: ignore # str values were cast to InterpolatedString in __post_init__
        return cursor if cursor else []

    @property
    def is_resumable(self) -> bool:
        # Declarative sources always implement state getter/setter, but whether it supports checkpointing is based on
        # if the retriever has a cursor defined.
        return self.retriever.cursor is not None if hasattr(self.retriever, "cursor") else False

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Mapping[str, Any]]:
        """
        Yield the records the retriever reads for `stream_slice`.

        :param sync_mode: not used by this implementation
        :param cursor_field: not used by this implementation
        :param stream_slice: the slice to read; `None` or an empty non-StreamSlice mapping is replaced by an empty StreamSlice
        :param: stream_state We knowingly avoid using stream_state as we want cursors to manage their own state.
        :raises ValueError: if `stream_slice` is a non-empty value that is not a StreamSlice
        """
        if stream_slice is None or (
            not isinstance(stream_slice, StreamSlice) and stream_slice == {}
        ):
            # As the parameter is Optional, many would just call `read_records(sync_mode)` during testing without specifying the field
            # As part of the declarative model without custom components, this should never happen as the CDK would wire up a
            # SinglePartitionRouter that would create this StreamSlice properly
            # As part of the declarative model with custom components, a user that would return a `None` slice would now have the default
            # empty slice which seems to make sense.
            stream_slice = StreamSlice(partition={}, cursor_slice={})
        if not isinstance(stream_slice, StreamSlice):
            raise ValueError(
                f"DeclarativeStream does not support stream_slices that are not StreamSlice. Got {stream_slice}"
            )
        yield from self.retriever.read_records(self.get_json_schema(), stream_slice)  # type: ignore # records are of the correct type

    def get_json_schema(self) -> Mapping[str, Any]:  # type: ignore
        """
        :return: A dict of the JSON schema representing this stream, as produced by the schema loader selected in
            `__post_init__`.
        """
        return self._schema_loader.get_json_schema()

    def stream_slices(
        self,
        *,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Optional[StreamSlice]]:
        """
        Return the slices produced by the retriever.

        :param sync_mode: not used by this implementation
        :param cursor_field: not used by this implementation
        :param stream_state: we knowingly avoid using stream_state as we want cursors to manage their own state
        :return: The result of `retriever.stream_slices()`
        """
        return self.retriever.stream_slices()

    @property
    def state_checkpoint_interval(self) -> Optional[int]:
        """
        We explicitly disable checkpointing here. There are a couple reasons for that and not all are documented here but:
        * In the case where records are not ordered, the granularity of what is ordered is the slice. Therefore, we will only update the
            cursor value once at the end of every slice.
        * Updating the state once every record would generate issues for data feed stop conditions or semi-incremental syncs where the
            important state is the one at the beginning of the slice
        """
        return None

    def get_cursor(self) -> Optional[Cursor]:
        """
        :return: The retriever's cursor when the retriever is a SimpleRetriever, otherwise None.
        """
        if self.retriever and isinstance(self.retriever, SimpleRetriever):
            return self.retriever.cursor
        return None

    def _get_checkpoint_reader(
        self,
        logger: logging.Logger,
        cursor_field: Optional[List[str]],
        sync_mode: SyncMode,
        stream_state: MutableMapping[str, Any],
    ) -> CheckpointReader:
        """
        This method is overridden to prevent issues with stream slice classification for incremental streams that have parent streams.

        The classification logic, when used with `itertools.tee`, creates a copy of the stream slices. When `stream_slices` is called
        the second time, the parent records generated during the classification phase are lost. This occurs because `itertools.tee`
        only buffers the results, meaning the logic in `simple_retriever` that observes and updates the cursor isn't executed again.

        By overriding this method, we ensure that the stream slices are processed correctly and parent records are not lost,
        allowing the cursor to function as expected.
        """
        mappings_or_slices = self.stream_slices(
            cursor_field=cursor_field,
            sync_mode=sync_mode,  # todo: change this interface to no longer rely on sync_mode for behavior
            stream_state=stream_state,
        )

        cursor = self.get_cursor()
        checkpoint_mode = self._checkpoint_mode

        # Partition-aware cursors get a cursor-based reader built directly from the slices; everything else
        # falls through to the parent implementation.
        if isinstance(
            cursor, (GlobalSubstreamCursor, PerPartitionCursor, PerPartitionWithGlobalCursor)
        ):
            self.has_multiple_slices = True
            return CursorBasedCheckpointReader(
                stream_slices=mappings_or_slices,
                cursor=cursor,
                read_state_from_cursor=checkpoint_mode == CheckpointMode.RESUMABLE_FULL_REFRESH,
            )

        return super()._get_checkpoint_reader(logger, cursor_field, sync_mode, stream_state)
DeclarativeStream is a Stream that delegates most of its logic to its schema_loader and retriever
Attributes:
- name (str): stream name
- primary_key (Optional[Union[str, List[str], List[List[str]]]]): the primary key of the stream
- schema_loader (SchemaLoader): The schema loader
- retriever (Retriever): The retriever
- config (Config): The user-provided configuration as specified by the source's spec
- stream_cursor_field (Optional[Union[InterpolatedString, str]]): The cursor field
- stream. Transformations are applied in the order in which they are defined.
@property  # type: ignore
def name(self) -> str:
    """
    Name of this stream.

    :return: The value held in the private `_name` attribute.
    """
    stream_name = self._name
    return stream_name
Returns
Stream name. By default this is the implementing class name, but it can be overridden as needed.
@property  # type: ignore
def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
    """
    Primary key of this stream.

    :return: The value held in the private `_primary_key` attribute.
    """
    key = self._primary_key
    return key
Returns
A string if there is a single primary key, a list of strings if the primary key is composite, or a list of lists of strings if the composite primary key consists of nested fields. If the stream has no primary key, return None.
@property
def exit_on_rate_limit(self) -> bool:
    """
    Report whether the stream exits when rate limited.

    :return: The retriever's own `exit_on_rate_limit` flag when the retriever is an AsyncRetriever; otherwise the flag
        carried by the retriever's requester.
    """
    retriever = self.retriever
    if not isinstance(retriever, AsyncRetriever):
        # The abstract Retriever class declares no `requester` attribute, hence the ignore.
        return retriever.requester.exit_on_rate_limit  # type: ignore

    return retriever.exit_on_rate_limit
Exit on rate limit getter, should return bool value. False if the stream will retry endlessly when rate limited.
@property
def state(self) -> MutableMapping[str, Any]:
    """
    Current state of the stream.

    :return: The `state` attribute of the retriever.
    """
    retriever_state = self.retriever.state  # type: ignore
    return retriever_state
State setter, accept state serialized by state getter.
def get_updated_state(
    self,
    current_stream_state: MutableMapping[str, Any],
    latest_record: Mapping[str, Any],
) -> MutableMapping[str, Any]:
    """
    Return the stream's current state without inspecting either argument.

    :param current_stream_state: ignored by this implementation
    :param latest_record: ignored by this implementation
    :return: The value of the `state` property
    """
    current_state = self.state
    return current_state
DEPRECATED. Please use the explicit state property instead; see the IncrementalMixin docs.
Override to extract state from the latest record. Needed to implement incremental sync.
Inspects the latest record extracted from the data source and the current state object and return an updated state object.
For example: if the state object is based on created_at timestamp, and the current state is {'created_at': 10}, and the latest_record is {'name': 'octavia', 'created_at': 20 } then this method would return {'created_at': 20} to indicate state should be updated to this object.
Parameters
- current_stream_state: The stream's current state object
- latest_record: The latest record extracted from the stream
Returns
An updated state object
@property
def cursor_field(self) -> Union[str, List[str]]:
    """
    Override to return the default cursor field used by this stream e.g: an API entity might always use created_at as the cursor field.
    :return: The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.
        An empty list is returned when no cursor field is configured or when it evaluates to a falsy value.
    """
    interpolated_cursor = self._stream_cursor_field
    # The private attribute is None when no cursor field was configured; calling `.eval` on it would raise
    # AttributeError, so report "no cursor field" instead.
    if interpolated_cursor is None:
        return []
    cursor = interpolated_cursor.eval(self.config)  # type: ignore # non-None values are expected to expose `eval`
    return cursor if cursor else []
Override to return the default cursor field used by this stream e.g: an API entity might always use created_at as the cursor field.
Returns
The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.
@property
def is_resumable(self) -> bool:
    """
    Tell whether this stream can checkpoint its progress.

    :return: True only when the retriever exposes a `cursor` attribute whose value is not None.
    """
    # State getter/setter always exist on declarative streams; checkpointing support hinges solely on the
    # retriever carrying a cursor.
    retriever_cursor = getattr(self.retriever, "cursor", None)
    return retriever_cursor is not None
Returns
True if this stream allows the checkpointing of sync progress and can resume from it on subsequent attempts. This differs from supports_incremental because certain kinds of streams like those supporting resumable full refresh can checkpoint progress in between attempts for improved fault tolerance. However, they will start from the beginning on the next sync job.
def read_records(
    self,
    sync_mode: SyncMode,
    cursor_field: Optional[List[str]] = None,
    stream_slice: Optional[Mapping[str, Any]] = None,
    stream_state: Optional[Mapping[str, Any]] = None,
) -> Iterable[Mapping[str, Any]]:
    """
    Yield the records the retriever reads for the given slice.

    :param sync_mode: not used by this implementation
    :param cursor_field: not used by this implementation
    :param stream_slice: slice to read; `None` or an empty non-StreamSlice mapping is swapped for an empty StreamSlice
    :param stream_state: deliberately unused, cursors manage their own state
    :raises ValueError: when `stream_slice` is a non-empty value that is not a StreamSlice
    """
    is_slice_object = isinstance(stream_slice, StreamSlice)
    if stream_slice is None or (not is_slice_object and stream_slice == {}):
        # Callers (tests in particular) often omit the slice. Without custom components the CDK wires a
        # SinglePartitionRouter that always builds a proper StreamSlice, so this only matters for custom
        # components returning `None`, for which an empty slice is a sensible default.
        stream_slice = StreamSlice(partition={}, cursor_slice={})
    elif not is_slice_object:
        raise ValueError(
            f"DeclarativeStream does not support stream_slices that are not StreamSlice. Got {stream_slice}"
        )

    records = self.retriever.read_records(self.get_json_schema(), stream_slice)  # type: ignore # records are of the correct type
    yield from records
Parameters
- stream_state: We knowingly avoid using stream_state as we want cursors to manage their own state.
def get_json_schema(self) -> Mapping[str, Any]:  # type: ignore
    """
    Fetch the JSON schema describing this stream.

    :return: Whatever `get_json_schema()` of the private `_schema_loader` returns.
    """
    loader = self._schema_loader
    return loader.get_json_schema()
Returns
A dict of the JSON schema representing this stream.
The default implementation of this method looks for a JSONSchema file with the same name as this stream's "name" property. Override as needed.
def stream_slices(
    self,
    *,
    sync_mode: SyncMode,
    cursor_field: Optional[List[str]] = None,
    stream_state: Optional[Mapping[str, Any]] = None,
) -> Iterable[Optional[StreamSlice]]:
    """
    Produce the slices for this stream by delegating to the retriever.

    :param sync_mode: not used by this implementation
    :param cursor_field: not used by this implementation
    :param stream_state: deliberately unused, cursors manage their own state
    :return: The result of calling `stream_slices()` on the retriever
    """
    slices = self.retriever.stream_slices()
    return slices
Override to define the slices for this stream. See the stream slicing section of the docs for more information.
Parameters
- sync_mode:
- cursor_field:
- stream_state: we knowingly avoid using stream_state as we want cursors to manage their own state
Returns
@property
def state_checkpoint_interval(self) -> Optional[int]:
    """
    Interval-based checkpointing is explicitly disabled for declarative streams.

    Not every reason is listed here, but among them:
    * When records are not ordered, the slice is the smallest unit that is ordered, so the cursor value is only
        updated once, at the end of each slice.
    * Updating the state after every record would cause problems for data feed stop conditions and semi-incremental
        syncs, where the state that matters is the one at the beginning of the slice.

    :return: Always None
    """
    interval: Optional[int] = None
    return interval
We explicitly disable checkpointing here. There are a couple reasons for that and not all are documented here but:
- In the case where records are not ordered, the granularity of what is ordered is the slice. Therefore, we will only update the cursor value once at the end of every slice.
- Updating the state once every record would generate issues for data feed stop conditions or semi-incremental syncs where the important state is the one at the beginning of the slice
def get_cursor(self) -> Optional[Cursor]:
    """
    Expose the cursor of the underlying retriever, if any.

    :return: The retriever's `cursor` when the retriever is truthy and a SimpleRetriever; None otherwise.
    """
    retriever = self.retriever
    if not retriever or not isinstance(retriever, SimpleRetriever):
        return None
    return retriever.cursor
A Cursor is an interface that a stream can implement to manage how its internal state is read and updated while reading records. Historically, Python connectors had no concept of a cursor to manage state. Python streams need to define a cursor implementation and override this method to manage state through a Cursor.