airbyte_cdk.sources.streams.checkpoint

 1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 2
 3
 4from .checkpoint_reader import (
 5    CheckpointMode,
 6    CheckpointReader,
 7    CursorBasedCheckpointReader,
 8    FullRefreshCheckpointReader,
 9    IncrementalCheckpointReader,
10    LegacyCursorBasedCheckpointReader,
11    ResumableFullRefreshCheckpointReader,
12)
13from .cursor import Cursor
14from .resumable_full_refresh_cursor import ResumableFullRefreshCursor
15
16__all__ = [
17    "CheckpointMode",
18    "CheckpointReader",
19    "Cursor",
20    "CursorBasedCheckpointReader",
21    "FullRefreshCheckpointReader",
22    "IncrementalCheckpointReader",
23    "LegacyCursorBasedCheckpointReader",
24    "ResumableFullRefreshCheckpointReader",
25    "ResumableFullRefreshCursor",
26]
class CheckpointMode(enum.Enum):
13class CheckpointMode(Enum):
14    INCREMENTAL = "incremental"
15    RESUMABLE_FULL_REFRESH = "resumable_full_refresh"
16    FULL_REFRESH = "full_refresh"

An enumeration.

INCREMENTAL = <CheckpointMode.INCREMENTAL: 'incremental'>
RESUMABLE_FULL_REFRESH = <CheckpointMode.RESUMABLE_FULL_REFRESH: 'resumable_full_refresh'>
FULL_REFRESH = <CheckpointMode.FULL_REFRESH: 'full_refresh'>
class CheckpointReader(abc.ABC):
22class CheckpointReader(ABC):
23    """
24    CheckpointReader manages how to iterate over a stream's partitions and serves as the bridge for interpreting the current state
25    of the stream that should be emitted back to the platform.
26    """
27
28    @abstractmethod
29    def next(self) -> Optional[Mapping[str, Any]]:
30        """
31        Returns the next slice that will be used to fetch the next group of records. Returning None indicates that the reader
32        has finished iterating over all slices.
33        """
34
35    @abstractmethod
36    def observe(self, new_state: Mapping[str, Any]) -> None:
37        """
38        Updates the internal state of the checkpoint reader based on the incoming stream state from a connector.
39
40        WARNING: This is used to retain backwards compatibility with streams using the legacy get_stream_state() method.
41        In order to uptake Resumable Full Refresh, connectors must migrate streams to use the state setter/getter methods.
42        """
43
44    @abstractmethod
45    def get_checkpoint(self) -> Optional[Mapping[str, Any]]:
46        """
47        Retrieves the current state value of the stream. The connector does not emit state messages if the checkpoint value is None.
48        """

CheckpointReader manages how to iterate over a stream's partitions and serves as the bridge for interpreting the current state of the stream that should be emitted back to the platform.

@abstractmethod
def next(self) -> Optional[Mapping[str, Any]]:
28    @abstractmethod
29    def next(self) -> Optional[Mapping[str, Any]]:
30        """
31        Returns the next slice that will be used to fetch the next group of records. Returning None indicates that the reader
32        has finished iterating over all slices.
33        """

Returns the next slice that will be used to fetch the next group of records. Returning None indicates that the reader has finished iterating over all slices.

@abstractmethod
def observe(self, new_state: Mapping[str, Any]) -> None:
35    @abstractmethod
36    def observe(self, new_state: Mapping[str, Any]) -> None:
37        """
38        Updates the internal state of the checkpoint reader based on the incoming stream state from a connector.
39
40        WARNING: This is used to retain backwards compatibility with streams using the legacy get_stream_state() method.
41        In order to uptake Resumable Full Refresh, connectors must migrate streams to use the state setter/getter methods.
42        """

Updates the internal state of the checkpoint reader based on the incoming stream state from a connector.

WARNING: This is used to retain backwards compatibility with streams using the legacy get_stream_state() method. In order to uptake Resumable Full Refresh, connectors must migrate streams to use the state setter/getter methods.

@abstractmethod
def get_checkpoint(self) -> Optional[Mapping[str, Any]]:
44    @abstractmethod
45    def get_checkpoint(self) -> Optional[Mapping[str, Any]]:
46        """
47        Retrieves the current state value of the stream. The connector does not emit state messages if the checkpoint value is None.
48        """

Retrieves the current state value of the stream. The connector does not emit state messages if the checkpoint value is None.

