airbyte_cdk.legacy.sources.declarative.declarative_stream
```python
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import logging
from dataclasses import InitVar, dataclass, field
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union

from typing_extensions import deprecated

from airbyte_cdk.legacy.sources.declarative.incremental import (
    GlobalSubstreamCursor,
    PerPartitionCursor,
    PerPartitionWithGlobalCursor,
)
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration
from airbyte_cdk.sources.declarative.retrievers.async_retriever import AsyncRetriever
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.schema import DefaultSchemaLoader
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
from airbyte_cdk.sources.streams.checkpoint import (
    CheckpointMode,
    CheckpointReader,
    Cursor,
    CursorBasedCheckpointReader,
)
from airbyte_cdk.sources.streams.core import Stream
from airbyte_cdk.sources.types import Config, StreamSlice


@deprecated("DeclarativeStream has been deprecated in favor of the concurrent DefaultStream")
@dataclass
class DeclarativeStream(Stream):
    """
    DeclarativeStream is a Stream that delegates most of its logic to its schema_loader and retriever

    Attributes:
        name (str): stream name
        primary_key (Optional[Union[str, List[str], List[List[str]]]]): the primary key of the stream
        schema_loader (SchemaLoader): The schema loader
        retriever (Retriever): The retriever
        config (Config): The user-provided configuration as specified by the source's spec
        stream_cursor_field (Optional[Union[InterpolatedString, str]]): The cursor field of the stream
    """

    retriever: Retriever
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    name: str
    primary_key: Optional[Union[str, List[str], List[List[str]]]]
    state_migrations: List[StateMigration] = field(repr=True, default_factory=list)
    schema_loader: Optional[SchemaLoader] = None
    _name: str = field(init=False, repr=False, default="")
    _primary_key: str = field(init=False, repr=False, default="")
    stream_cursor_field: Optional[Union[InterpolatedString, str]] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._stream_cursor_field = (
            InterpolatedString.create(self.stream_cursor_field, parameters=parameters)
            if isinstance(self.stream_cursor_field, str)
            else self.stream_cursor_field
        )
        self._schema_loader = (
            self.schema_loader
            if self.schema_loader
            else DefaultSchemaLoader(config=self.config, parameters=parameters)
        )

    @property  # type: ignore
    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
        return self._primary_key

    @primary_key.setter
    def primary_key(self, value: str) -> None:
        if not isinstance(value, property):
            self._primary_key = value

    @property
    def exit_on_rate_limit(self) -> bool:
        if isinstance(self.retriever, AsyncRetriever):
            return self.retriever.exit_on_rate_limit

        return self.retriever.requester.exit_on_rate_limit  # type: ignore # the abstract Retriever class has no requester attribute

    @exit_on_rate_limit.setter
    def exit_on_rate_limit(self, value: bool) -> None:
        if isinstance(self.retriever, AsyncRetriever):
            self.retriever.exit_on_rate_limit = value
        else:
            self.retriever.requester.exit_on_rate_limit = value  # type: ignore[attr-defined]

    @property  # type: ignore
    def name(self) -> str:
        """
        :return: Stream name. By default this is the implementing class name, but it can be overridden as needed.
        """
        return self._name

    @name.setter
    def name(self, value: str) -> None:
        if not isinstance(value, property):
            self._name = value

    @property
    def state(self) -> MutableMapping[str, Any]:
        return self.retriever.state  # type: ignore

    @state.setter
    def state(self, value: MutableMapping[str, Any]) -> None:
        """State setter, accepts state serialized by the state getter."""
        state: Mapping[str, Any] = value
        if self.state_migrations:
            for migration in self.state_migrations:
                if migration.should_migrate(state):
                    state = migration.migrate(state)
        self.retriever.state = state

    def get_updated_state(
        self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]
    ) -> MutableMapping[str, Any]:
        return self.state

    @property
    def cursor_field(self) -> Union[str, List[str]]:
        """
        Override to return the default cursor field used by this stream, e.g. an API entity might always use created_at as the cursor field.
        :return: The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.
        """
        cursor = self._stream_cursor_field.eval(self.config)  # type: ignore # _stream_cursor_field is always cast to an interpolated string
        return cursor if cursor else []

    @property
    def is_resumable(self) -> bool:
        # Declarative sources always implement the state getter/setter, but whether a stream supports
        # checkpointing depends on whether the retriever has a cursor defined.
        return self.retriever.cursor is not None if hasattr(self.retriever, "cursor") else False

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Mapping[str, Any]]:
        """
        :param: stream_state We knowingly avoid using stream_state as we want cursors to manage their own state.
        """
        if stream_slice is None or (
            not isinstance(stream_slice, StreamSlice) and stream_slice == {}
        ):
            # As the parameter is Optional, many would just call `read_records(sync_mode)` during testing without specifying the field
            # As part of the declarative model without custom components, this should never happen as the CDK would wire up a
            # SinglePartitionRouter that would create this StreamSlice properly
            # As part of the declarative model with custom components, a user that would return a `None` slice would now have the default
            # empty slice which seems to make sense.
            stream_slice = StreamSlice(partition={}, cursor_slice={})
        if not isinstance(stream_slice, StreamSlice):
            raise ValueError(
                f"DeclarativeStream does not support stream_slices that are not StreamSlice. Got {stream_slice}"
            )
        yield from self.retriever.read_records(self.get_json_schema(), stream_slice)  # type: ignore # records are of the correct type

    def get_json_schema(self) -> Mapping[str, Any]:  # type: ignore
        """
        :return: A dict of the JSON schema representing this stream.

        The default implementation of this method looks for a JSONSchema file with the same name as this stream's "name" property.
        Override as needed.
        """
        return self._schema_loader.get_json_schema()

    def stream_slices(
        self,
        *,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Optional[StreamSlice]]:
        """
        Override to define the slices for this stream. See the stream slicing section of the docs for more information.

        :param sync_mode:
        :param cursor_field:
        :param stream_state: we knowingly avoid using stream_state as we want cursors to manage their own state
        :return:
        """
        return self.retriever.stream_slices()

    @property
    def state_checkpoint_interval(self) -> Optional[int]:
        """
        We explicitly disable checkpointing here. There are a couple of reasons for that; not all of them are documented here, but:
        * In the case where records are not ordered, the granularity of what is ordered is the slice. Therefore, we will
          only update the cursor value once at the end of every slice.
        * Updating the state after every record would generate issues for data feed stop conditions or semi-incremental
          syncs where the important state is the one at the beginning of the slice.
        """
        return None

    def get_cursor(self) -> Optional[Cursor]:
        return None

    def _get_checkpoint_reader(
        self,
        logger: logging.Logger,
        cursor_field: Optional[List[str]],
        sync_mode: SyncMode,
        stream_state: MutableMapping[str, Any],
    ) -> CheckpointReader:
        """
        This method is overridden to prevent issues with stream slice classification for incremental streams that have parent streams.

        The classification logic, when used with `itertools.tee`, creates a copy of the stream slices. When `stream_slices` is called
        the second time, the parent records generated during the classification phase are lost. This occurs because `itertools.tee`
        only buffers the results, meaning the logic in `simple_retriever` that observes and updates the cursor isn't executed again.

        By overriding this method, we ensure that the stream slices are processed correctly and parent records are not lost,
        allowing the cursor to function as expected.
        """
        mappings_or_slices = self.stream_slices(
            cursor_field=cursor_field,
            sync_mode=sync_mode,  # todo: change this interface to no longer rely on sync_mode for behavior
            stream_state=stream_state,
        )

        cursor = self.get_cursor()
        checkpoint_mode = self._checkpoint_mode

        if isinstance(
            cursor, (GlobalSubstreamCursor, PerPartitionCursor, PerPartitionWithGlobalCursor)
        ):
            self.has_multiple_slices = True
            return CursorBasedCheckpointReader(
                stream_slices=mappings_or_slices,
                cursor=cursor,
                read_state_from_cursor=checkpoint_mode == CheckpointMode.RESUMABLE_FULL_REFRESH,
            )

        return super()._get_checkpoint_reader(logger, cursor_field, sync_mode, stream_state)
```
DeclarativeStream is a Stream that delegates most of its logic to its schema_loader and retriever.
Attributes:
- name (str): stream name
- primary_key (Optional[Union[str, List[str], List[List[str]]]]): the primary key of the stream
- schema_loader (SchemaLoader): The schema loader
- retriever (Retriever): The retriever
- config (Config): The user-provided configuration as specified by the source's spec
- stream_cursor_field (Optional[Union[InterpolatedString, str]]): The cursor field of the stream
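For illustration, here is a minimal sketch of constructing the stream directly. In practice the declarative framework builds it from a YAML manifest; `my_retriever` is a hypothetical, fully configured `Retriever` instance:

```python
# Hedged sketch: `my_retriever` is assumed to be a configured Retriever
# (e.g. a SimpleRetriever); real connectors wire this up from the manifest.
stream = DeclarativeStream(
    retriever=my_retriever,            # assumption: any concrete Retriever
    config={"api_key": "..."},         # user-provided config per the source's spec
    parameters={},                     # InitVar consumed by __post_init__ for interpolation
    name="customers",
    primary_key="id",
    stream_cursor_field="updated_at",  # may also be an interpolated string
)
```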
```python
@property  # type: ignore
def name(self) -> str:
    """
    :return: Stream name. By default this is the implementing class name, but it can be overridden as needed.
    """
    return self._name
```
Returns
Stream name. By default this is the implementing class name, but it can be overridden as needed.
```python
@property  # type: ignore
def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
    return self._primary_key
```
Returns
string if single primary key, list of strings if composite primary key, list of list of strings if composite primary key consisting of nested fields. If the stream has no primary keys, return None.
```python
@property
def exit_on_rate_limit(self) -> bool:
    if isinstance(self.retriever, AsyncRetriever):
        return self.retriever.exit_on_rate_limit

    return self.retriever.requester.exit_on_rate_limit  # type: ignore # the abstract Retriever class has no requester attribute
```
Exit-on-rate-limit getter; returns a bool. False means the stream will retry endlessly when rate limited.
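As a quick illustration (a sketch, assuming a requester-based retriever), the property simply proxies to the retriever or its requester:

```python
# Sketch: assignment is forwarded to retriever.requester (or to an
# AsyncRetriever directly), per the getter/setter shown above.
stream.exit_on_rate_limit = True   # fail fast instead of retrying endlessly on rate limits
assert stream.exit_on_rate_limit is True
```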
```python
@property
def state(self) -> MutableMapping[str, Any]:
    return self.retriever.state  # type: ignore
```
State getter/setter. The setter accepts state serialized by the state getter and applies any configured state migrations before handing the state to the retriever.
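A hedged sketch of a `StateMigration` that the setter above would apply; the class and key names are hypothetical:

```python
from typing import Any, Mapping

from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration


class RenameCursorKeyMigration(StateMigration):
    """Hypothetical migration: renames a legacy state key before the retriever sees it."""

    def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
        return "updated" in stream_state

    def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
        return {"updated_at": stream_state["updated"]}
```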
```python
def get_updated_state(
    self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]
) -> MutableMapping[str, Any]:
    return self.state
```
DEPRECATED. Please use explicit state property instead, see IncrementalMixin docs.
Override to extract state from the latest record. Needed to implement incremental sync.
Inspects the latest record extracted from the data source and the current state object and return an updated state object.
For example: if the state object is based on created_at timestamp, and the current state is {'created_at': 10}, and the latest_record is {'name': 'octavia', 'created_at': 20 } then this method would return {'created_at': 20} to indicate state should be updated to this object.
Parameters
- current_stream_state: The stream's current state object
- latest_record: The latest record extracted from the stream
Returns
An updated state object
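Note that in this class the override simply returns the cursor-managed state, so the latest record does not influence the result; a sketch:

```python
# Sketch: the retriever's cursor already owns state management, so the
# latest record passed in here is effectively ignored.
updated = stream.get_updated_state(current_stream_state={}, latest_record={"created_at": 20})
assert updated == stream.state
```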
```python
@property
def cursor_field(self) -> Union[str, List[str]]:
    """
    Override to return the default cursor field used by this stream, e.g. an API entity might always use created_at as the cursor field.
    :return: The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.
    """
    cursor = self._stream_cursor_field.eval(self.config)  # type: ignore # _stream_cursor_field is always cast to an interpolated string
    return cursor if cursor else []
```
Override to return the default cursor field used by this stream, e.g. an API entity might always use created_at as the cursor field.
Returns
The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.
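Because `stream_cursor_field` may be an interpolated string, the evaluated value can come from config; a sketch assuming a hypothetical `cursor_field` config key:

```python
# Sketch: with stream_cursor_field="{{ config['cursor_field'] }}" and
# config={"cursor_field": "updated_at"}, the property evaluates against config.
print(stream.cursor_field)  # -> "updated_at"; an unset cursor yields []
```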
```python
@property
def is_resumable(self) -> bool:
    # Declarative sources always implement the state getter/setter, but whether a stream supports
    # checkpointing depends on whether the retriever has a cursor defined.
    return self.retriever.cursor is not None if hasattr(self.retriever, "cursor") else False
```
Returns
True if this stream allows the checkpointing of sync progress and can resume from it on subsequent attempts. This differs from supports_incremental because certain kinds of streams like those supporting resumable full refresh can checkpoint progress in between attempts for improved fault tolerance. However, they will start from the beginning on the next sync job.
```python
def read_records(
    self,
    sync_mode: SyncMode,
    cursor_field: Optional[List[str]] = None,
    stream_slice: Optional[Mapping[str, Any]] = None,
    stream_state: Optional[Mapping[str, Any]] = None,
) -> Iterable[Mapping[str, Any]]:
    """
    :param: stream_state We knowingly avoid using stream_state as we want cursors to manage their own state.
    """
    if stream_slice is None or (
        not isinstance(stream_slice, StreamSlice) and stream_slice == {}
    ):
        # As the parameter is Optional, many would just call `read_records(sync_mode)` during testing without specifying the field
        # As part of the declarative model without custom components, this should never happen as the CDK would wire up a
        # SinglePartitionRouter that would create this StreamSlice properly
        # As part of the declarative model with custom components, a user that would return a `None` slice would now have the default
        # empty slice which seems to make sense.
        stream_slice = StreamSlice(partition={}, cursor_slice={})
    if not isinstance(stream_slice, StreamSlice):
        raise ValueError(
            f"DeclarativeStream does not support stream_slices that are not StreamSlice. Got {stream_slice}"
        )
    yield from self.retriever.read_records(self.get_json_schema(), stream_slice)  # type: ignore # records are of the correct type
```
Parameters
- stream_state: We knowingly avoid using stream_state, as we want cursors to manage their own state.
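A sketch of calling the method directly, as one might in a test; the guard above normalizes a missing or empty slice to exactly this empty `StreamSlice`:

```python
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.types import StreamSlice

# Sketch: an explicit empty slice; passing stream_slice=None or {} is
# normalized to the same value by the guard above.
empty_slice = StreamSlice(partition={}, cursor_slice={})
for record in stream.read_records(SyncMode.full_refresh, stream_slice=empty_slice):
    print(record)
```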
```python
def get_json_schema(self) -> Mapping[str, Any]:  # type: ignore
    """
    :return: A dict of the JSON schema representing this stream.

    The default implementation of this method looks for a JSONSchema file with the same name as this stream's "name" property.
    Override as needed.
    """
    return self._schema_loader.get_json_schema()
```
Returns
A dict of the JSON schema representing this stream.
The default implementation of this method looks for a JSONSchema file with the same name as this stream's "name" property. Override as needed.
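For example (a sketch, assuming the default loader resolves a `schemas/<stream name>.json` file bundled with the connector):

```python
# Sketch: the schema is whatever the configured SchemaLoader returns; with
# DefaultSchemaLoader it is typically loaded from e.g. schemas/customers.json.
schema = stream.get_json_schema()
assert isinstance(schema, dict)
```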
```python
def stream_slices(
    self,
    *,
    sync_mode: SyncMode,
    cursor_field: Optional[List[str]] = None,
    stream_state: Optional[Mapping[str, Any]] = None,
) -> Iterable[Optional[StreamSlice]]:
    """
    Override to define the slices for this stream. See the stream slicing section of the docs for more information.

    :param sync_mode:
    :param cursor_field:
    :param stream_state: we knowingly avoid using stream_state as we want cursors to manage their own state
    :return:
    """
    return self.retriever.stream_slices()
```
Override to define the slices for this stream. See the stream slicing section of the docs for more information.
Parameters
- sync_mode: unused here; slicing is fully delegated to the retriever
- cursor_field: unused here; cursors manage their own cursor field
- stream_state: we knowingly avoid using stream_state, as we want cursors to manage their own state
Returns
The stream slices produced by the retriever.
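Putting slicing and reading together, continuing the construction sketch above, a typical read loop might look like this:

```python
from airbyte_cdk.models import SyncMode

# Sketch: iterate the retriever-produced slices and read each one.
for stream_slice in stream.stream_slices(sync_mode=SyncMode.incremental):
    for record in stream.read_records(SyncMode.incremental, stream_slice=stream_slice):
        process(record)  # hypothetical downstream handler
```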
```python
@property
def state_checkpoint_interval(self) -> Optional[int]:
    """
    We explicitly disable checkpointing here. There are a couple of reasons for that; not all of them are documented here, but:
    * In the case where records are not ordered, the granularity of what is ordered is the slice. Therefore, we will
      only update the cursor value once at the end of every slice.
    * Updating the state after every record would generate issues for data feed stop conditions or semi-incremental
      syncs where the important state is the one at the beginning of the slice.
    """
    return None
```
We explicitly disable checkpointing here. There are a couple of reasons for that; not all of them are documented here, but:
- In the case where records are not ordered, the granularity of what is ordered is the slice. Therefore, we will only update the cursor value once at the end of every slice.
- Updating the state after every record would generate issues for data feed stop conditions or semi-incremental syncs where the important state is the one at the beginning of the slice.
A Cursor is an interface that a stream can implement to manage how its internal state is read and updated while reading records. Historically, Python connectors had no concept of a cursor to manage state. Python streams need to define a cursor implementation and override this method to manage state through a Cursor.
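For a Python (non-declarative) stream, that means something like the following hedged sketch, where `MyCursor` is a hypothetical `Cursor` implementation and the stream's other abstract members are omitted for brevity:

```python
from typing import Optional

from airbyte_cdk.sources.streams.checkpoint import Cursor
from airbyte_cdk.sources.streams.core import Stream


class MyPythonStream(Stream):
    """Hypothetical stream that manages state through a custom Cursor."""
    # (abstract members such as read_records and primary_key omitted for brevity)

    def get_cursor(self) -> Optional[Cursor]:
        return MyCursor()  # hypothetical Cursor implementation
```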