airbyte_cdk.sources.streams.checkpoint.cursor
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from abc import ABC, abstractmethod
from typing import Any, Optional

from airbyte_cdk.sources.types import Record, StreamSlice, StreamState


class Cursor(ABC):
    """
    Cursors are components that allow for checkpointing the current state of a sync. They keep track of what data has been consumed
    and allow syncs to be resumed from a specific point based on that information.
    """

    @abstractmethod
    def set_initial_state(self, stream_state: StreamState) -> None:
        """
        Cursors are not initialized with their state. As state is needed in order to function properly, this method should be called
        before any other method.

        :param stream_state: The state of the stream as returned by get_stream_state
        """

    def observe(self, stream_slice: StreamSlice, record: Record) -> None:
        """
        Register a record with the cursor; the cursor instance can then use it to manage the state of the in-progress stream read.

        The default implementation does nothing; subclasses may override it.

        :param stream_slice: The current slice, which may or may not contain the most recently observed record
        :param record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the
          stream state may need to be deferred depending on whether the source reliably orders records by the cursor field.
        """
        pass

    @abstractmethod
    def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
        """
        Update state based on the stream slice. Note that `stream_slice.cursor_slice` and `most_recent_record.associated_slice` are expected
        to be the same, but we make it explicit here that `stream_slice` should be leveraged to update the state. We do not pass in the
        latest record, since cursor instances should maintain the relevant internal state on their own.

        :param stream_slice: slice to close
        :param args: additional positional arguments; their meaning is not defined by this interface
        """

    @abstractmethod
    def get_stream_state(self) -> StreamState:
        """
        Returns the current stream state. We would like to restrict its usage since it exposes the internal state. As of 2023-06-14, it
        is used for three things:
        * Interpolation of the requests
        * Transformation of records
        * Saving the state

        For the first case, we are probably stuck with exposing the stream state. For the last case (saving the state), we can probably
        expose a method that allows for emitting the state to the platform instead.
        """

    @abstractmethod
    def should_be_synced(self, record: Record) -> bool:
        """
        Evaluate whether a record should be synced. This allows for filtering records and for a stop condition on pagination.

        :param record: the record to evaluate
        :return: True if the record should be synced, False otherwise
        """

    @abstractmethod
    def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
        """
        Evaluate whether `first` is greater than or equal to `second` in terms of the cursor. This is used to avoid having to capture
        all the records to close a slice.

        :param first: the record on the left-hand side of the comparison
        :param second: the record on the right-hand side of the comparison
        :return: True if `first` is greater than or equal to `second` in terms of the cursor
        """

    @abstractmethod
    def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
        """
        Get the state value of a specific stream_slice. For incremental or resumable full refresh cursors which only manage state in
        a single dimension this is the entire state object. For per-partition cursors used by substreams, this returns the state of
        a specific parent delineated by the incoming slice's partition object.

        :param stream_slice: the slice whose state should be selected; optional
        :return: the selected state, or None (the conditions under which None is returned are implementation-specific)
        """
class Cursor(ABC):
    """
    Cursors are components that allow for checkpointing the current state of a sync. They keep track of what data has been consumed
    and allow syncs to be resumed from a specific point based on that information.
    """

    @abstractmethod
    def set_initial_state(self, stream_state: StreamState) -> None:
        """
        Cursors are not initialized with their state. As state is needed in order to function properly, this method should be called
        before any other method.

        :param stream_state: The state of the stream as returned by get_stream_state
        """

    def observe(self, stream_slice: StreamSlice, record: Record) -> None:
        """
        Register a record with the cursor; the cursor instance can then use it to manage the state of the in-progress stream read.

        The default implementation does nothing; subclasses may override it.

        :param stream_slice: The current slice, which may or may not contain the most recently observed record
        :param record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the
          stream state may need to be deferred depending on whether the source reliably orders records by the cursor field.
        """
        pass

    @abstractmethod
    def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
        """
        Update state based on the stream slice. Note that `stream_slice.cursor_slice` and `most_recent_record.associated_slice` are expected
        to be the same, but we make it explicit here that `stream_slice` should be leveraged to update the state. We do not pass in the
        latest record, since cursor instances should maintain the relevant internal state on their own.

        :param stream_slice: slice to close
        :param args: additional positional arguments; their meaning is not defined by this interface
        """

    @abstractmethod
    def get_stream_state(self) -> StreamState:
        """
        Returns the current stream state. We would like to restrict its usage since it exposes the internal state. As of 2023-06-14, it
        is used for three things:
        * Interpolation of the requests
        * Transformation of records
        * Saving the state

        For the first case, we are probably stuck with exposing the stream state. For the last case (saving the state), we can probably
        expose a method that allows for emitting the state to the platform instead.
        """

    @abstractmethod
    def should_be_synced(self, record: Record) -> bool:
        """
        Evaluate whether a record should be synced. This allows for filtering records and for a stop condition on pagination.

        :param record: the record to evaluate
        :return: True if the record should be synced, False otherwise
        """

    @abstractmethod
    def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
        """
        Evaluate whether `first` is greater than or equal to `second` in terms of the cursor. This is used to avoid having to capture
        all the records to close a slice.

        :param first: the record on the left-hand side of the comparison
        :param second: the record on the right-hand side of the comparison
        :return: True if `first` is greater than or equal to `second` in terms of the cursor
        """

    @abstractmethod
    def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
        """
        Get the state value of a specific stream_slice. For incremental or resumable full refresh cursors which only manage state in
        a single dimension this is the entire state object. For per-partition cursors used by substreams, this returns the state of
        a specific parent delineated by the incoming slice's partition object.

        :param stream_slice: the slice whose state should be selected; optional
        :return: the selected state, or None (the conditions under which None is returned are implementation-specific)
        """