class Cursor(abc.ABC):
12class Cursor(ABC):
13    """
14    Cursors are components that allow for checkpointing the current state of a sync. They keep track of what data has been consumed
15    and allow for syncs to be resumed from a specific point based on that information.
16    """
17
18    @abstractmethod
19    def set_initial_state(self, stream_state: StreamState) -> None:
20        """
21        Cursors are not initialized with their state. As state is needed in order to function properly, this method should be called
22        before calling anything else
23
24        :param stream_state: The state of the stream as returned by get_stream_state
25        """
26
27    def observe(self, stream_slice: StreamSlice, record: Record) -> None:
28        """
29        Register a record with the cursor; the cursor instance can then use it to manage the state of the in-progress stream read.
30
31        :param stream_slice: The current slice, which may or may not contain the most recently observed record
32        :param record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the
33          stream state may need to be deferred depending on whether the source reliably orders records by the cursor field.
34        """
35        pass
36
37    @abstractmethod
38    def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
39        """
40        Update state based on the stream slice. Note that `stream_slice.cursor_slice` and `most_recent_record.associated_slice` are expected
41        to be the same but we make it explicit here that `stream_slice` should be leveraged to update the state. We do not pass in the
42        latest record, since cursor instances should maintain the relevant internal state on their own.
43
44        :param stream_slice: slice to close
45        """
46
47    @abstractmethod
48    def get_stream_state(self) -> StreamState:
49        """
50        Returns the current stream state. We would like to restrict its usage since it does expose internals of the state. As of 2023-06-14, it
51        is used for three things:
52        * Interpolation of the requests
53        * Transformation of records
54        * Saving the state
55
56        For the first case, we are probably stuck with exposing the stream state. For the second, we can probably expose a method that
57        allows for emitting the state to the platform.
58        """
59
60    @abstractmethod
61    def should_be_synced(self, record: Record) -> bool:
62        """
63        Evaluating if a record should be synced allows for filtering and stop condition on pagination
64        """
65
66    @abstractmethod
67    def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
68        """
69        Evaluating which record is greater in terms of cursor. This is used to avoid having to capture all the records to close a slice
70        """
71
72    @abstractmethod
73    def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
74        """
75        Get the state value of a specific stream_slice. For incremental or resumable full refresh cursors which only manage state in
76        a single dimension this is the entire state object. For per-partition cursors used by substreams, this returns the state of
77        a specific parent delineated by the incoming slice's partition object.
78        """

Cursors are components that allow for checkpointing the current state of a sync. They keep track of what data has been consumed and allow for syncs to be resumed from a specific point based on that information.

@abstractmethod
def set_initial_state(self, stream_state: Mapping[str, Any]) -> None:
18    @abstractmethod
19    def set_initial_state(self, stream_state: StreamState) -> None:
20        """
21        Cursors are not initialized with their state. As state is needed in order to function properly, this method should be called
22        before calling anything else
23
24        :param stream_state: The state of the stream as returned by get_stream_state
25        """

Cursors are not initialized with their state. As state is needed in order to function properly, this method should be called before calling anything else

Parameters
  • stream_state: The state of the stream as returned by get_stream_state
def observe( self, stream_slice: airbyte_cdk.StreamSlice, record: airbyte_cdk.Record) -> None:
27    def observe(self, stream_slice: StreamSlice, record: Record) -> None:
28        """
29        Register a record with the cursor; the cursor instance can then use it to manage the state of the in-progress stream read.
30
31        :param stream_slice: The current slice, which may or may not contain the most recently observed record
32        :param record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the
33          stream state may need to be deferred depending on whether the source reliably orders records by the cursor field.
34        """
35        pass

Register a record with the cursor; the cursor instance can then use it to manage the state of the in-progress stream read.

Parameters
  • stream_slice: The current slice, which may or may not contain the most recently observed record
  • record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the stream state may need to be deferred depending on whether the source reliably orders records by the cursor field.
@abstractmethod
def close_slice( self, stream_slice: airbyte_cdk.StreamSlice, *args: Any) -> None:
37    @abstractmethod
38    def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
39        """
40        Update state based on the stream slice. Note that `stream_slice.cursor_slice` and `most_recent_record.associated_slice` are expected
41        to be the same but we make it explicit here that `stream_slice` should be leveraged to update the state. We do not pass in the
42        latest record, since cursor instances should maintain the relevant internal state on their own.
43
44        :param stream_slice: slice to close
45        """

Update state based on the stream slice. Note that stream_slice.cursor_slice and most_recent_record.associated_slice are expected to be the same but we make it explicit here that stream_slice should be leveraged to update the state. We do not pass in the latest record, since cursor instances should maintain the relevant internal state on their own.

Parameters
  • stream_slice: slice to close
@abstractmethod
def get_stream_state(self) -> Mapping[str, Any]:
47    @abstractmethod
48    def get_stream_state(self) -> StreamState:
49        """
50        Returns the current stream state. We would like to restrict its usage since it does expose internals of the state. As of 2023-06-14, it
51        is used for three things:
52        * Interpolation of the requests
53        * Transformation of records
54        * Saving the state
55
56        For the first case, we are probably stuck with exposing the stream state. For the second, we can probably expose a method that
57        allows for emitting the state to the platform.
58        """

