airbyte_cdk.sources.streams.checkpoint
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.


from .checkpoint_reader import (
    CheckpointMode,
    CheckpointReader,
    CursorBasedCheckpointReader,
    FullRefreshCheckpointReader,
    IncrementalCheckpointReader,
    LegacyCursorBasedCheckpointReader,
    ResumableFullRefreshCheckpointReader,
)
from .cursor import Cursor
from .resumable_full_refresh_cursor import ResumableFullRefreshCursor

__all__ = [
    "CheckpointMode",
    "CheckpointReader",
    "Cursor",
    "CursorBasedCheckpointReader",
    "FullRefreshCheckpointReader",
    "IncrementalCheckpointReader",
    "LegacyCursorBasedCheckpointReader",
    "ResumableFullRefreshCheckpointReader",
    "ResumableFullRefreshCursor",
]
class CheckpointMode(Enum):
    INCREMENTAL = "incremental"
    RESUMABLE_FULL_REFRESH = "resumable_full_refresh"
    FULL_REFRESH = "full_refresh"
An enumeration of the checkpointing modes a stream can operate in.
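A driver typically dispatches on this mode when deciding how a stream checkpoints. The following is a minimal, self-contained sketch (the `describe` helper is hypothetical; the real CDK constructs a CheckpointReader at this point rather than returning a string):

```python
from enum import Enum


class CheckpointMode(Enum):
    INCREMENTAL = "incremental"
    RESUMABLE_FULL_REFRESH = "resumable_full_refresh"
    FULL_REFRESH = "full_refresh"


def describe(mode: CheckpointMode) -> str:
    # Illustrative dispatch only; the real CDK builds the matching
    # CheckpointReader implementation for each mode.
    if mode is CheckpointMode.INCREMENTAL:
        return "checkpoint after each slice"
    if mode is CheckpointMode.RESUMABLE_FULL_REFRESH:
        return "checkpoint after each page"
    return "single final checkpoint"
```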
class CheckpointReader(ABC):
    """
    CheckpointReader manages how to iterate over a stream's partitions and serves as the bridge for interpreting the current state
    of the stream that should be emitted back to the platform.
    """

    @abstractmethod
    def next(self) -> Optional[Mapping[str, Any]]:
        """
        Returns the next slice that will be used to fetch the next group of records. Returning None indicates that the reader
        has finished iterating over all slices.
        """

    @abstractmethod
    def observe(self, new_state: Mapping[str, Any]) -> None:
        """
        Updates the internal state of the checkpoint reader based on the incoming stream state from a connector.

        WARNING: This is used to retain backwards compatibility with streams using the legacy get_stream_state() method.
        In order to uptake Resumable Full Refresh, connectors must migrate streams to use the state setter/getter methods.
        """

    @abstractmethod
    def get_checkpoint(self) -> Optional[Mapping[str, Any]]:
        """
        Retrieves the current state value of the stream. The connector does not emit state messages if the checkpoint value is None.
        """
CheckpointReader manages how to iterate over a stream's partitions and serves as the bridge for interpreting the current state of the stream that should be emitted back to the platform.
@abstractmethod
def next(self) -> Optional[Mapping[str, Any]]:
    """
    Returns the next slice that will be used to fetch the next group of records. Returning None indicates that the reader
    has finished iterating over all slices.
    """
Returns the next slice that will be used to fetch the next group of records. Returning None indicates that the reader has finished iterating over all slices.
@abstractmethod
def observe(self, new_state: Mapping[str, Any]) -> None:
    """
    Updates the internal state of the checkpoint reader based on the incoming stream state from a connector.

    WARNING: This is used to retain backwards compatibility with streams using the legacy get_stream_state() method.
    In order to uptake Resumable Full Refresh, connectors must migrate streams to use the state setter/getter methods.
    """
Updates the internal state of the checkpoint reader based on the incoming stream state from a connector.
WARNING: This is used to retain backwards compatibility with streams using the legacy get_stream_state() method. In order to uptake Resumable Full Refresh, connectors must migrate streams to use the state setter/getter methods.
@abstractmethod
def get_checkpoint(self) -> Optional[Mapping[str, Any]]:
    """
    Retrieves the current state value of the stream. The connector does not emit state messages if the checkpoint value is None.
    """
Retrieves the current state value of the stream. The connector does not emit state messages if the checkpoint value is None.
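Taken together, the three methods support a simple driver loop: keep requesting slices until next() returns None, and emit a state message whenever get_checkpoint() is non-None. A self-contained sketch with a stub reader (not the real CDK class; the slice and state payloads are made up for illustration):

```python
from typing import Any, Mapping, Optional


class StubReader:
    """Minimal stand-in for a CheckpointReader over three fixed slices."""

    def __init__(self) -> None:
        self._slices = iter([{"page": 1}, {"page": 2}, {"page": 3}])
        self._state: Optional[Mapping[str, Any]] = None

    def next(self) -> Optional[Mapping[str, Any]]:
        # Returning None signals the reader is exhausted
        return next(self._slices, None)

    def observe(self, new_state: Mapping[str, Any]) -> None:
        self._state = new_state

    def get_checkpoint(self) -> Optional[Mapping[str, Any]]:
        return self._state


def read_stream(reader: StubReader) -> list:
    emitted = []
    while (stream_slice := reader.next()) is not None:
        # Simulate processing records for the slice and updating state
        reader.observe({"last_page": stream_slice["page"]})
        if (checkpoint := reader.get_checkpoint()) is not None:
            emitted.append(checkpoint)
    return emitted
```

The real driver in the CDK also emits a final checkpoint after the loop ends; this sketch only shows the per-slice flow.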
class Cursor(ABC):
    """
    Cursors are components that allow for checkpointing the current state of a sync. They keep track of what data has been consumed
    and allow syncs to be resumed from a specific point based on that information.
    """

    @abstractmethod
    def set_initial_state(self, stream_state: StreamState) -> None:
        """
        Cursors are not initialized with their state. As state is needed in order to function properly, this method should be called
        before calling anything else.

        :param stream_state: The state of the stream as returned by get_stream_state
        """

    def observe(self, stream_slice: StreamSlice, record: Record) -> None:
        """
        Register a record with the cursor; the cursor instance can then use it to manage the state of the in-progress stream read.

        :param stream_slice: The current slice, which may or may not contain the most recently observed record
        :param record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the
            stream state may need to be deferred depending on whether the source reliably orders records by the cursor field.
        """
        pass

    @abstractmethod
    def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
        """
        Update state based on the stream slice. Note that `stream_slice.cursor_slice` and `most_recent_record.associated_slice` are expected
        to be the same, but we make it explicit here that `stream_slice` should be leveraged to update the state. We do not pass in the
        latest record, since cursor instances should maintain the relevant internal state on their own.

        :param stream_slice: slice to close
        """

    @abstractmethod
    def get_stream_state(self) -> StreamState:
        """
        Returns the current stream state. We would like to restrict its usage since it exposes internal state. As of 2023-06-14, it
        is used for three things:
        * Interpolation of the requests
        * Transformation of records
        * Saving the state

        For the first case, we are probably stuck with exposing the stream state. For the third, we can probably expose a method that
        allows for emitting the state to the platform.
        """

    @abstractmethod
    def should_be_synced(self, record: Record) -> bool:
        """
        Evaluates whether a record should be synced. This allows for filtering records and for defining a stop condition during pagination.
        """

    @abstractmethod
    def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
        """
        Evaluates which record is greater in terms of its cursor value. This is used to avoid having to capture all the records to close a slice.
        """

    @abstractmethod
    def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
        """
        Get the state value of a specific stream_slice. For incremental or resumable full refresh cursors which only manage state in
        a single dimension, this is the entire state object. For per-partition cursors used by substreams, this returns the state of
        a specific parent delineated by the incoming slice's partition object.
        """
Cursors are components that allow for checkpointing the current state of a sync. They keep track of what data has been consumed and allow syncs to be resumed from a specific point based on that information.
@abstractmethod
def set_initial_state(self, stream_state: StreamState) -> None:
    """
    Cursors are not initialized with their state. As state is needed in order to function properly, this method should be called
    before calling anything else.

    :param stream_state: The state of the stream as returned by get_stream_state
    """
Cursors are not initialized with their state. As state is needed in order to function properly, this method should be called before calling anything else.
Parameters
- stream_state: The state of the stream as returned by get_stream_state
def observe(self, stream_slice: StreamSlice, record: Record) -> None:
    """
    Register a record with the cursor; the cursor instance can then use it to manage the state of the in-progress stream read.

    :param stream_slice: The current slice, which may or may not contain the most recently observed record
    :param record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the
        stream state may need to be deferred depending on whether the source reliably orders records by the cursor field.
    """
    pass
Register a record with the cursor; the cursor instance can then use it to manage the state of the in-progress stream read.
Parameters
- stream_slice: The current slice, which may or may not contain the most recently observed record
- record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the stream state may need to be deferred depending on whether the source reliably orders records by the cursor field.
@abstractmethod
def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
    """
    Update state based on the stream slice. Note that `stream_slice.cursor_slice` and `most_recent_record.associated_slice` are expected
    to be the same, but we make it explicit here that `stream_slice` should be leveraged to update the state. We do not pass in the
    latest record, since cursor instances should maintain the relevant internal state on their own.

    :param stream_slice: slice to close
    """
Update state based on the stream slice. Note that `stream_slice.cursor_slice` and `most_recent_record.associated_slice` are expected to be the same, but we make it explicit here that `stream_slice` should be leveraged to update the state. We do not pass in the latest record, since cursor instances should maintain the relevant internal state on their own.
Parameters
- stream_slice: slice to close
@abstractmethod
def get_stream_state(self) -> StreamState:
    """
    Returns the current stream state. We would like to restrict its usage since it exposes internal state. As of 2023-06-14, it
    is used for three things:
    * Interpolation of the requests
    * Transformation of records
    * Saving the state

    For the first case, we are probably stuck with exposing the stream state. For the third, we can probably expose a method that
    allows for emitting the state to the platform.
    """
Returns the current stream state. We would like to restrict its usage since it exposes internal state. As of 2023-06-14, it is used for three things:
- Interpolation of the requests
- Transformation of records
- Saving the state
For the first case, we are probably stuck with exposing the stream state. For the third, we can probably expose a method that allows for emitting the state to the platform.
@abstractmethod
def should_be_synced(self, record: Record) -> bool:
    """
    Evaluates whether a record should be synced. This allows for filtering records and for defining a stop condition during pagination.
    """
Evaluates whether a record should be synced. This allows for filtering records and for defining a stop condition during pagination.
@abstractmethod
def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
    """
    Evaluates which record is greater in terms of its cursor value. This is used to avoid having to capture all the records to close a slice.
    """
Evaluates which record is greater in terms of its cursor value. This is used to avoid having to capture all the records to close a slice.
@abstractmethod
def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
    """
    Get the state value of a specific stream_slice. For incremental or resumable full refresh cursors which only manage state in
    a single dimension, this is the entire state object. For per-partition cursors used by substreams, this returns the state of
    a specific parent delineated by the incoming slice's partition object.
    """
Get the state value of a specific stream_slice. For incremental or resumable full refresh cursors which only manage state in a single dimension, this is the entire state object. For per-partition cursors used by substreams, this returns the state of a specific parent delineated by the incoming slice's partition object.
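The lifecycle above — seed the cursor with state, observe records, then surface the update when the slice closes — can be sketched with a toy cursor (this `MaxIdCursor` is illustrative only, not a CDK class; it tracks the maximum observed `id` and defers the visible state change until `close_slice`):

```python
from typing import Any, Dict


class MaxIdCursor:
    """Toy cursor tracking the maximum observed id; illustrative only."""

    def __init__(self) -> None:
        self._state: Dict[str, Any] = {}
        self._pending_max = 0

    def set_initial_state(self, stream_state: Dict[str, Any]) -> None:
        self._state = dict(stream_state)
        self._pending_max = stream_state.get("max_id", 0)

    def observe(self, stream_slice: Dict[str, Any], record: Dict[str, Any]) -> None:
        # Track progress internally; records may arrive out of order
        self._pending_max = max(self._pending_max, record["id"])

    def close_slice(self, stream_slice: Dict[str, Any]) -> None:
        # The outwardly-visible state only advances once the slice is done
        self._state = {"max_id": self._pending_max}

    def get_stream_state(self) -> Dict[str, Any]:
        return self._state


cursor = MaxIdCursor()
cursor.set_initial_state({"max_id": 5})
for record in [{"id": 7}, {"id": 6}]:
    cursor.observe({}, record)
assert cursor.get_stream_state() == {"max_id": 5}  # not yet visible
cursor.close_slice({})
```

Deferring the update to `close_slice` is what lets a cursor stay correct when the source does not order records by the cursor field.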
class CursorBasedCheckpointReader(CheckpointReader):
    """
    CursorBasedCheckpointReader is used by streams that implement a Cursor in order to manage state. This allows the checkpoint
    reader to delegate the complexity of fetching state to the cursor and focus on the iteration over a stream's partitions.

    This reader supports the Cursor interface used by Python and low-code sources. Not to be confused with the Cursor interface
    that belongs to the Concurrent CDK.
    """

    def __init__(
        self,
        cursor: Cursor,
        stream_slices: Iterable[Optional[Mapping[str, Any]]],
        read_state_from_cursor: bool = False,
    ):
        self._cursor = cursor
        self._stream_slices = iter(stream_slices)
        # read_state_from_cursor is used to delineate that partitions should determine when to stop syncing dynamically according
        # to the value of the state at runtime. This currently only applies to streams that use resumable full refresh.
        self._read_state_from_cursor = read_state_from_cursor
        self._current_slice: Optional[StreamSlice] = None
        self._finished_sync = False
        self._previous_state: Optional[Mapping[str, Any]] = None

    def next(self) -> Optional[Mapping[str, Any]]:
        try:
            self.current_slice = self._find_next_slice()
            return self.current_slice
        except StopIteration:
            self._finished_sync = True
            return None

    def observe(self, new_state: Mapping[str, Any]) -> None:
        # Cursor-based checkpoint readers don't need to observe the new state because it has already been updated by the cursor
        # while processing records
        pass

    def get_checkpoint(self) -> Optional[Mapping[str, Any]]:
        # This is used to avoid sending duplicate state messages
        new_state = self._cursor.get_stream_state()
        if new_state != self._previous_state:
            self._previous_state = new_state
            return new_state
        else:
            return None

    def _find_next_slice(self) -> StreamSlice:
        """
        _find_next_slice() returns the next slice of data that should be synced for the current stream according to its cursor.
        This function supports iterating over a stream's slices across two dimensions. The outer dimension is the stream's
        partitions, like parent records for a substream. The inner dimension iterates over the cursor value, like a date
        range for incremental streams or a pagination checkpoint for resumable full refresh.

        The basic algorithm for iterating through a stream's slices is:
        1. The first time next() is invoked we get the first partition.
        2. If the current partition is already complete as a result of a previous sync attempt, continue iterating until
           we find an un-synced partition.
        3. For streams whose cursor value is determined dynamically using stream state:
           1. Get the state for the current partition
           2. If the current partition's state is complete, continue iterating over partitions
           3. If the current partition's state is still in progress, emit the next cursor value
           4. If the current partition is complete as delineated by the sentinel value, get the next incomplete partition
        4. When the stream has processed all partitions, the iterator will raise a StopIteration exception signaling there are no more
           slices left for extracting more records.
        """

        if self._read_state_from_cursor:
            if self.current_slice is None:
                # current_slice is None represents the first time we are iterating over a stream's slices. The first slice to
                # sync has not been assigned yet and must first be read from the iterator
                next_slice = self.read_and_convert_slice()
                state_for_slice = self._cursor.select_state(next_slice)
                if state_for_slice == FULL_REFRESH_COMPLETE_STATE:
                    # Skip every slice that already has the terminal complete value indicating that a previous attempt
                    # successfully synced the slice
                    has_more = True
                    while has_more:
                        next_slice = self.read_and_convert_slice()
                        state_for_slice = self._cursor.select_state(next_slice)
                        has_more = state_for_slice == FULL_REFRESH_COMPLETE_STATE
                return StreamSlice(
                    cursor_slice=state_for_slice or {},
                    partition=next_slice.partition,
                    extra_fields=next_slice.extra_fields,
                )
            else:
                state_for_slice = self._cursor.select_state(self.current_slice)
                if state_for_slice == FULL_REFRESH_COMPLETE_STATE:
                    # If the current slice is complete, move to the next slice and skip the next slices that already
                    # have the terminal complete value indicating that a previous attempt was successfully read.
                    # Dummy initialization for mypy since we'll iterate at least once to get the next slice
                    next_candidate_slice = StreamSlice(cursor_slice={}, partition={})
                    has_more = True
                    while has_more:
                        next_candidate_slice = self.read_and_convert_slice()
                        state_for_slice = self._cursor.select_state(next_candidate_slice)
                        has_more = state_for_slice == FULL_REFRESH_COMPLETE_STATE
                    return StreamSlice(
                        cursor_slice=state_for_slice or {},
                        partition=next_candidate_slice.partition,
                        extra_fields=next_candidate_slice.extra_fields,
                    )
                # The reader continues to process the current partition if its state is still in progress
                return StreamSlice(
                    cursor_slice=state_for_slice or {},
                    partition=self.current_slice.partition,
                    extra_fields=self.current_slice.extra_fields,
                )
        else:
            # Unlike RFR cursors that iterate dynamically according to how stream state is updated, most cursors operate
            # on a fixed set of slices determined before reading records. They just iterate to the next slice
            return self.read_and_convert_slice()

    @property
    def current_slice(self) -> Optional[StreamSlice]:
        return self._current_slice

    @current_slice.setter
    def current_slice(self, value: StreamSlice) -> None:
        self._current_slice = value

    def read_and_convert_slice(self) -> StreamSlice:
        next_slice = next(self._stream_slices)
        if not isinstance(next_slice, StreamSlice):
            raise ValueError(
                f"{self.current_slice} should be of type StreamSlice. This is likely a bug in the CDK, please contact Airbyte support"
            )
        return next_slice
CursorBasedCheckpointReader is used by streams that implement a Cursor in order to manage state. This allows the checkpoint reader to delegate the complexity of fetching state to the cursor and focus on the iteration over a stream's partitions.
This reader supports the Cursor interface used by Python and low-code sources. Not to be confused with the Cursor interface that belongs to the Concurrent CDK.
def __init__(
    self,
    cursor: Cursor,
    stream_slices: Iterable[Optional[Mapping[str, Any]]],
    read_state_from_cursor: bool = False,
):
    self._cursor = cursor
    self._stream_slices = iter(stream_slices)
    # read_state_from_cursor is used to delineate that partitions should determine when to stop syncing dynamically according
    # to the value of the state at runtime. This currently only applies to streams that use resumable full refresh.
    self._read_state_from_cursor = read_state_from_cursor
    self._current_slice: Optional[StreamSlice] = None
    self._finished_sync = False
    self._previous_state: Optional[Mapping[str, Any]] = None
def next(self) -> Optional[Mapping[str, Any]]:
    try:
        self.current_slice = self._find_next_slice()
        return self.current_slice
    except StopIteration:
        self._finished_sync = True
        return None
Returns the next slice that will be used to fetch the next group of records. Returning None indicates that the reader has finished iterating over all slices.
def observe(self, new_state: Mapping[str, Any]) -> None:
    # Cursor-based checkpoint readers don't need to observe the new state because it has already been updated by the cursor
    # while processing records
    pass
Updates the internal state of the checkpoint reader based on the incoming stream state from a connector.
WARNING: This is used to retain backwards compatibility with streams using the legacy get_stream_state() method. In order to uptake Resumable Full Refresh, connectors must migrate streams to use the state setter/getter methods.
def get_checkpoint(self) -> Optional[Mapping[str, Any]]:
    # This is used to avoid sending duplicate state messages
    new_state = self._cursor.get_stream_state()
    if new_state != self._previous_state:
        self._previous_state = new_state
        return new_state
    else:
        return None
Retrieves the current state value of the stream. The connector does not emit state messages if the checkpoint value is None.
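The duplicate-suppression behavior of get_checkpoint() can be isolated in a small sketch (the `DedupCheckpointer` class and the `cursor_state_fn` callable are hypothetical stand-ins for the reader and its cursor's `get_stream_state`):

```python
class DedupCheckpointer:
    """Sketch of the duplicate-suppression logic in get_checkpoint()."""

    def __init__(self, cursor_state_fn):
        self._get_state = cursor_state_fn  # stands in for cursor.get_stream_state
        self._previous_state = None

    def get_checkpoint(self):
        new_state = self._get_state()
        if new_state != self._previous_state:
            # Only emit state when it actually changed since the last checkpoint
            self._previous_state = new_state
            return new_state
        return None


# The cursor reports the same state twice, then a new one
states = iter([{"ts": 1}, {"ts": 1}, {"ts": 2}])
checkpointer = DedupCheckpointer(lambda: next(states))
results = [checkpointer.get_checkpoint() for _ in range(3)]
```

The middle call returns None, so the driver skips emitting a redundant state message for that slice.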
def read_and_convert_slice(self) -> StreamSlice:
    next_slice = next(self._stream_slices)
    if not isinstance(next_slice, StreamSlice):
        raise ValueError(
            f"{self.current_slice} should be of type StreamSlice. This is likely a bug in the CDK, please contact Airbyte support"
        )
    return next_slice
class FullRefreshCheckpointReader(CheckpointReader):
    """
    FullRefreshCheckpointReader iterates over data that cannot be checkpointed incrementally during the sync because the stream
    is not capable of managing state. At the end of a sync, a final state message is emitted to signal completion.
    """

    def __init__(self, stream_slices: Iterable[Optional[Mapping[str, Any]]]):
        self._stream_slices = iter(stream_slices)
        self._final_checkpoint = False

    def next(self) -> Optional[Mapping[str, Any]]:
        try:
            return next(self._stream_slices)
        except StopIteration:
            self._final_checkpoint = True
            return None

    def observe(self, new_state: Mapping[str, Any]) -> None:
        pass

    def get_checkpoint(self) -> Optional[Mapping[str, Any]]:
        if self._final_checkpoint:
            return {"__ab_no_cursor_state_message": True}
        return None
FullRefreshCheckpointReader iterates over data that cannot be checkpointed incrementally during the sync because the stream is not capable of managing state. At the end of a sync, a final state message is emitted to signal completion.
def next(self) -> Optional[Mapping[str, Any]]:
    try:
        return next(self._stream_slices)
    except StopIteration:
        self._final_checkpoint = True
        return None
Returns the next slice that will be used to fetch the next group of records. Returning None indicates that the reader has finished iterating over all slices.
Updates the internal state of the checkpoint reader based on the incoming stream state from a connector.
WARNING: This is used to retain backwards compatibility with streams using the legacy get_stream_state() method. In order to uptake Resumable Full Refresh, connectors must migrate streams to use the state setter/getter methods.
def get_checkpoint(self) -> Optional[Mapping[str, Any]]:
    if self._final_checkpoint:
        return {"__ab_no_cursor_state_message": True}
    return None
Retrieves the current state value of the stream. The connector does not emit state messages if the checkpoint value is None.
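The interplay of the two methods is easiest to see end-to-end: get_checkpoint() stays None while slices remain, and only after next() exhausts the iterator does it return the sentinel mapping. A self-contained sketch mirroring the class above (the `FullRefreshSketch` name is mine; the `__ab_no_cursor_state_message` key is taken from the source):

```python
from typing import Any, Iterable, Mapping, Optional


class FullRefreshSketch:
    """Mirrors FullRefreshCheckpointReader's single final checkpoint."""

    def __init__(self, stream_slices: Iterable[Optional[Mapping[str, Any]]]):
        self._stream_slices = iter(stream_slices)
        self._final_checkpoint = False

    def next(self) -> Optional[Mapping[str, Any]]:
        try:
            return next(self._stream_slices)
        except StopIteration:
            # Exhaustion arms the one final checkpoint
            self._final_checkpoint = True
            return None

    def get_checkpoint(self) -> Optional[Mapping[str, Any]]:
        if self._final_checkpoint:
            return {"__ab_no_cursor_state_message": True}
        return None


reader = FullRefreshSketch([{}])  # a full refresh stream typically has one empty slice
```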
class IncrementalCheckpointReader(CheckpointReader):
    """
    IncrementalCheckpointReader handles iterating through a stream based on partitioned windows of data that are determined
    before syncing data.
    """

    def __init__(
        self, stream_state: Mapping[str, Any], stream_slices: Iterable[Optional[Mapping[str, Any]]]
    ):
        self._state: Optional[Mapping[str, Any]] = stream_state
        self._stream_slices = iter(stream_slices)
        self._has_slices = False

    def next(self) -> Optional[Mapping[str, Any]]:
        try:
            next_slice = next(self._stream_slices)
            self._has_slices = True
            return next_slice
        except StopIteration:
            # This is used to avoid sending a duplicate state message at the end of a sync since the stream has already
            # emitted state at the end of each slice. If we want to avoid this extra complexity, we can also just accept
            # that every sync emits a final duplicate state
            if self._has_slices:
                self._state = None
            return None

    def observe(self, new_state: Mapping[str, Any]) -> None:
        self._state = new_state

    def get_checkpoint(self) -> Optional[Mapping[str, Any]]:
        return self._state
IncrementalCheckpointReader handles iterating through a stream based on partitioned windows of data that are determined before syncing data.
def next(self) -> Optional[Mapping[str, Any]]:
    try:
        next_slice = next(self._stream_slices)
        self._has_slices = True
        return next_slice
    except StopIteration:
        # This is used to avoid sending a duplicate state message at the end of a sync since the stream has already
        # emitted state at the end of each slice. If we want to avoid this extra complexity, we can also just accept
        # that every sync emits a final duplicate state
        if self._has_slices:
            self._state = None
        return None
Returns the next slice that will be used to fetch the next group of records. Returning None indicates that the reader has finished iterating over all slices.
Updates the internal state of the checkpoint reader based on the incoming stream state from a connector.
WARNING: This is used to retain backwards compatibility with streams using the legacy get_stream_state() method. In order to uptake Resumable Full Refresh, connectors must migrate streams to use the state setter/getter methods.
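The end-of-sync suppression described in next() — nulling the state once slices are exhausted so the final checkpoint is not a duplicate — can be sketched in isolation (the `IncrementalSketch` name and the state/slice payloads are illustrative, not from the CDK):

```python
class IncrementalSketch:
    """Mirrors IncrementalCheckpointReader's end-of-sync state suppression."""

    def __init__(self, stream_state, stream_slices):
        self._state = stream_state
        self._stream_slices = iter(stream_slices)
        self._has_slices = False

    def next(self):
        try:
            next_slice = next(self._stream_slices)
            self._has_slices = True
            return next_slice
        except StopIteration:
            if self._has_slices:
                # Slices already emitted state; skip a duplicate final message
                self._state = None
            return None

    def observe(self, new_state):
        self._state = new_state

    def get_checkpoint(self):
        return self._state


reader = IncrementalSketch({"ts": 0}, [{"day": "2024-01-01"}])
```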
class LegacyCursorBasedCheckpointReader(CursorBasedCheckpointReader):
    """
    This (unfortunate) class operates like an adapter to retain backwards compatibility with legacy sources that take in stream_slice
    in the form of a Mapping instead of the StreamSlice object. Internally, the reader still operates over StreamSlices, but it
    is instantiated with and emits stream slices in the form of a Mapping[str, Any]. The logic of how partitions and cursors
    are iterated over is synonymous with CursorBasedCheckpointReader.

    We also retain the existing top level fields defined by the connector so the fields are present on dependent methods. For example,
    the resulting mapping structure passed back to the stream's read_records() method looks like:
    {
        "cursor_slice": {
            "next_page_token": 10
        },
        "partition": {
            "repository": "airbytehq/airbyte"
        },
        "next_page_token": 10,
        "repository": "airbytehq/airbyte"
    }
    """

    def __init__(
        self,
        cursor: Cursor,
        stream_slices: Iterable[Optional[Mapping[str, Any]]],
        read_state_from_cursor: bool = False,
    ):
        super().__init__(
            cursor=cursor,
            stream_slices=stream_slices,
            read_state_from_cursor=read_state_from_cursor,
        )

    def next(self) -> Optional[Mapping[str, Any]]:
        try:
            self.current_slice = self._find_next_slice()

            if "partition" in dict(self.current_slice):
                raise ValueError("Stream is configured to use invalid stream slice key 'partition'")
            elif "cursor_slice" in dict(self.current_slice):
                raise ValueError(
                    "Stream is configured to use invalid stream slice key 'cursor_slice'"
                )

            # We convert StreamSlice to a regular mapping because legacy connectors operate on the basic Mapping object. We
            # also duplicate all fields at the top level for backwards compatibility for existing Python sources
            return {
                "partition": self.current_slice.partition,
                "cursor_slice": self.current_slice.cursor_slice,
                **dict(self.current_slice),
            }
        except StopIteration:
            self._finished_sync = True
            return None

    def read_and_convert_slice(self) -> StreamSlice:
        next_mapping_slice = next(self._stream_slices)
        if not isinstance(next_mapping_slice, Mapping):
            raise ValueError(
                f"{self.current_slice} should be of type Mapping. This is likely a bug in the CDK, please contact Airbyte support"
            )

        # The legacy reader is instantiated with an iterable of stream slice mappings. We convert each into a StreamSlice
        # to sanely process them during the sync and to reuse the existing Python defined cursors
        return StreamSlice(
            partition=next_mapping_slice,
            cursor_slice={},
        )
This (unfortunate) class operates like an adapter to retain backwards compatibility with legacy sources that take in stream_slice in the form of a Mapping instead of the StreamSlice object. Internally, the reader still operates over StreamSlices, but it is instantiated with and emits stream slices in the form of a Mapping[str, Any]. The logic of how partitions and cursors are iterated over is synonymous with CursorBasedCheckpointReader.
We also retain the existing top level fields defined by the connector so the fields are present on dependent methods. For example, the resulting mapping structure passed back to the stream's read_records() method looks like:

{
    "cursor_slice": {
        "next_page_token": 10
    },
    "partition": {
        "repository": "airbytehq/airbyte"
    },
    "next_page_token": 10,
    "repository": "airbytehq/airbyte"
}
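The flattening performed in next() — nesting the partition and cursor slice under their own keys while also duplicating their fields at the top level — can be sketched as a standalone helper (the `to_legacy_mapping` function is hypothetical; the real reader works on a StreamSlice object rather than two dicts):

```python
def to_legacy_mapping(partition, cursor_slice):
    """Sketch of how LegacyCursorBasedCheckpointReader flattens a slice back
    into the plain mapping that legacy connectors expect."""
    # Reserved keys would collide with the nested copies, so reject them
    for reserved in ("partition", "cursor_slice"):
        if reserved in partition or reserved in cursor_slice:
            raise ValueError(
                f"Stream is configured to use invalid stream slice key '{reserved}'"
            )
    return {
        "partition": partition,
        "cursor_slice": cursor_slice,
        **partition,       # duplicated at the top level for backwards compatibility
        **cursor_slice,
    }


legacy = to_legacy_mapping({"repository": "airbytehq/airbyte"}, {"next_page_token": 10})
```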
def next(self) -> Optional[Mapping[str, Any]]:
    try:
        self.current_slice = self._find_next_slice()

        if "partition" in dict(self.current_slice):
            raise ValueError("Stream is configured to use invalid stream slice key 'partition'")
        elif "cursor_slice" in dict(self.current_slice):
            raise ValueError(
                "Stream is configured to use invalid stream slice key 'cursor_slice'"
            )

        # We convert StreamSlice to a regular mapping because legacy connectors operate on the basic Mapping object. We
        # also duplicate all fields at the top level for backwards compatibility for existing Python sources
        return {
            "partition": self.current_slice.partition,
            "cursor_slice": self.current_slice.cursor_slice,
            **dict(self.current_slice),
        }
    except StopIteration:
        self._finished_sync = True
        return None
Returns the next slice that will be used to fetch the next group of records. Returning None indicates that the reader has finished iterating over all slices.
def read_and_convert_slice(self) -> StreamSlice:
    next_mapping_slice = next(self._stream_slices)
    if not isinstance(next_mapping_slice, Mapping):
        raise ValueError(
            f"{self.current_slice} should be of type Mapping. This is likely a bug in the CDK, please contact Airbyte support"
        )

    # The legacy reader is instantiated with an iterable of stream slice mappings. We convert each into a StreamSlice
    # to sanely process them during the sync and to reuse the existing Python defined cursors
    return StreamSlice(
        partition=next_mapping_slice,
        cursor_slice={},
    )
Inherited Members
class ResumableFullRefreshCheckpointReader(CheckpointReader):
    """
    ResumableFullRefreshCheckpointReader allows for iteration over an unbounded set of records based on the pagination strategy
    of the stream. Because the number of pages is unknown, the stream's current state is used to determine whether to continue
    fetching more pages or to stop the sync.
    """

    def __init__(self, stream_state: Mapping[str, Any]):
        # The first attempt of an RFR stream has an empty {} incoming state, but should still make a first attempt to read records
        # from the first page in next().
        self._first_page = bool(stream_state == {})
        self._state: Mapping[str, Any] = stream_state

    def next(self) -> Optional[Mapping[str, Any]]:
        if self._first_page:
            self._first_page = False
            return self._state
        elif self._state == FULL_REFRESH_COMPLETE_STATE:
            return None
        else:
            return self._state

    def observe(self, new_state: Mapping[str, Any]) -> None:
        self._state = new_state

    def get_checkpoint(self) -> Optional[Mapping[str, Any]]:
        return self._state or {}
ResumableFullRefreshCheckpointReader allows for iteration over an unbounded set of records based on the pagination strategy of the stream. Because the number of pages is unknown, the stream's current state is used to determine whether to continue fetching more pages or to stop the sync.
def __init__(self, stream_state: Mapping[str, Any]):
    # The first attempt of an RFR stream has an empty {} incoming state, but should still make a first attempt to read records
    # from the first page in next().
    self._first_page = bool(stream_state == {})
    self._state: Mapping[str, Any] = stream_state
def next(self) -> Optional[Mapping[str, Any]]:
    if self._first_page:
        self._first_page = False
        return self._state
    elif self._state == FULL_REFRESH_COMPLETE_STATE:
        return None
    else:
        return self._state
Returns the next slice that will be used to fetch the next group of records. Returning None indicates that the reader has finished iterating over all slices.
Updates the internal state of the checkpoint reader based on the incoming stream state from a connector.
WARNING: This is used to retain backwards compatibility with streams using the legacy get_stream_state() method. In order to uptake Resumable Full Refresh, connectors must migrate streams to use the state setter/getter methods.
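The next()/observe()/get_checkpoint() contract can be sketched with a minimal sync loop. The reader class below mirrors the CDK's ResumableFullRefreshCheckpointReader for illustration; fetch_page is a hypothetical stand-in for a stream's page request, and the terminal-state sentinel is assumed to mirror the CDK's FULL_REFRESH_COMPLETE_STATE.

```python
# Minimal sketch of a sync loop driving a resumable-full-refresh
# checkpoint reader: next() yields the state to resume from, observe()
# records the state produced by reading a page, and get_checkpoint()
# exposes the value to emit as a state message.
from typing import Any, Mapping, Optional

# Assumed sentinel marking a completed RFR sync (mirrors the CDK constant).
FULL_REFRESH_COMPLETE_STATE: Mapping[str, Any] = {"__ab_full_refresh_sync_complete": True}


class ResumableFullRefreshCheckpointReader:
    def __init__(self, stream_state: Mapping[str, Any]):
        # An empty {} incoming state still triggers one read of the first page.
        self._first_page = stream_state == {}
        self._state: Mapping[str, Any] = stream_state

    def next(self) -> Optional[Mapping[str, Any]]:
        if self._first_page:
            self._first_page = False
            return self._state
        elif self._state == FULL_REFRESH_COMPLETE_STATE:
            return None
        return self._state

    def observe(self, new_state: Mapping[str, Any]) -> None:
        self._state = new_state

    def get_checkpoint(self) -> Optional[Mapping[str, Any]]:
        return self._state or {}


def fetch_page(state: Mapping[str, Any]) -> Mapping[str, Any]:
    # Hypothetical paginated API: three pages, then the terminal state.
    page = state.get("page", 0) + 1
    return FULL_REFRESH_COMPLETE_STATE if page > 3 else {"page": page}


reader = ResumableFullRefreshCheckpointReader(stream_state={})
checkpoints = []
while (slice_state := reader.next()) is not None:
    reader.observe(fetch_page(slice_state))
    checkpoints.append(dict(reader.get_checkpoint()))
```

Note how the loop terminates only after the terminal state is observed, so an interrupted attempt leaves a page-level checkpoint behind for the next attempt of the same job.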
@dataclass
class ResumableFullRefreshCursor(Cursor):
    """
    Cursor that allows for the checkpointing of sync progress according to a synthetic cursor based on the pagination state
    of the stream. Resumable full refresh syncs are only intended to retain state in between sync attempts of the same job,
    with the platform responsible for removing said state.
    """

    def __init__(self) -> None:
        self._cursor: StreamState = {}

    def get_stream_state(self) -> StreamState:
        return self._cursor

    def set_initial_state(self, stream_state: StreamState) -> None:
        self._cursor = stream_state

    def observe(self, stream_slice: StreamSlice, record: Record) -> None:
        """
        Resumable full refresh manages state using a page number so it does not need to update state by observing incoming records.
        """
        pass

    def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
        self._cursor = stream_slice.cursor_slice

    def should_be_synced(self, record: Record) -> bool:
        """
        Unlike date-based cursors which filter out records outside slice boundaries, resumable full refresh records exist within pages
        that don't have filterable bounds. We should always return them.
        """
        return True

    def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
        """
        RFR records don't have an ordering to be compared between one another.
        """
        return False

    def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
        # A top-level RFR cursor only manages the state of a single partition
        return self._cursor
Cursor that allows for the checkpointing of sync progress according to a synthetic cursor based on the pagination state of the stream. Resumable full refresh syncs are only intended to retain state in between sync attempts of the same job, with the platform responsible for removing said state.
Returns the current stream state. We would like to restrict its usage since it exposes the internals of state. As of 2023-06-14, it is used for three things:
- Interpolation of the requests
- Transformation of records
- Saving the state
For the first case, we are probably stuck with exposing the stream state. For the second, we can probably expose a method that allows for emitting the state to the platform.
Cursors are not initialized with their state. Since state is needed for the cursor to function properly, this method should be called before anything else.
Parameters
- stream_state: The state of the stream as returned by get_stream_state
def observe(self, stream_slice: StreamSlice, record: Record) -> None:
    """
    Resumable full refresh manages state using a page number so it does not need to update state by observing incoming records.
    """
    pass
Resumable full refresh manages state using a page number so it does not need to update state by observing incoming records.
def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
    self._cursor = stream_slice.cursor_slice
Update state based on the stream slice. Note that stream_slice.cursor_slice and most_recent_record.associated_slice are expected to be the same, but we make it explicit here that stream_slice should be leveraged to update the state. We do not pass in the latest record, since cursor instances should maintain the relevant internal state on their own.
Parameters
- stream_slice: slice to close
def should_be_synced(self, record: Record) -> bool:
    """
    Unlike date-based cursors which filter out records outside slice boundaries, resumable full refresh records exist within pages
    that don't have filterable bounds. We should always return them.
    """
    return True
Unlike date-based cursors which filter out records outside slice boundaries, resumable full refresh records exist within pages that don't have filterable bounds. We should always return them.
def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
    """
    RFR records don't have an ordering to be compared between one another.
    """
    return False
RFR records don't have an ordering to be compared between one another.
def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
    # A top-level RFR cursor only manages the state of a single partition
    return self._cursor
Get the state value of a specific stream_slice. For incremental or resumable full refresh cursors which only manage state in a single dimension this is the entire state object. For per-partition cursors used by substreams, this returns the state of a specific parent delineated by the incoming slice's partition object.
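The cursor lifecycle described above (set_initial_state, close_slice, select_state) can be sketched end to end. This is a simplified re-implementation for illustration: StreamSlice is reduced to a plain dataclass, and the "next_page_token" key is a hypothetical pagination state, not a CDK-defined field.

```python
# Sketch of the RFR cursor lifecycle: the cursor checkpoints the
# cursor_slice of each closed slice and, because it manages a single
# partition, select_state returns the entire state object.
from collections.abc import Mapping
from dataclasses import dataclass
from typing import Any, Optional

StreamState = Mapping[str, Any]


@dataclass(frozen=True)
class StreamSlice:
    partition: Mapping[str, Any]
    cursor_slice: Mapping[str, Any]


class ResumableFullRefreshCursor:
    def __init__(self) -> None:
        self._cursor: StreamState = {}

    def set_initial_state(self, stream_state: StreamState) -> None:
        # Must be called before anything else; cursors are not
        # constructed with their state.
        self._cursor = stream_state

    def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
        # Only the synthetic pagination state is retained.
        self._cursor = stream_slice.cursor_slice

    def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
        # Single partition: the entire state object is the slice state.
        return self._cursor


cursor = ResumableFullRefreshCursor()
cursor.set_initial_state({})
cursor.close_slice(StreamSlice(partition={}, cursor_slice={"next_page_token": 2}))
state = cursor.select_state()
```

After close_slice, the checkpointed state is exactly the cursor_slice of the slice that was closed, which is what gets emitted between attempts of the same job.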