airbyte_cdk.sources.streams.checkpoint.cursor

 1#
 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 3#
 4
 5from abc import ABC, abstractmethod
 6from typing import Any, Optional
 7
 8from airbyte_cdk.sources.types import Record, StreamSlice, StreamState
 9
10
11class Cursor(ABC):
12    """
13    Cursors are components that allow for checkpointing the current state of a sync. They keep track of what data has been consumed
14    and allows for syncs to be resumed from a specific point based on that information.
15    """
16
17    @abstractmethod
18    def set_initial_state(self, stream_state: StreamState) -> None:
19        """
20        Cursors are not initialized with their state. As state is needed in order to function properly, this method should be called
21        before calling anything else
22
23        :param stream_state: The state of the stream as returned by get_stream_state
24        """
25
26    def observe(self, stream_slice: StreamSlice, record: Record) -> None:
27        """
28        Register a record with the cursor; the cursor instance can then use it to manage the state of the in-progress stream read.
29
30        :param stream_slice: The current slice, which may or may not contain the most recently observed record
31        :param record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the
32          stream state may need to be deferred depending on whether the source reliably orders records by the cursor field.
33        """
34        pass
35
36    @abstractmethod
37    def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
38        """
39        Update state based on the stream slice. Note that `stream_slice.cursor_slice` and `most_recent_record.associated_slice` are expected
40        to be the same but we make it explicit here that `stream_slice` should be leveraged to update the state. We do not pass in the
41        latest record, since cursor instances should maintain the relevant internal state on their own.
42
43        :param stream_slice: slice to close
44        """
45
46    @abstractmethod
47    def get_stream_state(self) -> StreamState:
48        """
49        Returns the current stream state. We would like to restrict it's usage since it does expose internal of state. As of 2023-06-14, it
50        is used for two things:
51        * Interpolation of the requests
52        * Transformation of records
53        * Saving the state
54
55        For the first case, we are probably stuck with exposing the stream state. For the second, we can probably expose a method that
56        allows for emitting the state to the platform.
57        """
58
59    @abstractmethod
60    def should_be_synced(self, record: Record) -> bool:
61        """
62        Evaluating if a record should be synced allows for filtering and stop condition on pagination
63        """
64
65    @abstractmethod
66    def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
67        """
68        Evaluating which record is greater in terms of cursor. This is used to avoid having to capture all the records to close a slice
69        """
70
71    @abstractmethod
72    def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
73        """
74        Get the state value of a specific stream_slice. For incremental or resumable full refresh cursors which only manage state in
75        a single dimension this is the entire state object. For per-partition cursors used by substreams, this returns the state of
76        a specific parent delineated by the incoming slice's partition object.
77        """
class Cursor(abc.ABC):
12class Cursor(ABC):
13    """
14    Cursors are components that allow for checkpointing the current state of a sync. They keep track of what data has been consumed
15    and allows for syncs to be resumed from a specific point based on that information.
16    """
17
18    @abstractmethod
19    def set_initial_state(self, stream_state: StreamState) -> None:
20        """
21        Cursors are not initialized with their state. As state is needed in order to function properly, this method should be called
22        before calling anything else
23
24        :param stream_state: The state of the stream as returned by get_stream_state
25        """
26
27    def observe(self, stream_slice: StreamSlice, record: Record) -> None:
28        """
29        Register a record with the cursor; the cursor instance can then use it to manage the state of the in-progress stream read.
30
31        :param stream_slice: The current slice, which may or may not contain the most recently observed record
32        :param record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the
33          stream state may need to be deferred depending on whether the source reliably orders records by the cursor field.
34        """
35        pass
36
37    @abstractmethod
38    def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
39        """
40        Update state based on the stream slice. Note that `stream_slice.cursor_slice` and `most_recent_record.associated_slice` are expected
41        to be the same but we make it explicit here that `stream_slice` should be leveraged to update the state. We do not pass in the
42        latest record, since cursor instances should maintain the relevant internal state on their own.
43
44        :param stream_slice: slice to close
45        """
46
47    @abstractmethod
48    def get_stream_state(self) -> StreamState:
49        """
50        Returns the current stream state. We would like to restrict it's usage since it does expose internal of state. As of 2023-06-14, it
51        is used for two things:
52        * Interpolation of the requests
53        * Transformation of records
54        * Saving the state
55
56        For the first case, we are probably stuck with exposing the stream state. For the second, we can probably expose a method that
57        allows for emitting the state to the platform.
58        """
59
60    @abstractmethod
61    def should_be_synced(self, record: Record) -> bool:
62        """
63        Evaluating if a record should be synced allows for filtering and stop condition on pagination
64        """
65
66    @abstractmethod
67    def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
68        """
69        Evaluating which record is greater in terms of cursor. This is used to avoid having to capture all the records to close a slice
70        """
71
72    @abstractmethod
73    def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
74        """
75        Get the state value of a specific stream_slice. For incremental or resumable full refresh cursors which only manage state in
76        a single dimension this is the entire state object. For per-partition cursors used by substreams, this returns the state of
77        a specific parent delineated by the incoming slice's partition object.
78        """