Returns the current stream state. We would like to restrict its usage since it does expose internals of the state. As of 2023-06-14, it is used for three things:

  • Interpolation of the requests
  • Transformation of records
  • Saving the state

For the first case, we are probably stuck with exposing the stream state. For the second, we can probably expose a method that allows for emitting the state to the platform.

@abstractmethod
def should_be_synced(self, record: airbyte_cdk.Record) -> bool:
60    @abstractmethod
61    def should_be_synced(self, record: Record) -> bool:
62        """
63        Evaluating if a record should be synced allows for filtering and stop condition on pagination
64        """

Evaluating if a record should be synced allows for filtering and stop condition on pagination

@abstractmethod
def is_greater_than_or_equal( self, first: airbyte_cdk.Record, second: airbyte_cdk.Record) -> bool:
66    @abstractmethod
67    def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
68        """
69        Evaluating which record is greater in terms of cursor. This is used to avoid having to capture all the records to close a slice
70        """

Evaluating which record is greater in terms of cursor. This is used to avoid having to capture all the records to close a slice

@abstractmethod
def select_state( self, stream_slice: Optional[airbyte_cdk.StreamSlice] = None) -> Optional[Mapping[str, Any]]:
72    @abstractmethod
73    def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
74        """
75        Get the state value of a specific stream_slice. For incremental or resumable full refresh cursors which only manage state in
76        a single dimension this is the entire state object. For per-partition cursors used by substreams, this returns the state of
77        a specific parent delineated by the incoming slice's partition object.
78        """

Get the state value of a specific stream_slice. For incremental or resumable full refresh cursors which only manage state in a single dimension this is the entire state object. For per-partition cursors used by substreams, this returns the state of a specific parent delineated by the incoming slice's partition object.

class CursorBasedCheckpointReader(airbyte_cdk.sources.streams.checkpoint.CheckpointReader):
 84class CursorBasedCheckpointReader(CheckpointReader):
 85    """
 86    CursorBasedCheckpointReader is used by streams that implement a Cursor in order to manage state. This allows the checkpoint
 87    reader to delegate the complexity of fetching state to the cursor and focus on the iteration over a stream's partitions.
 88
 89    This reader supports the Cursor interface used by Python and low-code sources. Not to be confused with Cursor interface
 90    that belongs to the Concurrent CDK.
 91    """
 92
 93    def __init__(
 94        self,
 95        cursor: Cursor,
 96        stream_slices: Iterable[Optional[Mapping[str, Any]]],
 97        read_state_from_cursor: bool = False,
 98    ):
 99        self._cursor = cursor