Cursors are components that allow for checkpointing the current state of a sync. They keep track of what data has been consumed and allow syncs to be resumed from a specific point based on that information.
@abstractmethod
def set_initial_state(self, stream_state: StreamState) -> None:
    """
    Cursors are not initialized with their state. As state is needed in order to function properly, this method should be called
    before any other method.

    :param stream_state: The state of the stream as returned by get_stream_state
    """
Cursors are not initialized with their state. As state is needed in order to function properly, this method should be called before any other method.
Parameters
- stream_state: The state of the stream as returned by get_stream_state
def observe(self, stream_slice: StreamSlice, record: Record) -> None:
    """
    Register a record with the cursor; the cursor instance can then use it to manage the state of the in-progress stream read.

    The default implementation does nothing; subclasses may override it.

    :param stream_slice: The current slice, which may or may not contain the most recently observed record
    :param record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the
      stream state may need to be deferred depending on whether the source reliably orders records by the cursor field.
    """
    pass
Register a record with the cursor; the cursor instance can then use it to manage the state of the in-progress stream read.
Parameters
- stream_slice: The current slice, which may or may not contain the most recently observed record
- record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the stream state may need to be deferred depending on whether the source reliably orders records by the cursor field.
@abstractmethod
def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
    """
    Update state based on the stream slice. Note that `stream_slice.cursor_slice` and `most_recent_record.associated_slice` are expected
    to be the same, but we make it explicit here that `stream_slice` should be leveraged to update the state. We do not pass in the
    latest record, since cursor instances should maintain the relevant internal state on their own.

    :param stream_slice: slice to close
    :param args: additional positional arguments; their meaning is not defined by this interface
    """
Update state based on the stream slice. Note that `stream_slice.cursor_slice` and `most_recent_record.associated_slice` are expected to be the same, but we make it explicit here that `stream_slice` should be leveraged to update the state. We do not pass in the latest record, since cursor instances should maintain the relevant internal state on their own.
Parameters
- stream_slice: slice to close
@abstractmethod
def get_stream_state(self) -> StreamState:
    """
    Returns the current stream state. We would like to restrict its usage since it exposes the internal state. As of 2023-06-14, it
    is used for three things:
    * Interpolation of the requests
    * Transformation of records
    * Saving the state

    For the first case, we are probably stuck with exposing the stream state. For the last case (saving the state), we can probably
    expose a method that allows for emitting the state to the platform instead.
    """
Returns the current stream state. We would like to restrict its usage since it exposes the internal state. As of 2023-06-14, it is used for three things:
- Interpolation of the requests
- Transformation of records
- Saving the state
For the first case, we are probably stuck with exposing the stream state. For the last case (saving the state), we can probably expose a method that allows for emitting the state to the platform instead.
@abstractmethod
def should_be_synced(self, record: Record) -> bool:
    """
    Evaluate whether a record should be synced. This allows for filtering records and for a stop condition on pagination.

    :param record: the record to evaluate
    :return: True if the record should be synced, False otherwise
    """
Evaluating whether a record should be synced allows for filtering records and for a stop condition on pagination.
@abstractmethod
def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
    """
    Evaluate whether `first` is greater than or equal to `second` in terms of the cursor. This is used to avoid having to capture
    all the records to close a slice.

    :param first: the record on the left-hand side of the comparison
    :param second: the record on the right-hand side of the comparison
    :return: True if `first` is greater than or equal to `second` in terms of the cursor
    """
Evaluating which record is greater in terms of the cursor. This is used to avoid having to capture all the records to close a slice.
@abstractmethod
def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
    """
    Get the state value of a specific stream_slice. For incremental or resumable full refresh cursors which only manage state in
    a single dimension this is the entire state object. For per-partition cursors used by substreams, this returns the state of
    a specific parent delineated by the incoming slice's partition object.

    :param stream_slice: the slice whose state should be selected; optional
    :return: the selected state, or None (the conditions under which None is returned are implementation-specific)
    """
Get the state value of a specific stream_slice. For incremental or resumable full refresh cursors which only manage state in a single dimension this is the entire state object. For per-partition cursors used by substreams, this returns the state of a specific parent delineated by the incoming slice's partition object.