Cursors are components that allow for checkpointing the current state of a sync. They keep track of what data has been consumed and allow syncs to be resumed from a specific point based on that information.

@abstractmethod
def set_initial_state(self, stream_state: Mapping[str, Any]) -> None:
18    @abstractmethod
19    def set_initial_state(self, stream_state: StreamState) -> None:
20        """
21        Cursors are not initialized with their state. As state is needed in order to function properly, this method should be called
22        before calling anything else
23
24        :param stream_state: The state of the stream as returned by get_stream_state
25        """

Cursors are not initialized with their state. Because state is needed for a cursor to function properly, this method should be called before any other method.

Parameters
  • stream_state: The state of the stream as returned by get_stream_state
def observe( self, stream_slice: airbyte_cdk.StreamSlice, record: airbyte_cdk.Record) -> None:
27    def observe(self, stream_slice: StreamSlice, record: Record) -> None:
28        """
29        Register a record with the cursor; the cursor instance can then use it to manage the state of the in-progress stream read.
30
31        :param stream_slice: The current slice, which may or may not contain the most recently observed record
32        :param record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the
33          stream state may need to be deferred depending on whether the source reliably orders records by the cursor field.
34        """
35        pass

Register a record with the cursor; the cursor instance can then use it to manage the state of the in-progress stream read.

Parameters
  • stream_slice: The current slice, which may or may not contain the most recently observed record
  • record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the stream state may need to be deferred depending on whether the source reliably orders records by the cursor field.
@abstractmethod
def close_slice( self, stream_slice: airbyte_cdk.StreamSlice, *args: Any) -> None:
37    @abstractmethod
38    def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
39        """
40        Update state based on the stream slice. Note that `stream_slice.cursor_slice` and `most_recent_record.associated_slice` are expected
41        to be the same but we make it explicit here that `stream_slice` should be leveraged to update the state. We do not pass in the
42        latest record, since cursor instances should maintain the relevant internal state on their own.
43
44        :param stream_slice: slice to close
45        """

Update state based on the stream slice. Note that stream_slice.cursor_slice and most_recent_record.associated_slice are expected to be the same but we make it explicit here that stream_slice should be leveraged to update the state. We do not pass in the latest record, since cursor instances should maintain the relevant internal state on their own.

Parameters
  • stream_slice: slice to close
@abstractmethod
def get_stream_state(self) -> Mapping[str, Any]:
47    @abstractmethod
48    def get_stream_state(self) -> StreamState:
49        """
50        Returns the current stream state. We would like to restrict it's usage since it does expose internal of state. As of 2023-06-14, it
51        is used for two things:
52        * Interpolation of the requests
53        * Transformation of records
54        * Saving the state
55
56        For the first case, we are probably stuck with exposing the stream state. For the second, we can probably expose a method that
57        allows for emitting the state to the platform.
58        """

Returns the current stream state. We would like to restrict its usage, since it exposes internal state. As of 2023-06-14, it is used for three things:

  • Interpolation of the requests
  • Transformation of records
  • Saving the state

For the first case (interpolation of the requests), we are probably stuck with exposing the stream state. For the last case (saving the state), we can probably expose a method that allows for emitting the state to the platform.

@abstractmethod
def should_be_synced(self, record: airbyte_cdk.Record) -> bool:
60    @abstractmethod
61    def should_be_synced(self, record: Record) -> bool:
62        """
63        Evaluating if a record should be synced allows for filtering and stop condition on pagination
64        """

Evaluates whether a record should be synced. This allows for filtering records and provides a stop condition for pagination.

@abstractmethod
def is_greater_than_or_equal( self, first: airbyte_cdk.Record, second: airbyte_cdk.Record) -> bool:
66    @abstractmethod
67    def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
68        """
69        Evaluating which record is greater in terms of cursor. This is used to avoid having to capture all the records to close a slice
70        """

Evaluates which record is greater in terms of the cursor. This is used to avoid having to capture all the records in order to close a slice.

@abstractmethod
def select_state( self, stream_slice: Optional[airbyte_cdk.StreamSlice] = None) -> Optional[Mapping[str, Any]]:
72    @abstractmethod
73    def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
74        """
75        Get the state value of a specific stream_slice. For incremental or resumable full refresh cursors which only manage state in
76        a single dimension this is the entire state object. For per-partition cursors used by substreams, this returns the state of
77        a specific parent delineated by the incoming slice's partition object.
78        """

Get the state value of a specific stream_slice. For incremental or resumable full refresh cursors which only manage state in a single dimension this is the entire state object. For per-partition cursors used by substreams, this returns the state of a specific parent delineated by the incoming slice's partition object.