100        self._stream_slices = iter(stream_slices)
101        # read_state_from_cursor is used to delineate that partitions should determine when to stop syncing dynamically according
102        # to the value of the state at runtime. This currently only applies to streams that use resumable full refresh.
103        self._read_state_from_cursor = read_state_from_cursor
104        self._current_slice: Optional[StreamSlice] = None
105        self._finished_sync = False
106        self._previous_state: Optional[Mapping[str, Any]] = None
107
108    def next(self) -> Optional[Mapping[str, Any]]:
109        try:
110            self.current_slice = self._find_next_slice()
111            return self.current_slice
112        except StopIteration:
113            self._finished_sync = True
114            return None
115
116    def observe(self, new_state: Mapping[str, Any]) -> None:
117        # Cursor based checkpoint readers don't need to observe the new state because it has already been updated by the cursor
118        # while processing records
119        pass
120
121    def get_checkpoint(self) -> Optional[Mapping[str, Any]]:
122        # This is used to avoid sending a duplicate state message
123        new_state = self._cursor.get_stream_state()
124        if new_state != self._previous_state:
125            self._previous_state = new_state
126            return new_state
127        else:
128            return None
129
130    def _find_next_slice(self) -> StreamSlice:
131        """
132        _find_next_slice() returns the next slice of data that should be synced for the current stream according to its cursor.
133        This function supports iterating over a stream's slices across two dimensions. The first dimension is the stream's
134        partitions like parent records for a substream. The inner dimension iterates over the cursor value like a date
135        range for incremental streams or a pagination checkpoint for resumable full refresh.
136
137        The basic algorithm for iterating through a stream's slices is:
138        1. The first time next() is invoked we get the first partition
139        2. If the current partition is already complete as a result of a previous sync attempt, continue iterating until
140           we find an un-synced partition.
141        2. For streams whose cursor value is determined dynamically using stream state
142            1. Get the state for the current partition
143            2. If the current partition's state is complete, continue iterating over partitions
144            3. If the current partition's state is still in progress, emit the next cursor value
145            4. If the current partition is complete as delineated by the sentinel value, get the next incomplete partition
146        3. When stream has processed all partitions, the iterator will raise a StopIteration exception signaling there are no more
147           slices left for extracting more records.
148        """
149
150        if self._read_state_from_cursor:
151            if self.current_slice is None:
152                # current_slice is None represents the first time we are iterating over a stream's slices. The first slice to
153                # sync has not been assigned yet and must first be read from the iterator
154                next_slice = self.read_and_convert_slice()
155                state_for_slice = self._cursor.select_state(next_slice)
156                if state_for_slice == FULL_REFRESH_COMPLETE_STATE:
157                    # Skip every slice that already has the terminal complete value indicating that a previous attempt
158                    # successfully synced the slice
159                    has_more = True
160                    while has_more:
161                        next_slice = self.read_and_convert_slice()
162                        state_for_slice = self._cursor.select_state(next_slice)
163                        has_more = state_for_slice == FULL_REFRESH_COMPLETE_STATE
164                return StreamSlice(
165                    cursor_slice=state_for_slice or {},
166                    partition=next_slice.partition,
167                    extra_fields=next_slice.extra_fields,
168                )
169            else:
170                state_for_slice = self._cursor.select_state(self.current_slice)
171                if state_for_slice == FULL_REFRESH_COMPLETE_STATE:
172                    # If the current slice is complete, move to the next slice and skip the next slices that already
173                    # have the terminal complete value indicating that a previous attempt was successfully read.
174                    # Dummy initialization for mypy since we'll iterate at least once to get the next slice
175                    next_candidate_slice = StreamSlice(cursor_slice={}, partition={})
176                    has_more = True
177                    while has_more:
178                        next_candidate_slice = self.read_and_convert_slice()
179                        state_for_slice = self._cursor.select_state(next_candidate_slice)
180                        has_more = state_for_slice == FULL_REFRESH_COMPLETE_STATE
181                    return StreamSlice(
182                        cursor_slice=state_for_slice or {},
183                        partition=next_candidate_slice.partition,
184                        extra_fields=next_candidate_slice.extra_fields,
185                    )
186                # The reader continues to process the current partition if its state is still in progress
187                return StreamSlice(
188                    cursor_slice=state_for_slice or {},
189                    partition=self.current_slice.partition,
190                    extra_fields=self.current_slice.extra_fields,
191                )
192        else:
193            # Unlike RFR cursors that iterate dynamically according to how stream state is updated, most cursors operate
194            # on a fixed set of slices determined before reading records. They just iterate to the next slice
195            return self.read_and_convert_slice()
196
197    @property
198    def current_slice(self) -> Optional[StreamSlice]:
199        return self._current_slice
200
201    @current_slice.setter
202    def current_slice(self, value: StreamSlice) -> None:
203        self._current_slice = value
204
205    def read_and_convert_slice(self) -> StreamSlice:
206        next_slice = next(self._stream_slices)
207        if not isinstance(next_slice, StreamSlice):
208            raise ValueError(
209                f"{self.current_slice} should be of type StreamSlice. This is likely a bug in the CDK, please contact Airbyte support"
210            )
211        return next_slice

CursorBasedCheckpointReader is used by streams that implement a Cursor in order to manage state. This allows the checkpoint reader to delegate the complexity of fetching state to the cursor and focus on the iteration over a stream's partitions.

This reader supports the Cursor interface used by Python and low-code sources. Not to be confused with Cursor interface that belongs to the Concurrent CDK.

CursorBasedCheckpointReader( cursor: Cursor, stream_slices: Iterable[Optional[Mapping[str, Any]]], read_state_from_cursor: bool = False)
 93    def __init__(
 94        self,
 95        cursor: Cursor,
 96        stream_slices: Iterable[Optional[Mapping[str, Any]]],
 97        read_state_from_cursor: bool = False,
 98    ):
 99        self._cursor = cursor
100        self._stream_slices = iter(stream_slices)
101        # read_state_from_cursor is used to delineate that partitions should determine when to stop syncing dynamically according
102        # to the value of the state at runtime. This currently only applies to streams that use resumable full refresh.
103        self._read_state_from_cursor = read_state_from_cursor
104        self._current_slice: Optional[StreamSlice] = None
105        self._finished_sync = False
106        self._previous_state: Optional[Mapping[str, Any]] = None
def next(self) -> Optional[Mapping[str, Any]]:
108    def next(self) -> Optional[Mapping[str, Any]]:
109        try:
110            self.current_slice = self._find_next_slice()
111            return self.current_slice
112        except StopIteration:
113            self._finished_sync = True
114            return None

Returns the next slice that will be used to fetch the next group of records. Returning None indicates that the reader has finished iterating over all slices.

def observe(self, new_state: Mapping[str, Any]) -> None:
116    def observe(self, new_state: Mapping[str, Any]) -> None:
117        # Cursor based checkpoint readers don't need to observe the new state because it has already been updated by the cursor
118        # while processing records
119        pass

Updates the internal state of the checkpoint reader based on the incoming stream state from a connector.

WARNING: This is used to retain backwards compatibility with streams using the legacy get_stream_state() method. In order to uptake Resumable Full Refresh, connectors must migrate streams to use the state setter/getter methods.

def get_checkpoint(self) -> Optional[Mapping[str, Any]]:
121    def get_checkpoint(self) -> Optional[Mapping[str, Any]]:
122        # This is used to avoid sending a duplicate state message
123        new_state = self._cursor.get_stream_state()
124        if new_state != self._previous_state:
125            self._previous_state = new_state
126            return new_state
127        else:
128            return None

Retrieves the current state value of the stream. The connector does not emit state messages if the checkpoint value is None.

current_slice: Optional[airbyte_cdk.StreamSlice]
197    @property
198    def current_slice(self) -> Optional[StreamSlice]:
199        return self._current_slice
def read_and_convert_slice(self) -> airbyte_cdk.StreamSlice:
205    def read_and_convert_slice(self) -> StreamSlice:
206        next_slice = next(self._stream_slices)
207        if not isinstance(next_slice, StreamSlice):
208            raise ValueError(
209                f"{self.current_slice} should be of type StreamSlice. This is likely a bug in the CDK, please contact Airbyte support"
210            )
211        return next_slice
class FullRefreshCheckpointReader(airbyte_cdk.sources.streams.checkpoint.CheckpointReader):
313class FullRefreshCheckpointReader(CheckpointReader):
314    """
315    FullRefreshCheckpointReader iterates over data that cannot be checkpointed incrementally during the sync because the stream
316    is not capable of managing state. At the end of a sync, a final state message is emitted to signal completion.
317    """
318
319    def __init__(self, stream_slices: Iterable[Optional[Mapping[str, Any]]]):
320        self._stream_slices = iter(stream_slices)
321        self._final_checkpoint = False
322
323    def next(self) -> Optional[Mapping[str, Any]]:
324        try:
325            return next(self._stream_slices)
326        except StopIteration:
327            self._final_checkpoint = True
328            return None
329
330    def observe(self, new_state: Mapping[str, Any]) -> None:
331        pass
332
333    def get_checkpoint(self) -> Optional[Mapping[str, Any]]:
334        if self._final_checkpoint:
335            return {"__ab_no_cursor_state_message": True}
336        return None

FullRefreshCheckpointReader iterates over data that cannot be checkpointed incrementally during the sync because the stream is not capable of managing state. At the end of a sync, a final state message is emitted to signal completion.

FullRefreshCheckpointReader(stream_slices: Iterable[Optional[Mapping[str, Any]]])
319    def __init__(self, stream_slices: Iterable[Optional[Mapping[str, Any]]]):
320        self._stream_slices = iter(stream_slices)
321        self._final_checkpoint = False
def next(self) -> Optional[Mapping[str, Any]]:
323    def next(self) -> Optional[Mapping[str, Any]]:
324        try:
325            return next(self._stream_slices)
326        except StopIteration:
327            self._final_checkpoint = True
328            return None

Returns the next slice that will be used to fetch the next group of records. Returning None indicates that the reader has finished iterating over all slices.

def observe(self, new_state: Mapping[str, Any]) -> None:
330    def observe(self, new_state: Mapping[str, Any]) -> None:
331        pass

Updates the internal state of the checkpoint reader based on the incoming stream state from a connector.

WARNING: This is used to retain backwards compatibility with streams using the legacy get_stream_state() method. In order to uptake Resumable Full Refresh, connectors must migrate streams to use the state setter/getter methods.

def get_checkpoint(self) -> Optional[Mapping[str, Any]]:
333    def get_checkpoint(self) -> Optional[Mapping[str, Any]]:
334        if self._final_checkpoint:
335            return {"__ab_no_cursor_state_message": True}
336        return None

Retrieves the current state value of the stream. The connector does not emit state messages if the checkpoint value is None.

class IncrementalCheckpointReader(airbyte_cdk.sources.streams.checkpoint.CheckpointReader):
51class IncrementalCheckpointReader(CheckpointReader):
52    """
53    IncrementalCheckpointReader handles iterating through a stream based on partitioned windows of data that are determined
54    before syncing data.
55    """
56
57    def __init__(
58        self, stream_state: Mapping[str, Any], stream_slices: Iterable[Optional[Mapping[str, Any]]]
59    ):
60        self._state: Optional[Mapping[str, Any]] = stream_state
61        self._stream_slices = iter(stream_slices)
62        self._has_slices = False
63
64    def next(self) -> Optional[Mapping[str, Any]]:
65        try:
66            next_slice = next(self._stream_slices)
67            self._has_slices = True
68            return next_slice
69        except StopIteration:
70            # This is used to avoid sending a duplicate state message at the end of a sync since the stream has already
71            # emitted state at the end of each slice. If we want to avoid this extra complexity, we can also just accept
72            # that every sync emits a final duplicate state
73            if self._has_slices:
74                self._state = None
75            return None
76
77    def observe(self, new_state: Mapping[str, Any]) -> None:
78        self._state = new_state
79
80    def get_checkpoint(self) -> Optional[Mapping[str, Any]]:
81        return self._state

IncrementalCheckpointReader handles iterating through a stream based on partitioned windows of data that are determined before syncing data.

IncrementalCheckpointReader( stream_state: Mapping[str, Any], stream_slices: Iterable[Optional[Mapping[str, Any]]])
57    def __init__(
58        self, stream_state: Mapping[str, Any], stream_slices: Iterable[Optional[Mapping[str, Any]]]
59    ):
60        self._state: Optional[Mapping[str, Any]] = stream_state
61        self._stream_slices = iter(stream_slices)
62        self._has_slices = False
def next(self) -> Optional[Mapping[str, Any]]:
64    def next(self) -> Optional[Mapping[str, Any]]:
65        try:
66            next_slice = next(self._stream_slices)
67            self._has_slices = True
68            return next_slice
69        except StopIteration:
70            # This is used to avoid sending a duplicate state message at the end of a sync since the stream has already
71            # emitted state at the end of each slice. If we want to avoid this extra complexity, we can also just accept
72            # that every sync emits a final duplicate state
73            if self._has_slices:
74                self._state = None
75            return None

Returns the next slice that will be used to fetch the next group of records. Returning None indicates that the reader has finished iterating over all slices.

def observe(self, new_state: Mapping[str, Any]) -> None:
77    def observe(self, new_state: Mapping[str, Any]) -> None:
78        self._state = new_state

Updates the internal state of the checkpoint reader based on the incoming stream state from a connector.

WARNING: This is used to retain backwards compatibility with streams using the legacy get_stream_state() method. In order to uptake Resumable Full Refresh, connectors must migrate streams to use the state setter/getter methods.

def get_checkpoint(self) -> Optional[Mapping[str, Any]]:
80    def get_checkpoint(self) -> Optional[Mapping[str, Any]]:
81        return self._state

Retrieves the current state value of the stream. The connector does not emit state messages if the checkpoint value is None.

class LegacyCursorBasedCheckpointReader(CursorBasedCheckpointReader):
    """
    Adapter that retains backwards compatibility with legacy sources taking in stream_slice as a
    plain Mapping instead of a StreamSlice object. Internally the reader still operates over
    StreamSlices, but it is instantiated with — and emits — stream slices as Mapping[str, Any].
    Partition/cursor iteration logic is identical to CursorBasedCheckpointReader.

    The connector's existing top-level fields are retained so they remain visible to dependent
    methods. For example, the mapping handed to the stream's read_records() looks like:
    {
      "cursor_slice": {
        "next_page_token": 10
      },
      "partition": {
        "repository": "airbytehq/airbyte"
      },
      "next_page_token": 10,
      "repository": "airbytehq/airbyte"
    }
    """

    def __init__(
        self,
        cursor: Cursor,
        stream_slices: Iterable[Optional[Mapping[str, Any]]],
        read_state_from_cursor: bool = False,
    ):
        # No additional state here; this subclass only adapts slice representations.
        super().__init__(
            cursor=cursor,
            stream_slices=stream_slices,
            read_state_from_cursor=read_state_from_cursor,
        )

    def next(self) -> Optional[Mapping[str, Any]]:
        """Return the next slice as a flat Mapping, or None once iteration finishes."""
        try:
            self.current_slice = self._find_next_slice()
        except StopIteration:
            self._finished_sync = True
            return None

        flattened = dict(self.current_slice)
        # Reserved keys must not collide with connector-defined slice fields.
        if "partition" in flattened:
            raise ValueError("Stream is configured to use invalid stream slice key 'partition'")
        if "cursor_slice" in flattened:
            raise ValueError(
                "Stream is configured to use invalid stream slice key 'cursor_slice'"
            )

        # Convert the StreamSlice back to a plain mapping because legacy connectors operate on
        # Mapping objects; all fields are also duplicated at the top level for backwards
        # compatibility with existing Python sources.
        return {
            "partition": self.current_slice.partition,
            "cursor_slice": self.current_slice.cursor_slice,
            **flattened,
        }

    def read_and_convert_slice(self) -> StreamSlice:
        """Pull the next raw mapping slice and wrap it in a StreamSlice."""
        raw_slice = next(self._stream_slices)
        if not isinstance(raw_slice, Mapping):
            raise ValueError(
                f"{self.current_slice} should be of type Mapping. This is likely a bug in the CDK, please contact Airbyte support"
            )

        # The legacy reader is instantiated with an iterable of mapping slices; each is wrapped
        # into a StreamSlice so the sync can reuse the existing Python-defined cursors.
        return StreamSlice(
            partition=raw_slice,
            cursor_slice={},
        )

This (unfortunate) class operates like an adapter to retain backwards compatibility with legacy sources that take in stream_slice in the form of a Mapping instead of the StreamSlice object. Internally, the reader still operates over StreamSlices, but it is instantiated with and emits stream slices in the form of a Mapping[str, Any]. The logic of how partitions and cursors are iterated over is synonymous with CursorBasedCheckpointReader.

We also retain the existing top level fields defined by the connector so the fields are present on dependent methods. For example, the resulting mapping structure passed back to the stream's read_records() method looks like: { "cursor_slice": { "next_page_token": 10 }, "partition": { "repository": "airbytehq/airbyte" }, "next_page_token": 10, "repository": "airbytehq/airbyte" }

def __init__(
    self,
    cursor: Cursor,
    stream_slices: Iterable[Optional[Mapping[str, Any]]],
    read_state_from_cursor: bool = False,
):
    """
    Delegate construction entirely to CursorBasedCheckpointReader; this subclass only
    adapts between Mapping-shaped slices and StreamSlice objects.
    """
    super().__init__(
        cursor=cursor,
        stream_slices=stream_slices,
        read_state_from_cursor=read_state_from_cursor,
    )
def next(self) -> Optional[Mapping[str, Any]]:
    """
    Return the next slice flattened into a Mapping for legacy connectors, or None when
    iteration is complete. Reserved keys 'partition' and 'cursor_slice' are rejected.
    """
    try:
        self.current_slice = self._find_next_slice()
    except StopIteration:
        self._finished_sync = True
        return None

    flattened = dict(self.current_slice)
    if "partition" in flattened:
        raise ValueError("Stream is configured to use invalid stream slice key 'partition'")
    if "cursor_slice" in flattened:
        raise ValueError(
            "Stream is configured to use invalid stream slice key 'cursor_slice'"
        )

    # Legacy connectors operate on plain Mapping objects, so the StreamSlice is converted
    # back and its fields duplicated at the top level for backwards compatibility.
    return {
        "partition": self.current_slice.partition,
        "cursor_slice": self.current_slice.cursor_slice,
        **flattened,
    }

Returns the next slice that will be used to fetch the next group of records. Returning None indicates that the reader has finished iterating over all slices.

def read_and_convert_slice(self) -> StreamSlice:
    """Fetch the next raw mapping slice from the iterator and wrap it as a StreamSlice."""
    raw_slice = next(self._stream_slices)
    if not isinstance(raw_slice, Mapping):
        raise ValueError(
            f"{self.current_slice} should be of type Mapping. This is likely a bug in the CDK, please contact Airbyte support"
        )

    # The legacy reader receives mapping-shaped slices; wrapping each in a StreamSlice lets
    # the sync reuse the existing Python-defined cursors.
    return StreamSlice(
        partition=raw_slice,
        cursor_slice={},
    )
class ResumableFullRefreshCheckpointReader(CheckpointReader):
    """
    Iterates over an unbounded set of records according to the stream's pagination strategy.
    Because the total number of pages is unknown up front, the stream's current state decides
    whether to keep fetching pages or stop the sync.
    """

    def __init__(self, stream_state: Mapping[str, Any]):
        # An RFR stream's first attempt arrives with an empty {} state but must still read
        # records from the first page in next(), so remember whether we started from scratch.
        self._first_page = bool(stream_state == {})
        self._state: Mapping[str, Any] = stream_state

    def next(self) -> Optional[Mapping[str, Any]]:
        """Return the state driving the next page fetch, or None once the sync is complete."""
        if self._first_page:
            self._first_page = False
            return self._state
        if self._state == FULL_REFRESH_COMPLETE_STATE:
            return None
        return self._state

    def observe(self, new_state: Mapping[str, Any]) -> None:
        """Record the latest pagination state reported by the connector."""
        self._state = new_state

    def get_checkpoint(self) -> Optional[Mapping[str, Any]]:
        """Return the current state, substituting an empty mapping for falsy values."""
        return self._state or {}

ResumableFullRefreshCheckpointReader allows for iteration over an unbounded set of records based on the pagination strategy of the stream. Because the number of pages is unknown, the stream's current state is used to determine whether to continue fetching more pages or stopping the sync.

def __init__(self, stream_state: Mapping[str, Any]):
    """
    :param stream_state: incoming pagination state; an empty {} marks the first attempt,
        which must still read records from the first page in next().
    """
    self._first_page = bool(stream_state == {})
    self._state: Mapping[str, Any] = stream_state
def next(self) -> Optional[Mapping[str, Any]]:
    """
    Return the state driving the next page fetch; None once the stream reports the
    terminal full-refresh-complete state (except on the very first attempt, which
    always gets one read).
    """
    if self._first_page:
        self._first_page = False
        return self._state
    if self._state == FULL_REFRESH_COMPLETE_STATE:
        return None
    return self._state

Returns the next slice that will be used to fetch the next group of records. Returning None indicates that the reader has finished iterating over all slices.

def observe(self, new_state: Mapping[str, Any]) -> None:
    """Record the latest pagination state reported by the connector."""
    self._state = new_state

Updates the internal state of the checkpoint reader based on the incoming stream state from a connector.

WARNING: This is used to retain backwards compatibility with streams using the legacy get_stream_state() method. In order to adopt Resumable Full Refresh, connectors must migrate streams to use the state setter/getter methods.

def get_checkpoint(self) -> Optional[Mapping[str, Any]]:
    """Return the current state, substituting an empty mapping when state is falsy."""
    return self._state if self._state else {}

Retrieves the current state value of the stream. The connector does not emit state messages if the checkpoint value is None.

@dataclass
class ResumableFullRefreshCursor(Cursor):
    """
    Cursor that checkpoints sync progress using a synthetic cursor derived from the stream's
    pagination state. Resumable full refresh syncs only retain state between attempts of the
    same job; the platform is responsible for removing that state afterwards.
    """

    def __init__(self) -> None:
        self._cursor: StreamState = {}

    def get_stream_state(self) -> StreamState:
        return self._cursor

    def set_initial_state(self, stream_state: StreamState) -> None:
        self._cursor = stream_state

    def observe(self, stream_slice: StreamSlice, record: Record) -> None:
        """
        State is managed via a page number, so individual incoming records carry no cursor
        information and are ignored.
        """

    def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
        self._cursor = stream_slice.cursor_slice

    def should_be_synced(self, record: Record) -> bool:
        """
        Unlike date-based cursors, which filter out records beyond slice boundaries, resumable
        full refresh pages have no filterable bounds — every record is emitted.
        """
        return True

    def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
        """RFR records have no ordering relative to one another."""
        return False

    def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
        # A top-level RFR cursor tracks exactly one partition's state.
        return self._cursor

Cursor that allows for the checkpointing of sync progress according to a synthetic cursor based on the pagination state of the stream. Resumable full refresh syncs are only intended to retain state in between sync attempts of the same job with the platform responsible for removing said state.

def get_stream_state(self) -> StreamState:
    """Return the cursor's current synthetic (pagination-based) state."""
    return self._cursor

Returns the current stream state. We would like to restrict its usage since it exposes the internals of state. As of 2023-06-14, it is used for the following things:

  • Interpolation of the requests
  • Transformation of records
  • Saving the state

For the first case, we are probably stuck with exposing the stream state. For the second, we can probably expose a method that allows for emitting the state to the platform.

def set_initial_state(self, stream_state: StreamState) -> None:
    """
    Seed the cursor with incoming state; must be called before any other cursor method
    since cursors are not constructed with their state.
    """
    self._cursor = stream_state

Cursors are not initialized with their state. As state is needed in order to function properly, this method should be called before calling anything else

Parameters
  • stream_state: The state of the stream as returned by get_stream_state
def observe(self, stream_slice: StreamSlice, record: Record) -> None:
    """
    Resumable full refresh manages state via a page number, so observing incoming
    records is a no-op.
    """

Resumable full refresh manages state using a page number so it does not need to update state by observing incoming records.

def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
    """Adopt the closed slice's cursor_slice as the cursor's new state."""
    self._cursor = stream_slice.cursor_slice

Update state based on the stream slice. Note that stream_slice.cursor_slice and most_recent_record.associated_slice are expected to be the same but we make it explicit here that stream_slice should be leveraged to update the state. We do not pass in the latest record, since cursor instances should maintain the relevant internal state on their own.

Parameters
  • stream_slice: slice to close
def should_be_synced(self, record: Record) -> bool:
    """
    Pages in a resumable full refresh sync have no filterable bounds (unlike date-based
    cursors that drop records outside slice boundaries), so every record is returned.
    """
    return True

Unlike date-based cursors which filter out records outside slice boundaries, resumable full refresh records exist within pages that don't have filterable bounds. We should always return them.

def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
    """RFR records have no ordering relative to one another, so comparison is always False."""
    return False

RFR records don't have an ordering that allows them to be compared to one another.

def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
    """Return the single-partition state; the slice argument is ignored for a top-level RFR cursor."""
    return self._cursor

Get the state value of a specific stream_slice. For incremental or resumable full refresh cursors which only manage state in a single dimension this is the entire state object. For per-partition cursors used by substreams, this returns the state of a specific parent delineated by the incoming slice's partition object.