airbyte_cdk.sources.declarative.incremental

#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.incremental.concurrent_partition_cursor import (
    ConcurrentCursorFactory,
    ConcurrentPerPartitionCursor,
)
from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor
from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor
from airbyte_cdk.sources.declarative.incremental.global_substream_cursor import (
    GlobalSubstreamCursor,
)
from airbyte_cdk.sources.declarative.incremental.per_partition_cursor import (
    CursorFactory,
    PerPartitionCursor,
)
from airbyte_cdk.sources.declarative.incremental.per_partition_with_global import (
    PerPartitionWithGlobalCursor,
)
from airbyte_cdk.sources.declarative.incremental.resumable_full_refresh_cursor import (
    ChildPartitionResumableFullRefreshCursor,
    ResumableFullRefreshCursor,
)

__all__ = [
    "CursorFactory",
    "ConcurrentCursorFactory",
    "ConcurrentPerPartitionCursor",
    "DatetimeBasedCursor",
    "DeclarativeCursor",
    "GlobalSubstreamCursor",
    "PerPartitionCursor",
    "PerPartitionWithGlobalCursor",
    "ResumableFullRefreshCursor",
    "ChildPartitionResumableFullRefreshCursor",
]
class CursorFactory:
class CursorFactory:
    def __init__(self, create_function: Callable[[], DeclarativeCursor]):
        self._create_function = create_function

    def create(self) -> DeclarativeCursor:
        return self._create_function()
CursorFactory(create_function: Callable[[], DeclarativeCursor])
    def __init__(self, create_function: Callable[[], DeclarativeCursor]):
        self._create_function = create_function
def create(self) -> DeclarativeCursor:
    def create(self) -> DeclarativeCursor:
        return self._create_function()
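A factory like this is typically given a zero-argument callable so that each call to create() builds a fresh, independent cursor. A minimal sketch, assuming an illustrative DatetimeBasedCursor configuration rather than one taken from a real connector manifest:

from airbyte_cdk.sources.declarative.incremental import CursorFactory, DatetimeBasedCursor

factory = CursorFactory(
    lambda: DatetimeBasedCursor(
        start_datetime="2023-01-01T00:00:00Z",
        cursor_field="updated_at",
        datetime_format="%Y-%m-%dT%H:%M:%SZ",
        config={},
        parameters={},
    )
)

cursor_a = factory.create()
cursor_b = factory.create()
assert cursor_a is not cursor_b  # each partition gets its own cursor instance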
class ConcurrentCursorFactory:
class ConcurrentCursorFactory:
    def __init__(self, create_function: Callable[..., ConcurrentCursor]):
        self._create_function = create_function

    def create(
        self, stream_state: Mapping[str, Any], runtime_lookback_window: Optional[timedelta]
    ) -> ConcurrentCursor:
        return self._create_function(
            stream_state=stream_state, runtime_lookback_window=runtime_lookback_window
        )
ConcurrentCursorFactory(create_function: Callable[..., airbyte_cdk.ConcurrentCursor])
    def __init__(self, create_function: Callable[..., ConcurrentCursor]):
        self._create_function = create_function
def create(self, stream_state: Mapping[str, Any], runtime_lookback_window: Optional[datetime.timedelta]) -> airbyte_cdk.ConcurrentCursor:
    def create(
        self, stream_state: Mapping[str, Any], runtime_lookback_window: Optional[timedelta]
    ) -> ConcurrentCursor:
        return self._create_function(
            stream_state=stream_state, runtime_lookback_window=runtime_lookback_window
        )
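Unlike CursorFactory, the callable here receives the per-partition state and an optional runtime lookback window as keyword arguments. A hedged sketch, where build_concurrent_cursor is a hypothetical helper standing in for whatever actually constructs the ConcurrentCursor:

from datetime import timedelta
from typing import Any, Mapping, Optional

from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor

def build_concurrent_cursor(
    stream_state: Mapping[str, Any],
    runtime_lookback_window: Optional[timedelta],
) -> ConcurrentCursor:
    # Hypothetical: wire the state and lookback window into a ConcurrentCursor.
    ...

factory = ConcurrentCursorFactory(build_concurrent_cursor)
cursor = factory.create(
    stream_state={"updated_at": "2023-05-27T00:00:00Z"},
    runtime_lookback_window=timedelta(seconds=10),
)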
class ConcurrentPerPartitionCursor(airbyte_cdk.sources.streams.concurrent.cursor.Cursor):
class ConcurrentPerPartitionCursor(Cursor):
    """
    Manages state per partition when a stream has many partitions, preventing data loss or duplication.

    Attributes:
        DEFAULT_MAX_PARTITIONS_NUMBER (int): Maximum number of partitions to retain in memory (default is 25,000).

    - **Partition Limitation Logic**
      Ensures the number of tracked partitions does not exceed the specified limit to prevent memory overuse. The oldest partitions are removed when the limit is reached.

    - **Global Cursor Fallback**
      New partitions use the global state as their initial state, so that state progresses even for deleted or newly added partitions. History data added after the initial sync will be missing.

    ConcurrentPerPartitionCursor expects the state of the ConcurrentCursor to follow the format {cursor_field: cursor_value}.
    """

    DEFAULT_MAX_PARTITIONS_NUMBER = 25_000
    SWITCH_TO_GLOBAL_LIMIT = 10_000
    _NO_STATE: Mapping[str, Any] = {}
    _NO_CURSOR_STATE: Mapping[str, Any] = {}
    _GLOBAL_STATE_KEY = "state"
    _PERPARTITION_STATE_KEY = "states"
    _KEY = 0
    _VALUE = 1

    def __init__(
        self,
        cursor_factory: ConcurrentCursorFactory,
        partition_router: PartitionRouter,
        stream_name: str,
        stream_namespace: Optional[str],
        stream_state: Any,
        message_repository: MessageRepository,
        connector_state_manager: ConnectorStateManager,
        connector_state_converter: AbstractStreamStateConverter,
        cursor_field: CursorField,
    ) -> None:
        self._global_cursor: Optional[StreamState] = {}
        self._stream_name = stream_name
        self._stream_namespace = stream_namespace
        self._message_repository = message_repository
        self._connector_state_manager = connector_state_manager
        self._connector_state_converter = connector_state_converter
        self._cursor_field = cursor_field

        self._cursor_factory = cursor_factory
        self._partition_router = partition_router

        # The dict is ordered to ensure that once the maximum number of partitions is reached,
        # the oldest partitions can be efficiently removed, maintaining the most recent partitions.
        self._cursor_per_partition: OrderedDict[str, ConcurrentCursor] = OrderedDict()
        self._semaphore_per_partition: OrderedDict[str, threading.Semaphore] = OrderedDict()

        # Parent-state tracking: store each partition’s parent state in creation order
        self._partition_parent_state_map: OrderedDict[str, Mapping[str, Any]] = OrderedDict()

        self._finished_partitions: set[str] = set()
        self._lock = threading.Lock()
        self._timer = Timer()
        self._new_global_cursor: Optional[StreamState] = None
        self._lookback_window: int = 0
        self._parent_state: Optional[StreamState] = None
        self._number_of_partitions: int = 0
        self._use_global_cursor: bool = False
        self._partition_serializer = PerPartitionKeySerializer()
        # Track the last time a state message was emitted
        self._last_emission_time: float = 0.0

        self._set_initial_state(stream_state)

    @property
    def cursor_field(self) -> CursorField:
        return self._cursor_field

    @property
    def state(self) -> MutableMapping[str, Any]:
        state: dict[str, Any] = {"use_global_cursor": self._use_global_cursor}
        if not self._use_global_cursor:
            states = []
            for partition_tuple, cursor in self._cursor_per_partition.items():
                if cursor.state:
                    states.append(
                        {
                            "partition": self._to_dict(partition_tuple),
                            "cursor": copy.deepcopy(cursor.state),
                        }
                    )
            state[self._PERPARTITION_STATE_KEY] = states

        if self._global_cursor:
            state[self._GLOBAL_STATE_KEY] = self._global_cursor
        if self._lookback_window is not None:
            state["lookback_window"] = self._lookback_window
        if self._parent_state is not None:
            state["parent_state"] = self._parent_state
        return state

    def close_partition(self, partition: Partition) -> None:
        # Attempt to retrieve the stream slice
        stream_slice: Optional[StreamSlice] = partition.to_slice()  # type: ignore[assignment]

        # Ensure stream_slice is not None
        if stream_slice is None:
            raise ValueError("stream_slice cannot be None")

        partition_key = self._to_partition_key(stream_slice.partition)
        with self._lock:
            self._semaphore_per_partition[partition_key].acquire()
            if not self._use_global_cursor:
                self._cursor_per_partition[partition_key].close_partition(partition=partition)
                cursor = self._cursor_per_partition[partition_key]
                if (
                    partition_key in self._finished_partitions
                    and self._semaphore_per_partition[partition_key]._value == 0
                ):
                    self._update_global_cursor(cursor.state[self.cursor_field.cursor_field_key])

            self._check_and_update_parent_state()

            self._emit_state_message()

    def _check_and_update_parent_state(self) -> None:
        """
        Pop the leftmost partition state from _partition_parent_state_map only if
        *all partitions* up to (and including) that partition key in _semaphore_per_partition
        are fully finished (i.e. in _finished_partitions and semaphore._value == 0).
        Additionally, delete finished semaphores with a value of 0 to free up memory,
        as they are only needed to track errors and completion status.
        """
        last_closed_state = None

        while self._partition_parent_state_map:
            # Look at the earliest partition key in creation order
            earliest_key = next(iter(self._partition_parent_state_map))

            # Verify ALL partitions from the left up to earliest_key are finished
            all_left_finished = True
            for p_key, sem in list(
                self._semaphore_per_partition.items()
            ):  # Use list to allow modification during iteration
                # If any earlier partition is still not finished, we must stop
                if p_key not in self._finished_partitions or sem._value != 0:
                    all_left_finished = False
                    break
                # Once we've reached earliest_key in the semaphore order, we can stop checking
                if p_key == earliest_key:
                    break

            # If the partitions up to earliest_key are not all finished, break the while-loop
            if not all_left_finished:
                break

            # Pop the leftmost entry from parent-state map
            _, closed_parent_state = self._partition_parent_state_map.popitem(last=False)
            last_closed_state = closed_parent_state

            # Clean up finished semaphores with value 0 up to and including earliest_key
            for p_key in list(self._semaphore_per_partition.keys()):
                sem = self._semaphore_per_partition[p_key]
                if p_key in self._finished_partitions and sem._value == 0:
                    del self._semaphore_per_partition[p_key]
                    logger.debug(f"Deleted finished semaphore for partition {p_key} with value 0")
                if p_key == earliest_key:
                    break

        # Update _parent_state if we popped at least one partition
        if last_closed_state is not None:
            self._parent_state = last_closed_state

    def ensure_at_least_one_state_emitted(self) -> None:
        """
        The platform expects at least one state message on successful syncs. Hence, whatever happens, we expect this method to be
        called.
        """
        if not any(
            semaphore_item[1]._value for semaphore_item in self._semaphore_per_partition.items()
        ):
            self._global_cursor = self._new_global_cursor
            self._lookback_window = self._timer.finish()
            self._parent_state = self._partition_router.get_stream_state()
        self._emit_state_message(throttle=False)

    def _throttle_state_message(self) -> Optional[float]:
        """
        Throttles the state message emission to once every 60 seconds.
        """
        current_time = time.time()
        if current_time - self._last_emission_time <= 60:
            return None
        return current_time

    def _emit_state_message(self, throttle: bool = True) -> None:
        if throttle:
            current_time = self._throttle_state_message()
            if current_time is None:
                return
            self._last_emission_time = current_time
        self._connector_state_manager.update_state_for_stream(
            self._stream_name,
            self._stream_namespace,
            self.state,
        )
        state_message = self._connector_state_manager.create_state_message(
            self._stream_name, self._stream_namespace
        )
        self._message_repository.emit_message(state_message)

    def stream_slices(self) -> Iterable[StreamSlice]:
        if self._timer.is_running():
            raise RuntimeError("stream_slices has been executed more than once.")

        slices = self._partition_router.stream_slices()
        self._timer.start()
        for partition, last, parent_state in iterate_with_last_flag_and_state(
            slices, self._partition_router.get_stream_state
        ):
            yield from self._generate_slices_from_partition(partition, parent_state)

    def _generate_slices_from_partition(
        self, partition: StreamSlice, parent_state: Mapping[str, Any]
    ) -> Iterable[StreamSlice]:
        # Ensure the maximum number of partitions is not exceeded
        self._ensure_partition_limit()

        partition_key = self._to_partition_key(partition.partition)

        cursor = self._cursor_per_partition.get(self._to_partition_key(partition.partition))
        if not cursor:
            cursor = self._create_cursor(
                self._global_cursor,
                self._lookback_window if self._global_cursor else 0,
            )
            with self._lock:
                self._number_of_partitions += 1
                self._cursor_per_partition[partition_key] = cursor
        self._semaphore_per_partition[partition_key] = threading.Semaphore(0)

        with self._lock:
            if (
                len(self._partition_parent_state_map) == 0
                or self._partition_parent_state_map[
                    next(reversed(self._partition_parent_state_map))
                ]
                != parent_state
            ):
                self._partition_parent_state_map[partition_key] = deepcopy(parent_state)

        for cursor_slice, is_last_slice, _ in iterate_with_last_flag_and_state(
            cursor.stream_slices(),
            lambda: None,
        ):
            self._semaphore_per_partition[partition_key].release()
            if is_last_slice:
                self._finished_partitions.add(partition_key)
            yield StreamSlice(
                partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields
            )

    def _ensure_partition_limit(self) -> None:
        """
        Ensure the maximum number of partitions does not exceed the predefined limit.

        Steps:
        1. Attempt to remove partitions that are marked as finished in `_finished_partitions`.
           These partitions are considered processed and safe to delete.
        2. If the limit is still exceeded and no finished partitions are available for removal,
           remove the oldest partition unconditionally. We expect failed partitions to be removed.

        Logging:
        - Logs a warning each time a partition is removed, indicating whether it was finished
          or removed due to being the oldest.
        """
        if not self._use_global_cursor and self.limit_reached():
            logger.info(
                f"Exceeded the 'SWITCH_TO_GLOBAL_LIMIT' of {self.SWITCH_TO_GLOBAL_LIMIT}. "
                f"Switching to global cursor for {self._stream_name}."
            )
            self._use_global_cursor = True

        with self._lock:
            while len(self._cursor_per_partition) > self.DEFAULT_MAX_PARTITIONS_NUMBER - 1:
                # Try removing finished partitions first
                for partition_key in list(self._cursor_per_partition.keys()):
                    if partition_key in self._finished_partitions and (
                        partition_key not in self._semaphore_per_partition
                        or self._semaphore_per_partition[partition_key]._value == 0
                    ):
                        oldest_partition = self._cursor_per_partition.pop(
                            partition_key
                        )  # Remove the oldest partition
                        logger.warning(
                            f"The maximum number of partitions has been reached. Dropping the oldest finished partition: {oldest_partition}. Over limit: {self._number_of_partitions - self.DEFAULT_MAX_PARTITIONS_NUMBER}."
                        )
                        break
                else:
                    # If no finished partitions can be removed, fall back to removing the oldest partition
                    oldest_partition = self._cursor_per_partition.popitem(last=False)[
                        1
                    ]  # Remove the oldest partition
                    logger.warning(
                        f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}. Over limit: {self._number_of_partitions - self.DEFAULT_MAX_PARTITIONS_NUMBER}."
                    )

    def _set_initial_state(self, stream_state: StreamState) -> None:
        """
        Initialize the cursor's state using the provided `stream_state`.

        This method supports global and per-partition state initialization.

        - **Global State**: If `states` is missing, the `state` is treated as global and applied to all partitions.
          The `global state` holds a single cursor position representing the latest processed record across all partitions.

        - **Lookback Window**: Configured via `lookback_window`, it defines the period (in seconds) for reprocessing records.
          This ensures robustness in case of upstream data delays or reordering. If not specified, it defaults to 0.

        - **Per-Partition State**: If `states` is present, each partition's cursor state is initialized separately.

        - **Parent State**: (if available) Used to initialize partition routers based on parent streams.

        Args:
            stream_state (StreamState): The state of the streams to be set. The format of the stream state should be:
                {
                    "states": [
                        {
                            "partition": {
                                "partition_key": "value"
                            },
                            "cursor": {
                                "last_updated": "2023-05-27T00:00:00Z"
                            }
                        }
                    ],
                    "state": {
                        "last_updated": "2023-05-27T00:00:00Z"
                    },
                    "lookback_window": 10,
                    "parent_state": {
                        "parent_stream_name": {
                            "last_updated": "2023-05-27T00:00:00Z"
                        }
                    }
                }
        """
        if not stream_state:
            return

        if (
            self._PERPARTITION_STATE_KEY not in stream_state
            and self._GLOBAL_STATE_KEY not in stream_state
        ):
            # We assume that `stream_state` is in a global format that can be applied to all partitions.
            # Example: {"global_state_format_key": "global_state_format_value"}
            self._set_global_state(stream_state)

        else:
            self._use_global_cursor = stream_state.get("use_global_cursor", False)

            self._lookback_window = int(stream_state.get("lookback_window", 0))

            for state in stream_state.get(self._PERPARTITION_STATE_KEY, []):
                self._number_of_partitions += 1
                self._cursor_per_partition[self._to_partition_key(state["partition"])] = (
                    self._create_cursor(state["cursor"])
                )

            # set default state for missing partitions if it is per partition with fallback to global
            if self._GLOBAL_STATE_KEY in stream_state:
                self._set_global_state(stream_state[self._GLOBAL_STATE_KEY])

        # Set initial parent state
        if stream_state.get("parent_state"):
            self._parent_state = stream_state["parent_state"]

        # Set parent state for partition routers based on parent streams
        self._partition_router.set_initial_state(stream_state)

    def _set_global_state(self, stream_state: Mapping[str, Any]) -> None:
        """
        Initializes the global cursor state from the provided stream state.

        If the cursor field key is present in the stream state, its value is parsed,
        formatted, and stored as the global cursor. This ensures consistency in state
        representation across partitions.
        """
        if self.cursor_field.cursor_field_key in stream_state:
            global_state_value = stream_state[self.cursor_field.cursor_field_key]
            final_format_global_state_value = self._connector_state_converter.output_format(
                self._connector_state_converter.parse_value(global_state_value)
            )

            fixed_global_state = {
                self.cursor_field.cursor_field_key: final_format_global_state_value
            }

            self._global_cursor = deepcopy(fixed_global_state)
            self._new_global_cursor = deepcopy(fixed_global_state)

    def observe(self, record: Record) -> None:
        if not record.associated_slice:
            raise ValueError(
                "Invalid state as stream slices that are emitted should refer to an existing cursor"
            )

        record_cursor = self._connector_state_converter.output_format(
            self._connector_state_converter.parse_value(self._cursor_field.extract_value(record))
        )
        self._update_global_cursor(record_cursor)
        if not self._use_global_cursor:
            self._cursor_per_partition[
                self._to_partition_key(record.associated_slice.partition)
            ].observe(record)

    def _update_global_cursor(self, value: Any) -> None:
        if (
            self._new_global_cursor is None
            or self._new_global_cursor[self.cursor_field.cursor_field_key] < value
        ):
            self._new_global_cursor = {self.cursor_field.cursor_field_key: copy.deepcopy(value)}

    def _to_partition_key(self, partition: Mapping[str, Any]) -> str:
        return self._partition_serializer.to_partition_key(partition)

    def _to_dict(self, partition_key: str) -> Mapping[str, Any]:
        return self._partition_serializer.to_partition(partition_key)

    def _create_cursor(
        self, cursor_state: Any, runtime_lookback_window: int = 0
    ) -> ConcurrentCursor:
        cursor = self._cursor_factory.create(
            stream_state=deepcopy(cursor_state),
            runtime_lookback_window=timedelta(seconds=runtime_lookback_window),
        )
        return cursor

    def should_be_synced(self, record: Record) -> bool:
        return self._get_cursor(record).should_be_synced(record)

    def _get_cursor(self, record: Record) -> ConcurrentCursor:
        if not record.associated_slice:
            raise ValueError(
                "Invalid state as stream slices that are emitted should refer to an existing cursor"
            )
        partition_key = self._to_partition_key(record.associated_slice.partition)
        if partition_key not in self._cursor_per_partition:
            raise ValueError(
                "Invalid state as stream slices that are emitted should refer to an existing cursor"
            )
        cursor = self._cursor_per_partition[partition_key]
        return cursor

    def limit_reached(self) -> bool:
        return self._number_of_partitions > self.SWITCH_TO_GLOBAL_LIMIT

Manages state per partition when a stream has many partitions, preventing data loss or duplication.

Attributes:
  • DEFAULT_MAX_PARTITIONS_NUMBER (int): Maximum number of partitions to retain in memory (default is 25,000).

  • Partition Limitation Logic: ensures the number of tracked partitions does not exceed the specified limit to prevent memory overuse. The oldest partitions are removed when the limit is reached.

  • Global Cursor Fallback: new partitions use the global state as their initial state, so that state progresses even for deleted or newly added partitions. History data added after the initial sync will be missing.

ConcurrentPerPartitionCursor expects the state of the ConcurrentCursor to follow the format {cursor_field: cursor_value}.
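For reference, the serialized state this cursor reads and emits has the following shape (values are illustrative; see _set_initial_state below):

state = {
    "use_global_cursor": False,
    "states": [
        {
            "partition": {"partition_key": "value"},
            "cursor": {"last_updated": "2023-05-27T00:00:00Z"},
        }
    ],
    # Global fallback cursor, used as the initial state for new partitions
    "state": {"last_updated": "2023-05-27T00:00:00Z"},
    # Lookback window in seconds for reprocessing records
    "lookback_window": 10,
    "parent_state": {"parent_stream_name": {"last_updated": "2023-05-27T00:00:00Z"}},
}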

ConcurrentPerPartitionCursor(cursor_factory: ConcurrentCursorFactory, partition_router: airbyte_cdk.sources.declarative.partition_routers.PartitionRouter, stream_name: str, stream_namespace: Optional[str], stream_state: Any, message_repository: airbyte_cdk.MessageRepository, connector_state_manager: airbyte_cdk.ConnectorStateManager, connector_state_converter: airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter.AbstractStreamStateConverter, cursor_field: airbyte_cdk.CursorField)
    def __init__(
        self,
        cursor_factory: ConcurrentCursorFactory,
        partition_router: PartitionRouter,
        stream_name: str,
        stream_namespace: Optional[str],
        stream_state: Any,
        message_repository: MessageRepository,
        connector_state_manager: ConnectorStateManager,
        connector_state_converter: AbstractStreamStateConverter,
        cursor_field: CursorField,
    ) -> None:
        self._global_cursor: Optional[StreamState] = {}
        self._stream_name = stream_name
        self._stream_namespace = stream_namespace
        self._message_repository = message_repository
        self._connector_state_manager = connector_state_manager
        self._connector_state_converter = connector_state_converter
        self._cursor_field = cursor_field

        self._cursor_factory = cursor_factory
        self._partition_router = partition_router

        # The dict is ordered to ensure that once the maximum number of partitions is reached,
        # the oldest partitions can be efficiently removed, maintaining the most recent partitions.
        self._cursor_per_partition: OrderedDict[str, ConcurrentCursor] = OrderedDict()
        self._semaphore_per_partition: OrderedDict[str, threading.Semaphore] = OrderedDict()

        # Parent-state tracking: store each partition’s parent state in creation order
        self._partition_parent_state_map: OrderedDict[str, Mapping[str, Any]] = OrderedDict()

        self._finished_partitions: set[str] = set()
        self._lock = threading.Lock()
        self._timer = Timer()
        self._new_global_cursor: Optional[StreamState] = None
        self._lookback_window: int = 0
        self._parent_state: Optional[StreamState] = None
        self._number_of_partitions: int = 0
        self._use_global_cursor: bool = False
        self._partition_serializer = PerPartitionKeySerializer()
        # Track the last time a state message was emitted
        self._last_emission_time: float = 0.0

        self._set_initial_state(stream_state)
DEFAULT_MAX_PARTITIONS_NUMBER = 25000
SWITCH_TO_GLOBAL_LIMIT = 10000
cursor_field: airbyte_cdk.CursorField
    @property
    def cursor_field(self) -> CursorField:
        return self._cursor_field
state: MutableMapping[str, Any]
    @property
    def state(self) -> MutableMapping[str, Any]:
        state: dict[str, Any] = {"use_global_cursor": self._use_global_cursor}
        if not self._use_global_cursor:
            states = []
            for partition_tuple, cursor in self._cursor_per_partition.items():
                if cursor.state:
                    states.append(
                        {
                            "partition": self._to_dict(partition_tuple),
                            "cursor": copy.deepcopy(cursor.state),
                        }
                    )
            state[self._PERPARTITION_STATE_KEY] = states

        if self._global_cursor:
            state[self._GLOBAL_STATE_KEY] = self._global_cursor
        if self._lookback_window is not None:
            state["lookback_window"] = self._lookback_window
        if self._parent_state is not None:
            state["parent_state"] = self._parent_state
        return state
def close_partition(self, partition: airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition) -> None:
    def close_partition(self, partition: Partition) -> None:
        # Attempt to retrieve the stream slice
        stream_slice: Optional[StreamSlice] = partition.to_slice()  # type: ignore[assignment]

        # Ensure stream_slice is not None
        if stream_slice is None:
            raise ValueError("stream_slice cannot be None")

        partition_key = self._to_partition_key(stream_slice.partition)
        with self._lock:
            self._semaphore_per_partition[partition_key].acquire()
            if not self._use_global_cursor:
                self._cursor_per_partition[partition_key].close_partition(partition=partition)
                cursor = self._cursor_per_partition[partition_key]
                if (
                    partition_key in self._finished_partitions
                    and self._semaphore_per_partition[partition_key]._value == 0
                ):
                    self._update_global_cursor(cursor.state[self.cursor_field.cursor_field_key])

            self._check_and_update_parent_state()

            self._emit_state_message()

Indicate to the cursor that the partition has been successfully processed

def ensure_at_least_one_state_emitted(self) -> None:
    def ensure_at_least_one_state_emitted(self) -> None:
        """
        The platform expects at least one state message on successful syncs. Hence, whatever happens, we expect this method to be
        called.
        """
        if not any(
            semaphore_item[1]._value for semaphore_item in self._semaphore_per_partition.items()
        ):
            self._global_cursor = self._new_global_cursor
            self._lookback_window = self._timer.finish()
            self._parent_state = self._partition_router.get_stream_state()
        self._emit_state_message(throttle=False)

The platform expects at least one state message on successful syncs. Hence, whatever happens, we expect this method to be called.

def stream_slices(self) -> Iterable[airbyte_cdk.StreamSlice]:
    def stream_slices(self) -> Iterable[StreamSlice]:
        if self._timer.is_running():
            raise RuntimeError("stream_slices has been executed more than once.")

        slices = self._partition_router.stream_slices()
        self._timer.start()
        for partition, last, parent_state in iterate_with_last_flag_and_state(
            slices, self._partition_router.get_stream_state
        ):
            yield from self._generate_slices_from_partition(partition, parent_state)

Generates stream slices for each partition produced by the partition router, delegating per-partition slice generation to the corresponding cursor.

def observe(self, record: airbyte_cdk.Record) -> None:
    def observe(self, record: Record) -> None:
        if not record.associated_slice:
            raise ValueError(
                "Invalid state as stream slices that are emitted should refer to an existing cursor"
            )

        record_cursor = self._connector_state_converter.output_format(
            self._connector_state_converter.parse_value(self._cursor_field.extract_value(record))
        )
        self._update_global_cursor(record_cursor)
        if not self._use_global_cursor:
            self._cursor_per_partition[
                self._to_partition_key(record.associated_slice.partition)
            ].observe(record)

Indicate to the cursor that the record has been emitted

def should_be_synced(self, record: airbyte_cdk.Record) -> bool:
    def should_be_synced(self, record: Record) -> bool:
        return self._get_cursor(record).should_be_synced(record)
def limit_reached(self) -> bool:
    def limit_reached(self) -> bool:
        return self._number_of_partitions > self.SWITCH_TO_GLOBAL_LIMIT
@dataclass
class DatetimeBasedCursor(airbyte_cdk.sources.declarative.incremental.DeclarativeCursor):
@dataclass
class DatetimeBasedCursor(DeclarativeCursor):
    """
    Slices the stream over a datetime range and creates a state with format {<cursor_field>: <datetime>}

    Given a start time, end time, a step function, and an optional lookback window,
    the stream slicer will partition the date range from start time - lookback window to end time.

    The step function is defined as a string of the form ISO8601 duration

    The timestamp format accepts the same format codes as datetime.strftime, which are
    all the format codes required by the 1989 C standard.
    Full list of accepted format codes: https://man7.org/linux/man-pages/man3/strftime.3.html

    Attributes:
        start_datetime (Union[MinMaxDatetime, str]): the datetime that determines the earliest record that should be synced
        end_datetime (Optional[Union[MinMaxDatetime, str]]): the datetime that determines the last record that should be synced
        cursor_field (Union[InterpolatedString, str]): record's cursor field
        datetime_format (str): format of the datetime
        step (Optional[str]): size of the time window (ISO8601 duration)
        cursor_granularity (Optional[str]): smallest increment the datetime_format has (ISO 8601 duration) that will be used to ensure that the start of a slice does not overlap with the end of the previous one
        config (Config): connection config
        start_time_option (Optional[RequestOption]): request option for start time
        end_time_option (Optional[RequestOption]): request option for end time
        partition_field_start (Optional[str]): partition start time field
        partition_field_end (Optional[str]): stream slice end time field
        lookback_window (Optional[InterpolatedString]): how many days before start_datetime to read data for (ISO8601 duration)
    """

    start_datetime: Union[MinMaxDatetime, str]
    cursor_field: Union[InterpolatedString, str]
    datetime_format: str
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    _highest_observed_cursor_field_value: Optional[str] = field(
        repr=False, default=None
    )  # tracks the latest observed datetime, which may not be safe to emit in the case of out-of-order records
    _cursor: Optional[str] = field(
        repr=False, default=None
    )  # tracks the latest observed datetime that is appropriate to emit as stream state
    end_datetime: Optional[Union[MinMaxDatetime, str]] = None
    step: Optional[Union[InterpolatedString, str]] = None
    cursor_granularity: Optional[str] = None
    start_time_option: Optional[RequestOption] = None
    end_time_option: Optional[RequestOption] = None
    partition_field_start: Optional[str] = None
    partition_field_end: Optional[str] = None
    lookback_window: Optional[Union[InterpolatedString, str]] = None
    message_repository: Optional[MessageRepository] = None
    is_compare_strictly: Optional[bool] = False
    cursor_datetime_formats: List[str] = field(default_factory=lambda: [])

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        if (self.step and not self.cursor_granularity) or (
            not self.step and self.cursor_granularity
        ):
            raise ValueError(
                f"If step is defined, cursor_granularity should be as well and vice-versa. "
                f"Right now, step is `{self.step}` and cursor_granularity is `{self.cursor_granularity}`"
            )
        self._start_datetime = MinMaxDatetime.create(self.start_datetime, parameters)
        self._end_datetime = (
            None if not self.end_datetime else MinMaxDatetime.create(self.end_datetime, parameters)
        )

        self._timezone = datetime.timezone.utc
        self._interpolation = JinjaInterpolation()

        self._step = (
            self._parse_timedelta(
                InterpolatedString.create(self.step, parameters=parameters).eval(self.config)
            )
            if self.step
            else datetime.timedelta.max
        )
        self._cursor_granularity = self._parse_timedelta(self.cursor_granularity)
        self.cursor_field = InterpolatedString.create(self.cursor_field, parameters=parameters)
        self._lookback_window = (
            InterpolatedString.create(self.lookback_window, parameters=parameters)
            if self.lookback_window
            else None
        )
        self._partition_field_start = InterpolatedString.create(
            self.partition_field_start or "start_time", parameters=parameters
        )
        self._partition_field_end = InterpolatedString.create(
            self.partition_field_end or "end_time", parameters=parameters
        )
        self._parser = DatetimeParser()

        # If datetime format is not specified then start/end datetime should inherit it from the stream slicer
        if not self._start_datetime.datetime_format:
            self._start_datetime.datetime_format = self.datetime_format
        if self._end_datetime and not self._end_datetime.datetime_format:
            self._end_datetime.datetime_format = self.datetime_format

        if not self.cursor_datetime_formats:
            self.cursor_datetime_formats = [self.datetime_format]

        _validate_component_request_option_paths(
            self.config, self.start_time_option, self.end_time_option
        )

    def get_stream_state(self) -> StreamState:
        return {self.cursor_field.eval(self.config): self._cursor} if self._cursor else {}  # type: ignore  # cursor_field is converted to an InterpolatedString in __post_init__

    def set_initial_state(self, stream_state: StreamState) -> None:
        """
        Cursors are not initialized with their state. As state is needed in order to function properly, this method should be called
        before calling anything else

        :param stream_state: The state of the stream as returned by get_stream_state
        """
        self._cursor = (
            stream_state.get(self.cursor_field.eval(self.config)) if stream_state else None  # type: ignore [union-attr]
        )

    def observe(self, stream_slice: StreamSlice, record: Record) -> None:
        """
        Register a record with the cursor; the cursor instance can then use it to manage the state of the in-progress stream read.

        :param stream_slice: The current slice, which may or may not contain the most recently observed record
        :param record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the
          stream state may need to be deferred depending on whether the source reliably orders records by the cursor field.
        """
        record_cursor_value = record.get(self.cursor_field.eval(self.config))  # type: ignore  # cursor_field is converted to an InterpolatedString in __post_init__
        # if the current record has no cursor value, we cannot meaningfully update the state based on it, so there is nothing more to do
        if not record_cursor_value:
            return

        start_field = self._partition_field_start.eval(self.config)
        end_field = self._partition_field_end.eval(self.config)
        is_highest_observed_cursor_value = (
            not self._highest_observed_cursor_field_value
            or self.parse_date(record_cursor_value)
            > self.parse_date(self._highest_observed_cursor_field_value)
        )
        if (
            self._is_within_daterange_boundaries(
                record,
                stream_slice.get(start_field),  # type: ignore [arg-type]
                stream_slice.get(end_field),  # type: ignore [arg-type]
            )
            and is_highest_observed_cursor_value
        ):
            self._highest_observed_cursor_field_value = record_cursor_value

    def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
        if stream_slice.partition:
            raise ValueError(
                f"Stream slice {stream_slice} should not have a partition. Got {stream_slice.partition}."
            )
        cursor_value_str_by_cursor_value_datetime = dict(
            map(
                # we need to ensure the cursor value is preserved as is in the state else the CATs might complain of something like
                # 2023-01-04T17:30:19.000Z' <= '2023-01-04T17:30:19.000000Z'
                lambda datetime_str: (self.parse_date(datetime_str), datetime_str),  # type: ignore # because of the filter on the next line, this will only be called with a str
                filter(
                    lambda item: item, [self._cursor, self._highest_observed_cursor_field_value]
                ),
            )
        )
        self._cursor = (
            cursor_value_str_by_cursor_value_datetime[
                max(cursor_value_str_by_cursor_value_datetime.keys())
            ]
            if cursor_value_str_by_cursor_value_datetime
            else None
        )

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Partition the daterange into slices of size = step.

        The start of the window is the later of the configured start_datetime and the stream state's cursor value minus the lookback window.
        The end of the window is the earlier of the configured end_datetime and the current time.

        :return:
        """
        end_datetime = self.select_best_end_datetime()
        start_datetime = self._calculate_earliest_possible_value(self.select_best_end_datetime())
        return self._partition_daterange(start_datetime, end_datetime, self._step)

    def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
        # Datetime based cursors operate over slices made up of datetime ranges. Stream state is based on the progress
        # through each slice and does not belong to a specific slice. We just return stream state as it is.
        return self.get_stream_state()

    def _calculate_earliest_possible_value(
        self, end_datetime: datetime.datetime
    ) -> datetime.datetime:
        lookback_delta = self._parse_timedelta(
            self._lookback_window.eval(self.config) if self._lookback_window else "P0D"
        )
        earliest_possible_start_datetime = min(
            self._start_datetime.get_datetime(self.config), end_datetime
        )
        try:
            cursor_datetime = (
                self._calculate_cursor_datetime_from_state(self.get_stream_state()) - lookback_delta
            )
        except OverflowError:
            # cursor_datetime defers to the minimum date if it does not exist in the state. Trying to subtract
            # a timedelta from the minimum datetime results in an OverflowError
            cursor_datetime = self._calculate_cursor_datetime_from_state(self.get_stream_state())
        return max(earliest_possible_start_datetime, cursor_datetime)

    def select_best_end_datetime(self) -> datetime.datetime:
        """
        Returns the optimal end datetime.
        This method compares the current datetime with a pre-configured end datetime
        and returns the earlier of the two. If no pre-configured end datetime is set,
        the current datetime is returned.

        :return datetime.datetime: The best end datetime, which is either the current datetime or the pre-configured end datetime, whichever is earlier.
        """
        now = datetime.datetime.now(tz=self._timezone)
        if not self._end_datetime:
            return now
        return min(self._end_datetime.get_datetime(self.config), now)

    def _calculate_cursor_datetime_from_state(
        self, stream_state: Mapping[str, Any]
    ) -> datetime.datetime:
        if self.cursor_field.eval(self.config, stream_state=stream_state) in stream_state:  # type: ignore  # cursor_field is converted to an InterpolatedString in __post_init__
            return self.parse_date(stream_state[self.cursor_field.eval(self.config)])  # type: ignore  # cursor_field is converted to an InterpolatedString in __post_init__
        return datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)

    def _format_datetime(self, dt: datetime.datetime) -> str:
        return self._parser.format(dt, self.datetime_format)

    def _partition_daterange(
        self,
        start: datetime.datetime,
        end: datetime.datetime,
        step: Union[datetime.timedelta, Duration],
    ) -> List[StreamSlice]:
        start_field = self._partition_field_start.eval(self.config)
        end_field = self._partition_field_end.eval(self.config)
        dates = []

        while self._is_within_date_range(start, end):
            next_start = self._evaluate_next_start_date_safely(start, step)
            end_date = self._get_date(next_start - self._cursor_granularity, end, min)
            dates.append(
                StreamSlice(
                    partition={},
                    cursor_slice={
                        start_field: self._format_datetime(start),
                        end_field: self._format_datetime(end_date),
                    },
                )
            )
            start = next_start
        return dates

    def _is_within_date_range(self, start: datetime.datetime, end: datetime.datetime) -> bool:
        if self.is_compare_strictly:
            return start < end
        return start <= end

    def _evaluate_next_start_date_safely(
        self, start: datetime.datetime, step: datetime.timedelta
    ) -> datetime.datetime:
        """
        Given that we set the default step at datetime.timedelta.max, we will generate an OverflowError when evaluating the next start_date
        This method assumes that users would never enter a step that would generate an overflow. Given that would be the case, the code
        would have broken anyway.
        """
        try:
            return start + step
        except OverflowError:
            return datetime.datetime.max.replace(tzinfo=datetime.timezone.utc)

    def _get_date(
        self,
        cursor_value: datetime.datetime,
        default_date: datetime.datetime,
        comparator: Callable[[datetime.datetime, datetime.datetime], datetime.datetime],
    ) -> datetime.datetime:
        cursor_date = cursor_value or default_date
        return comparator(cursor_date, default_date)

    def parse_date(self, date: str) -> datetime.datetime:
        for datetime_format in self.cursor_datetime_formats + [self.datetime_format]:
            try:
                return self._parser.parse(date, datetime_format)
            except ValueError:
                pass
        raise ValueError(f"No format in {self.cursor_datetime_formats} matching {date}")

    @classmethod
    def _parse_timedelta(cls, time_str: Optional[str]) -> Union[datetime.timedelta, Duration]:
        """
        :return: Parses an ISO 8601 duration into a datetime.timedelta or Duration object.
        """
        if not time_str:
            return datetime.timedelta(0)
        return parse_duration(time_str)

    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return self._get_request_options(RequestOptionType.request_parameter, stream_slice)

    def get_request_headers(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return self._get_request_options(RequestOptionType.header, stream_slice)

    def get_request_body_data(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return self._get_request_options(RequestOptionType.body_data, stream_slice)

    def get_request_body_json(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return self._get_request_options(RequestOptionType.body_json, stream_slice)

    def request_kwargs(self) -> Mapping[str, Any]:
        # Never update kwargs
        return {}

    def _get_request_options(
        self, option_type: RequestOptionType, stream_slice: Optional[StreamSlice]
    ) -> Mapping[str, Any]:
        options: MutableMapping[str, Any] = {}
        if not stream_slice:
            return options

        if self.start_time_option and self.start_time_option.inject_into == option_type:
            start_time_value = stream_slice.get(self._partition_field_start.eval(self.config))
            self.start_time_option.inject_into_request(options, start_time_value, self.config)

        if self.end_time_option and self.end_time_option.inject_into == option_type:
            end_time_value = stream_slice.get(self._partition_field_end.eval(self.config))
            self.end_time_option.inject_into_request(options, end_time_value, self.config)

        return options

    def should_be_synced(self, record: Record) -> bool:
        cursor_field = self.cursor_field.eval(self.config)  # type: ignore  # cursor_field is converted to an InterpolatedString in __post_init__
        record_cursor_value = record.get(cursor_field)
        if not record_cursor_value:
            self._send_log(
                Level.WARN,
                f"Could not find cursor field `{cursor_field}` in record. The incremental sync will assume it needs to be synced",
            )
            return True
        latest_possible_cursor_value = self.select_best_end_datetime()
        earliest_possible_cursor_value = self._calculate_earliest_possible_value(
            latest_possible_cursor_value
        )
        return self._is_within_daterange_boundaries(
            record, earliest_possible_cursor_value, latest_possible_cursor_value
        )

    def _is_within_daterange_boundaries(
        self,
        record: Record,
        start_datetime_boundary: Union[datetime.datetime, str],
        end_datetime_boundary: Union[datetime.datetime, str],
    ) -> bool:
        cursor_field = self.cursor_field.eval(self.config)  # type: ignore  # cursor_field is converted to an InterpolatedString in __post_init__
        record_cursor_value = record.get(cursor_field)
        if not record_cursor_value:
            self._send_log(
                Level.WARN,
                f"Could not find cursor field `{cursor_field}` in record. The record will not be considered when emitting sync state",
            )
            return False
        if isinstance(start_datetime_boundary, str):
            start_datetime_boundary = self.parse_date(start_datetime_boundary)
        if isinstance(end_datetime_boundary, str):
            end_datetime_boundary = self.parse_date(end_datetime_boundary)
        return (
            start_datetime_boundary <= self.parse_date(record_cursor_value) <= end_datetime_boundary
        )

    def _send_log(self, level: Level, message: str) -> None:
        if self.message_repository:
            self.message_repository.emit_message(
                AirbyteMessage(
                    type=Type.LOG,
                    log=AirbyteLogMessage(level=level, message=message),
                )
            )

    def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
        cursor_field = self.cursor_field.eval(self.config)  # type: ignore  # cursor_field is converted to an InterpolatedString in __post_init__
        first_cursor_value = first.get(cursor_field)
        second_cursor_value = second.get(cursor_field)
        if first_cursor_value and second_cursor_value:
            return self.parse_date(first_cursor_value) >= self.parse_date(second_cursor_value)
        elif first_cursor_value:
            return True
        else:
            return False

    def set_runtime_lookback_window(self, lookback_window_in_seconds: int) -> None:
        """
        Updates the lookback window based on a given number of seconds if the new duration
        is greater than the currently configured lookback window.

        :param lookback_window_in_seconds: The lookback duration in seconds to potentially update to.
        """
        runtime_lookback_window = duration_isoformat(timedelta(seconds=lookback_window_in_seconds))
        config_lookback = parse_duration(
            self._lookback_window.eval(self.config) if self._lookback_window else "P0D"
        )

        # Check if the new runtime lookback window is greater than the current config lookback
        if parse_duration(runtime_lookback_window) > config_lookback:
            self._lookback_window = InterpolatedString.create(
                runtime_lookback_window, parameters={}
            )

Slices the stream over a datetime range and creates a state with format {<cursor_field>: <datetime>}.

Given a start time, end time, a step function, and an optional lookback window, the stream slicer will partition the date range from start time - lookback window to end time.

The step function is defined as a string of the form ISO8601 duration.

The timestamp format accepts the same format codes as datetime.strftime, which are all the format codes required by the 1989 C standard. Full list of accepted format codes: https://man7.org/linux/man-pages/man3/strftime.3.html. A construction sketch follows the attribute list below.

Attributes:
  • start_datetime (Union[MinMaxDatetime, str]): the datetime that determines the earliest record that should be synced
  • end_datetime (Optional[Union[MinMaxDatetime, str]]): the datetime that determines the last record that should be synced
  • cursor_field (Union[InterpolatedString, str]): record's cursor field
  • datetime_format (str): format of the datetime
  • step (Optional[str]): size of the timewindow (ISO8601 duration)
  • cursor_granularity (Optional[str]): smallest increment the datetime_format has (ISO 8601 duration) that will be used to ensure that the start of a slice does not overlap with the end of the previous one
  • config (Config): connection config
  • start_time_option (Optional[RequestOption]): request option for start time
  • end_time_option (Optional[RequestOption]): request option for end time
  • partition_field_start (Optional[str]): stream slice start time field
  • partition_field_end (Optional[str]): stream slice end time field
  • lookback_window (Optional[InterpolatedString]): how far before start_datetime to read data for (ISO 8601 duration)
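
A quick construction sketch (not taken from the library docs; the config values and field names are illustrative): the cursor below partitions three days into daily windows.

    from airbyte_cdk import MinMaxDatetime
    from airbyte_cdk.sources.declarative.incremental import DatetimeBasedCursor

    cursor = DatetimeBasedCursor(
        start_datetime=MinMaxDatetime(datetime="2023-01-01T00:00:00Z", parameters={}),
        end_datetime=MinMaxDatetime(datetime="2023-01-04T00:00:00Z", parameters={}),
        step="P1D",                 # one-day windows (ISO 8601 duration)
        cursor_granularity="PT1S",  # step and cursor_granularity are set together
        cursor_field="updated_at",
        datetime_format="%Y-%m-%dT%H:%M:%SZ",
        config={},
        parameters={},
    )
    for stream_slice in cursor.stream_slices():
        # e.g. {"start_time": "2023-01-01T00:00:00Z", "end_time": "2023-01-01T23:59:59Z"}
        print(stream_slice)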
DatetimeBasedCursor( start_datetime: Union[airbyte_cdk.MinMaxDatetime, str], cursor_field: Union[airbyte_cdk.InterpolatedString, str], datetime_format: str, config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], _highest_observed_cursor_field_value: Optional[str] = None, _cursor: Optional[str] = None, end_datetime: Union[airbyte_cdk.MinMaxDatetime, str, NoneType] = None, step: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, cursor_granularity: Optional[str] = None, start_time_option: Optional[airbyte_cdk.RequestOption] = None, end_time_option: Optional[airbyte_cdk.RequestOption] = None, partition_field_start: Optional[str] = None, partition_field_end: Optional[str] = None, lookback_window: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, message_repository: Optional[airbyte_cdk.MessageRepository] = None, is_compare_strictly: Optional[bool] = False, cursor_datetime_formats: List[str] = <factory>)
start_datetime: Union[airbyte_cdk.MinMaxDatetime, str]
cursor_field: Union[airbyte_cdk.InterpolatedString, str]
datetime_format: str
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
end_datetime: Union[airbyte_cdk.MinMaxDatetime, str, NoneType] = None
step: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
cursor_granularity: Optional[str] = None
start_time_option: Optional[airbyte_cdk.RequestOption] = None
end_time_option: Optional[airbyte_cdk.RequestOption] = None
partition_field_start: Optional[str] = None
partition_field_end: Optional[str] = None
lookback_window: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
message_repository: Optional[airbyte_cdk.MessageRepository] = None
is_compare_strictly: Optional[bool] = False
cursor_datetime_formats: List[str]
def get_stream_state(self) -> Mapping[str, Any]:
131    def get_stream_state(self) -> StreamState:
132        return {self.cursor_field.eval(self.config): self._cursor} if self._cursor else {}  # type: ignore  # cursor_field is converted to an InterpolatedString in __post_init__

Returns the current stream state. We would like to restrict its usage since it exposes the internals of the state. As of 2023-06-14, it is used for three things:

  • Interpolation of the requests
  • Transformation of records
  • Saving the state

For the first case, we are probably stuck with exposing the stream state. For the last, we can probably expose a method that allows for emitting the state to the platform.
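
An illustration of the returned shape for the cursor sketched earlier (cursor_field="updated_at"):

    cursor.get_stream_state()
    # {} before any slice has been closed; afterwards e.g.
    # {"updated_at": "2023-01-02T00:00:00Z"}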

def set_initial_state(self, stream_state: Mapping[str, Any]) -> None:
134    def set_initial_state(self, stream_state: StreamState) -> None:
135        """
136        Cursors are not initialized with their state. As state is needed in order to function properly, this method should be called
137        before calling anything else
138
139        :param stream_state: The state of the stream as returned by get_stream_state
140        """
141        self._cursor = (
142            stream_state.get(self.cursor_field.eval(self.config)) if stream_state else None  # type: ignore [union-attr]
143        )

Cursors are not initialized with their state. Since state is needed for the cursor to function properly, this method should be called before anything else.

Parameters
  • stream_state: The state of the stream as returned by get_stream_state
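
A minimal resume sketch, assuming the state was produced by get_stream_state in a previous sync:

    # Seed the cursor before calling stream_slices() or observe()
    cursor.set_initial_state({"updated_at": "2023-01-02T00:00:00Z"})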
def observe( self, stream_slice: airbyte_cdk.StreamSlice, record: airbyte_cdk.Record) -> None:
145    def observe(self, stream_slice: StreamSlice, record: Record) -> None:
146        """
147        Register a record with the cursor; the cursor instance can then use it to manage the state of the in-progress stream read.
148
149        :param stream_slice: The current slice, which may or may not contain the most recently observed record
150        :param record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the
151          stream state may need to be deferred depending on whether the source reliably orders records by the cursor field.
152        """
153        record_cursor_value = record.get(self.cursor_field.eval(self.config))  # type: ignore  # cursor_field is converted to an InterpolatedString in __post_init__
154        # if the current record has no cursor value, we cannot meaningfully update the state based on it, so there is nothing more to do
155        if not record_cursor_value:
156            return
157
158        start_field = self._partition_field_start.eval(self.config)
159        end_field = self._partition_field_end.eval(self.config)
160        is_highest_observed_cursor_value = (
161            not self._highest_observed_cursor_field_value
162            or self.parse_date(record_cursor_value)
163            > self.parse_date(self._highest_observed_cursor_field_value)
164        )
165        if (
166            self._is_within_daterange_boundaries(
167                record,
168                stream_slice.get(start_field),  # type: ignore [arg-type]
169                stream_slice.get(end_field),  # type: ignore [arg-type]
170            )
171            and is_highest_observed_cursor_value
172        ):
173            self._highest_observed_cursor_field_value = record_cursor_value

Register a record with the cursor; the cursor instance can then use it to manage the state of the in-progress stream read.

Parameters
  • stream_slice: The current slice, which may or may not contain the most recently observed record
  • record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the stream state may need to be deferred depending on whether the source reliably orders records by the cursor field.
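
A sketch with a hypothetical record, using the Record and StreamSlice constructors that appear elsewhere on this page:

    from airbyte_cdk import Record, StreamSlice

    slice_ = StreamSlice(
        partition={},
        cursor_slice={"start_time": "2023-01-01T00:00:00Z", "end_time": "2023-01-01T23:59:59Z"},
    )
    record = Record(
        data={"updated_at": "2023-01-01T12:00:00Z"},  # hypothetical record
        stream_name="items",
        associated_slice=slice_,
    )
    # The internal high-water mark only advances when the record's cursor value
    # lies within the slice boundaries and is the highest value seen so far.
    cursor.observe(slice_, record)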
def close_slice( self, stream_slice: airbyte_cdk.StreamSlice, *args: Any) -> None:
175    def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
176        if stream_slice.partition:
177            raise ValueError(
178                f"Stream slice {stream_slice} should not have a partition. Got {stream_slice.partition}."
179            )
180        cursor_value_str_by_cursor_value_datetime = dict(
181            map(
182                # we need to ensure the cursor value is preserved as is in the state else the CATs might complain of something like
183                # 2023-01-04T17:30:19.000Z' <= '2023-01-04T17:30:19.000000Z'
184                lambda datetime_str: (self.parse_date(datetime_str), datetime_str),  # type: ignore # because of the filter on the next line, this will only be called with a str
185                filter(
186                    lambda item: item, [self._cursor, self._highest_observed_cursor_field_value]
187                ),
188            )
189        )
190        self._cursor = (
191            cursor_value_str_by_cursor_value_datetime[
192                max(cursor_value_str_by_cursor_value_datetime.keys())
193            ]
194            if cursor_value_str_by_cursor_value_datetime
195            else None
196        )

Update state based on the stream slice. Note that stream_slice.cursor_slice and most_recent_record.associated_slice are expected to be the same but we make it explicit here that stream_slice should be leveraged to update the state. We do not pass in the latest record, since cursor instances should maintain the relevant internal state on their own.

Parameters
  • stream_slice: slice to close
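
Continuing the sketch above: closing the slice promotes the highest observed cursor value into state.

    cursor.close_slice(StreamSlice(partition={}, cursor_slice=slice_.cursor_slice))
    cursor.get_stream_state()
    # e.g. {"updated_at": "2023-01-01T12:00:00Z"} when that was the highest value observed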
def stream_slices(self) -> Iterable[airbyte_cdk.StreamSlice]:
198    def stream_slices(self) -> Iterable[StreamSlice]:
199        """
200        Partition the daterange into slices of size = step.
201
202        The start of the window is the minimum datetime between start_datetime - lookback_window and the stream_state's datetime
203        The end of the window is the earlier of the configured end_datetime and the current datetime.
204
205        :return:
206        """
207        end_datetime = self.select_best_end_datetime()
208        start_datetime = self._calculate_earliest_possible_value(self.select_best_end_datetime())
209        return self._partition_daterange(start_datetime, end_datetime, self._step)

Partition the daterange into slices of size = step.

The start of the window is the minimum datetime between (start_datetime - lookback_window) and the stream state's datetime. The end of the window is the earlier of the configured end_datetime and the current datetime.

Returns

  An iterable of stream slices partitioning the date range.
def select_state( self, stream_slice: Optional[airbyte_cdk.StreamSlice] = None) -> Optional[Mapping[str, Any]]:
211    def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
212        # Datetime based cursors operate over slices made up of datetime ranges. Stream state is based on the progress
213        # through each slice and does not belong to a specific slice. We just return stream state as it is.
214        return self.get_stream_state()

Get the state value of a specific stream_slice. For incremental or resumable full refresh cursors, which only manage state in a single dimension, this is the entire state object. For per-partition cursors used by substreams, this returns the state of a specific parent delineated by the incoming slice's partition object.

def select_best_end_datetime(self) -> datetime.datetime:
235    def select_best_end_datetime(self) -> datetime.datetime:
236        """
237        Returns the optimal end datetime.
238        This method compares the current datetime with a pre-configured end datetime
239        and returns the earlier of the two. If no pre-configured end datetime is set,
240        the current datetime is returned.
241
242        :return datetime.datetime: The best end datetime, which is either the current datetime or the pre-configured end datetime, whichever is earlier.
243        """
244        now = datetime.datetime.now(tz=self._timezone)
245        if not self._end_datetime:
246            return now
247        return min(self._end_datetime.get_datetime(self.config), now)

Returns the optimal end datetime. This method compares the current datetime with a pre-configured end datetime and returns the earlier of the two. If no pre-configured end datetime is set, the current datetime is returned.

Returns

The best end datetime, which is either the current datetime or the pre-configured end datetime, whichever is earlier.
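
The same rule restated as a standalone sketch:

    import datetime
    from typing import Optional

    def best_end(configured_end: Optional[datetime.datetime]) -> datetime.datetime:
        now = datetime.datetime.now(tz=datetime.timezone.utc)
        # No configured end: sync up to "now"; otherwise take the earlier of the two.
        return now if configured_end is None else min(configured_end, now)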

def parse_date(self, date: str) -> datetime.datetime:
311    def parse_date(self, date: str) -> datetime.datetime:
312        for datetime_format in self.cursor_datetime_formats + [self.datetime_format]:
313            try:
314                return self._parser.parse(date, datetime_format)
315            except ValueError:
316                pass
317        raise ValueError(f"No format in {self.cursor_datetime_formats} matching {date}")
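
Format fallback in action, assuming cursor_datetime_formats=["%Y-%m-%d"] alongside the primary datetime_format from the earlier sketch:

    cursor.parse_date("2023-05-27")            # matched by "%Y-%m-%d"
    cursor.parse_date("2023-05-27T00:00:00Z")  # falls through to datetime_format
    cursor.parse_date("27/05/2023")            # raises ValueError: no format matches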
def get_request_params( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
328    def get_request_params(
329        self,
330        *,
331        stream_state: Optional[StreamState] = None,
332        stream_slice: Optional[StreamSlice] = None,
333        next_page_token: Optional[Mapping[str, Any]] = None,
334    ) -> Mapping[str, Any]:
335        return self._get_request_options(RequestOptionType.request_parameter, stream_slice)

Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

E.g.: you might want to define query parameters for paging if next_page_token is not None.

def get_request_headers( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
337    def get_request_headers(
338        self,
339        *,
340        stream_state: Optional[StreamState] = None,
341        stream_slice: Optional[StreamSlice] = None,
342        next_page_token: Optional[Mapping[str, Any]] = None,
343    ) -> Mapping[str, Any]:
344        return self._get_request_options(RequestOptionType.header, stream_slice)

Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.

def get_request_body_data( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
346    def get_request_body_data(
347        self,
348        *,
349        stream_state: Optional[StreamState] = None,
350        stream_slice: Optional[StreamSlice] = None,
351        next_page_token: Optional[Mapping[str, Any]] = None,
352    ) -> Mapping[str, Any]:
353        return self._get_request_options(RequestOptionType.body_data, stream_slice)

Specifies how to populate the body of the request with a non-JSON payload.

If it returns a string, it will be sent as is. If it returns a dict, it will be converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

Only one of the 'request_body_data' and 'request_body_json' functions can be overridden at a time.

def get_request_body_json( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
355    def get_request_body_json(
356        self,
357        *,
358        stream_state: Optional[StreamState] = None,
359        stream_slice: Optional[StreamSlice] = None,
360        next_page_token: Optional[Mapping[str, Any]] = None,
361    ) -> Mapping[str, Any]:
362        return self._get_request_options(RequestOptionType.body_json, stream_slice)

Specifies how to populate the body of the request with a JSON payload.

Only one of the 'request_body_data' and 'request_body_json' functions can be overridden at a time.

def request_kwargs(self) -> Mapping[str, Any]:
364    def request_kwargs(self) -> Mapping[str, Any]:
365        # Never update kwargs
366        return {}
def should_be_synced(self, record: airbyte_cdk.Record) -> bool:
385    def should_be_synced(self, record: Record) -> bool:
386        cursor_field = self.cursor_field.eval(self.config)  # type: ignore  # cursor_field is converted to an InterpolatedString in __post_init__
387        record_cursor_value = record.get(cursor_field)
388        if not record_cursor_value:
389            self._send_log(
390                Level.WARN,
391                f"Could not find cursor field `{cursor_field}` in record. The incremental sync will assume it needs to be synced",
392            )
393            return True
394        latest_possible_cursor_value = self.select_best_end_datetime()
395        earliest_possible_cursor_value = self._calculate_earliest_possible_value(
396            latest_possible_cursor_value
397        )
398        return self._is_within_daterange_boundaries(
399            record, earliest_possible_cursor_value, latest_possible_cursor_value
400        )

Evaluating whether a record should be synced allows for record filtering and for stop conditions on pagination.
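
With the record from the earlier sketch, this doubles as a record filter and a pagination stop condition:

    # True when the record's cursor value lies within
    # [earliest possible value, best end datetime]; a record dated before
    # start_datetime - lookback_window (or after the best end datetime) returns False.
    cursor.should_be_synced(record)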

def is_greater_than_or_equal( self, first: airbyte_cdk.Record, second: airbyte_cdk.Record) -> bool:
433    def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
434        cursor_field = self.cursor_field.eval(self.config)  # type: ignore  # cursor_field is converted to an InterpolatedString in __post_init__
435        first_cursor_value = first.get(cursor_field)
436        second_cursor_value = second.get(cursor_field)
437        if first_cursor_value and second_cursor_value:
438            return self.parse_date(first_cursor_value) >= self.parse_date(second_cursor_value)
439        elif first_cursor_value:
440            return True
441        else:
442            return False

Evaluates which record is greater in terms of cursor value. This is used to avoid having to capture all the records in order to close a slice.
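
A comparison sketch with hypothetical records; a record missing the cursor field always compares as the lesser one:

    newer = Record(data={"updated_at": "2023-01-02T00:00:00Z"}, stream_name="items", associated_slice=None)
    older = Record(data={"updated_at": "2023-01-01T00:00:00Z"}, stream_name="items", associated_slice=None)
    cursor.is_greater_than_or_equal(newer, older)  # True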

def set_runtime_lookback_window(self, lookback_window_in_seconds: int) -> None:
444    def set_runtime_lookback_window(self, lookback_window_in_seconds: int) -> None:
445        """
446        Updates the lookback window based on a given number of seconds if the new duration
447        is greater than the currently configured lookback window.
448
449        :param lookback_window_in_seconds: The lookback duration in seconds to potentially update to.
450        """
451        runtime_lookback_window = duration_isoformat(timedelta(seconds=lookback_window_in_seconds))
452        config_lookback = parse_duration(
453            self._lookback_window.eval(self.config) if self._lookback_window else "P0D"
454        )
455
456        # Check if the new runtime lookback window is greater than the current config lookback
457        if parse_duration(runtime_lookback_window) > config_lookback:
458            self._lookback_window = InterpolatedString.create(
459                runtime_lookback_window, parameters={}
460            )

Updates the lookback window based on a given number of seconds if the new duration is greater than the currently configured lookback window.

Parameters
  • lookback_window_in_seconds: The lookback duration in seconds to potentially update to.
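
Only a strictly larger duration is applied, as in this sketch:

    cursor.set_runtime_lookback_window(3600)  # 1 hour > configured P0D -> applied
    cursor.set_runtime_lookback_window(60)    # 1 minute < 1 hour       -> ignored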
10class DeclarativeCursor(Cursor, StreamSlicer, ABC):
11    """
12    DeclarativeCursors are components that allow for checkpointing syncs. In addition to managing the fetching and updating of
13    state, declarative cursors also manage stream slicing and injecting slice values into outbound requests.
14    """

DeclarativeCursors are components that allow for checkpointing syncs. In addition to managing the fetching and updating of state, declarative cursors also manage stream slicing and injecting slice values into outbound requests.

class GlobalSubstreamCursor(airbyte_cdk.sources.declarative.incremental.DeclarativeCursor):
 72class GlobalSubstreamCursor(DeclarativeCursor):
 73    """
 74    The GlobalSubstreamCursor is designed to track the state of substreams using a single global cursor.
 75    This class is beneficial for streams with many partitions, as it allows the state to be managed globally
 76    instead of per partition, simplifying state management and reducing the size of state messages.
 77
 78    This cursor is activated by setting the `global_substream_cursor` parameter for incremental sync.
 79
 80    Warnings:
 81    - This class enforces a minimal lookback window for substream based on the duration of the previous sync to avoid losing records. This lookback ensures that any records added or updated during the sync are captured in subsequent syncs.
 82    - The global cursor is updated only at the end of the sync. If the sync ends prematurely (e.g., due to an exception), the state will not be updated.
 83    - When using the `incremental_dependency` option, the sync will progress through parent records, preventing the sync from getting infinitely stuck. However, it is crucial to understand the requirements for both the `global_substream_cursor` and `incremental_dependency` options to avoid data loss.
 84    """
 85
 86    def __init__(self, stream_cursor: DatetimeBasedCursor, partition_router: PartitionRouter):
 87        self._stream_cursor = stream_cursor
 88        self._partition_router = partition_router
 89        self._timer = Timer()
 90        self._lock = threading.Lock()
 91        self._slice_semaphore = threading.Semaphore(
 92            0
 93        )  # Start with 0, indicating no slices being tracked
 94        self._all_slices_yielded = False
 95        self._lookback_window: Optional[int] = None
 96        self._current_partition: Optional[Mapping[str, Any]] = None
 97        self._last_slice: bool = False
 98        self._parent_state: Optional[Mapping[str, Any]] = None
 99
100    def start_slices_generation(self) -> None:
101        self._timer.start()
102
103    def stream_slices(self) -> Iterable[StreamSlice]:
104        """
105        Generates stream slices, ensuring the last slice is properly flagged and processed.
106
107        This method creates a sequence of stream slices by iterating over partitions and cursor slices.
108        It holds onto one slice in memory to set `_all_slices_yielded` to `True` before yielding the
109        final slice. A semaphore is used to track the processing of slices, ensuring that `close_slice`
110        is called only after all slices have been processed.
111
112        We expect the following events:
113        * Yields all the slices except the last one. At this point, `close_slice` won't actually close the global slice as `self._all_slices_yielded == False`
114        * Release the semaphore one last time before setting `self._all_slices_yielded = True`. This will cause `close_slice` to know about all the slices before we indicate that all slices have been yielded so the left side of `if self._all_slices_yielded and self._slice_semaphore._value == 0` will be false if not everything is closed
115        * Setting `self._all_slices_yielded = True`. We do that before actually yielding the last slice as the caller of `stream_slices` might stop iterating at any point and hence the code after `yield` might not be executed
116        * Yield the last slice. At that point, once there are as many slices yielded as closes, the global slice will be closed too
117        """
118        slice_generator = (
119            StreamSlice(
120                partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields
121            )
122            for partition in self._partition_router.stream_slices()
123            for cursor_slice in self._stream_cursor.stream_slices()
124        )
125
126        self.start_slices_generation()
127        for slice, last, state in iterate_with_last_flag_and_state(
128            slice_generator, self._partition_router.get_stream_state
129        ):
130            self._parent_state = state
131            self.register_slice(last)
132            yield slice
133        self._parent_state = self._partition_router.get_stream_state()
134
135    def generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[StreamSlice]:
136        slice_generator = (
137            StreamSlice(
138                partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields
139            )
140            for cursor_slice in self._stream_cursor.stream_slices()
141        )
142
143        yield from slice_generator
144
145    def register_slice(self, last: bool) -> None:
146        """
147        Tracks the processing of a stream slice.
148
149        Releases the semaphore for each slice. If it's the last slice (`last=True`),
150        sets `_all_slices_yielded` to `True` to indicate no more slices will be processed.
151
152        Args:
153            last (bool): True if the current slice is the last in the sequence.
154        """
155        self._slice_semaphore.release()
156        if last:
157            self._all_slices_yielded = True
158
159    def set_initial_state(self, stream_state: StreamState) -> None:
160        """
161        Set the initial state for the cursors.
162
163        This method initializes the state for the global cursor using the provided stream state.
164
165        Additionally, it sets the parent state for partition routers that are based on parent streams. If a partition router
166        does not have parent streams, this step will be skipped due to the default PartitionRouter implementation.
167
168        Args:
169            stream_state (StreamState): The state of the streams to be set. The format of the stream state should be:
170                {
171                    "state": {
172                        "last_updated": "2023-05-27T00:00:00Z"
173                    },
174                    "parent_state": {
175                        "parent_stream_name": {
176                            "last_updated": "2023-05-27T00:00:00Z"
177                        }
178                    },
179                    "lookback_window": 132
180                }
181        """
182        if not stream_state:
183            return
184
185        if "lookback_window" in stream_state:
186            self._lookback_window = stream_state["lookback_window"]
187            self._inject_lookback_into_stream_cursor(stream_state["lookback_window"])
188
189        if "state" in stream_state:
190            self._stream_cursor.set_initial_state(stream_state["state"])
191        elif "states" not in stream_state:
192            # We assume that `stream_state` is in the old global format
193            # Example: {"global_state_format_key": "global_state_format_value"}
194            self._stream_cursor.set_initial_state(stream_state)
195
196        # Set parent state for partition routers based on parent streams
197        self._partition_router.set_initial_state(stream_state)
198
199    def _inject_lookback_into_stream_cursor(self, lookback_window: int) -> None:
200        """
201        Modifies the stream cursor's lookback window based on the duration of the previous sync.
202        This adjustment ensures the cursor is set to the minimal lookback window necessary for
203        avoiding missing data.
204
205        Parameters:
206            lookback_window (int): The lookback duration in seconds to be set, derived from
207                                   the previous sync.
208
209        Raises:
210            ValueError: If the cursor does not support dynamic lookback window adjustments.
211        """
212        if hasattr(self._stream_cursor, "set_runtime_lookback_window"):
213            self._stream_cursor.set_runtime_lookback_window(lookback_window)
214        else:
215            raise ValueError(
216                "The cursor class for Global Substream Cursor does not have a set_runtime_lookback_window method"
217            )
218
219    def observe(self, stream_slice: StreamSlice, record: Record) -> None:
220        self._stream_cursor.observe(
221            StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), record
222        )
223
224    def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
225        """
226        Close the current stream slice.
227
228        This method is called when a stream slice is completed. For the global parent cursor, we close the child cursor
229        only after reading all slices. This ensures that we do not miss any child records from a later parent record
230        if the child cursor is earlier than a record from the first parent record.
231
232        Args:
233            stream_slice (StreamSlice): The stream slice to be closed.
234            *args (Any): Additional arguments.
235        """
236        with self._lock:
237            self._slice_semaphore.acquire()
238            if self._all_slices_yielded and self._slice_semaphore._value == 0:
239                self._lookback_window = self._timer.finish()
240                self._stream_cursor.close_slice(
241                    StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), *args
242                )
243
244    def get_stream_state(self) -> StreamState:
245        state: dict[str, Any] = {"state": self._stream_cursor.get_stream_state()}
246
247        if self._parent_state:
248            state["parent_state"] = self._parent_state
249
250        if self._lookback_window is not None:
251            state["lookback_window"] = self._lookback_window
252
253        return state
254
255    def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
256        # stream_slice is ignored as cursor is global
257        return self._stream_cursor.get_stream_state()
258
259    def get_request_params(
260        self,
261        *,
262        stream_state: Optional[StreamState] = None,
263        stream_slice: Optional[StreamSlice] = None,
264        next_page_token: Optional[Mapping[str, Any]] = None,
265    ) -> Mapping[str, Any]:
266        if stream_slice:
267            return self._partition_router.get_request_params(  # type: ignore # this always returns a mapping
268                stream_state=stream_state,
269                stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
270                next_page_token=next_page_token,
271            ) | self._stream_cursor.get_request_params(
272                stream_state=stream_state,
273                stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
274                next_page_token=next_page_token,
275            )
276        else:
277            raise ValueError("A partition needs to be provided in order to get request params")
278
279    def get_request_headers(
280        self,
281        *,
282        stream_state: Optional[StreamState] = None,
283        stream_slice: Optional[StreamSlice] = None,
284        next_page_token: Optional[Mapping[str, Any]] = None,
285    ) -> Mapping[str, Any]:
286        if stream_slice:
287            return self._partition_router.get_request_headers(  # type: ignore # this always returns a mapping
288                stream_state=stream_state,
289                stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
290                next_page_token=next_page_token,
291            ) | self._stream_cursor.get_request_headers(
292                stream_state=stream_state,
293                stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
294                next_page_token=next_page_token,
295            )
296        else:
297            raise ValueError("A partition needs to be provided in order to get request headers")
298
299    def get_request_body_data(
300        self,
301        *,
302        stream_state: Optional[StreamState] = None,
303        stream_slice: Optional[StreamSlice] = None,
304        next_page_token: Optional[Mapping[str, Any]] = None,
305    ) -> Union[Mapping[str, Any], str]:
306        if stream_slice:
307            return self._partition_router.get_request_body_data(  # type: ignore # this always returns a mapping
308                stream_state=stream_state,
309                stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
310                next_page_token=next_page_token,
311            ) | self._stream_cursor.get_request_body_data(
312                stream_state=stream_state,
313                stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
314                next_page_token=next_page_token,
315            )
316        else:
317            raise ValueError("A partition needs to be provided in order to get request body data")
318
319    def get_request_body_json(
320        self,
321        *,
322        stream_state: Optional[StreamState] = None,
323        stream_slice: Optional[StreamSlice] = None,
324        next_page_token: Optional[Mapping[str, Any]] = None,
325    ) -> Mapping[str, Any]:
326        if stream_slice:
327            return self._partition_router.get_request_body_json(  # type: ignore # this always returns a mapping
328                stream_state=stream_state,
329                stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
330                next_page_token=next_page_token,
331            ) | self._stream_cursor.get_request_body_json(
332                stream_state=stream_state,
333                stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
334                next_page_token=next_page_token,
335            )
336        else:
337            raise ValueError("A partition needs to be provided in order to get request body json")
338
339    def should_be_synced(self, record: Record) -> bool:
340        return self._stream_cursor.should_be_synced(self._convert_record_to_cursor_record(record))
341
342    def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
343        return self._stream_cursor.is_greater_than_or_equal(
344            self._convert_record_to_cursor_record(first),
345            self._convert_record_to_cursor_record(second),
346        )
347
348    @staticmethod
349    def _convert_record_to_cursor_record(record: Record) -> Record:
350        return Record(
351            data=record.data,
352            stream_name=record.stream_name,
353            associated_slice=StreamSlice(
354                partition={}, cursor_slice=record.associated_slice.cursor_slice
355            )
356            if record.associated_slice
357            else None,
358        )

The GlobalSubstreamCursor is designed to track the state of substreams using a single global cursor. This class is beneficial for streams with many partitions, as it allows the state to be managed globally instead of per partition, simplifying state management and reducing the size of state messages.

This cursor is activated by setting the global_substream_cursor parameter for incremental sync.

Warnings:

  • This class enforces a minimal lookback window for substreams based on the duration of the previous sync to avoid losing records. This lookback ensures that any records added or updated during the sync are captured in subsequent syncs.
  • The global cursor is updated only at the end of the sync. If the sync ends prematurely (e.g., due to an exception), the state will not be updated.
  • When using the incremental_dependency option, the sync will progress through parent records, preventing the sync from getting infinitely stuck. However, it is crucial to understand the requirements for both the global_substream_cursor and incremental_dependency options to avoid data loss.
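
A hypothetical wiring sketch; datetime_cursor, parent_router, and previous_state are assumed to be a configured DatetimeBasedCursor, a PartitionRouter, and a prior sync's state:

    global_cursor = GlobalSubstreamCursor(
        stream_cursor=datetime_cursor,
        partition_router=parent_router,
    )
    global_cursor.set_initial_state(previous_state)  # optional: resume a prior sync
    for stream_slice in global_cursor.stream_slices():
        ...  # read records for each (parent partition, datetime window) pair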
GlobalSubstreamCursor( stream_cursor: DatetimeBasedCursor, partition_router: airbyte_cdk.sources.declarative.partition_routers.PartitionRouter)
86    def __init__(self, stream_cursor: DatetimeBasedCursor, partition_router: PartitionRouter):
87        self._stream_cursor = stream_cursor
88        self._partition_router = partition_router
89        self._timer = Timer()
90        self._lock = threading.Lock()
91        self._slice_semaphore = threading.Semaphore(
92            0
93        )  # Start with 0, indicating no slices being tracked
94        self._all_slices_yielded = False
95        self._lookback_window: Optional[int] = None
96        self._current_partition: Optional[Mapping[str, Any]] = None
97        self._last_slice: bool = False
98        self._parent_state: Optional[Mapping[str, Any]] = None
def start_slices_generation(self) -> None:
100    def start_slices_generation(self) -> None:
101        self._timer.start()
def stream_slices(self) -> Iterable[airbyte_cdk.StreamSlice]:
103    def stream_slices(self) -> Iterable[StreamSlice]:
104        """
105        Generates stream slices, ensuring the last slice is properly flagged and processed.
106
107        This method creates a sequence of stream slices by iterating over partitions and cursor slices.
108        It holds onto one slice in memory to set `_all_slices_yielded` to `True` before yielding the
109        final slice. A semaphore is used to track the processing of slices, ensuring that `close_slice`
110        is called only after all slices have been processed.
111
112        We expect the following events:
113        * Yields all the slices except the last one. At this point, `close_slice` won't actually close the global slice as `self._all_slices_yielded == False`
114        * Release the semaphore one last time before setting `self._all_slices_yielded = True`. This will cause `close_slice` to know about all the slices before we indicate that all slices have been yielded so the left side of `if self._all_slices_yielded and self._slice_semaphore._value == 0` will be false if not everything is closed
115        * Setting `self._all_slices_yielded = True`. We do that before actually yielding the last slice as the caller of `stream_slices` might stop iterating at any point and hence the code after `yield` might not be executed
116        * Yield the last slice. At that point, once there are as many slices yielded as closes, the global slice will be closed too
117        """
118        slice_generator = (
119            StreamSlice(
120                partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields
121            )
122            for partition in self._partition_router.stream_slices()
123            for cursor_slice in self._stream_cursor.stream_slices()
124        )
125
126        self.start_slices_generation()
127        for slice, last, state in iterate_with_last_flag_and_state(
128            slice_generator, self._partition_router.get_stream_state
129        ):
130            self._parent_state = state
131            self.register_slice(last)
132            yield slice
133        self._parent_state = self._partition_router.get_stream_state()

Generates stream slices, ensuring the last slice is properly flagged and processed.

This method creates a sequence of stream slices by iterating over partitions and cursor slices. It holds onto one slice in memory to set _all_slices_yielded to True before yielding the final slice. A semaphore is used to track the processing of slices, ensuring that close_slice is called only after all slices have been processed.

We expect the following events:

  • Yield all the slices except the last one. At this point, close_slice won't actually close the global slice because self._all_slices_yielded == False.
  • Release the semaphore one last time before setting self._all_slices_yielded = True. This ensures close_slice has accounted for every slice before we indicate that all slices have been yielded, so the condition if self._all_slices_yielded and self._slice_semaphore._value == 0 remains false while any slice is still open.
  • Set self._all_slices_yielded = True. We do this before actually yielding the last slice because the caller of stream_slices might stop iterating at any point, so the code after yield might never execute.
  • Yield the last slice. At that point, once as many slices have been closed as yielded, the global slice is closed too (a toy model of this accounting follows this list).
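
The sketch below is a standalone toy model of that accounting (not the class itself), using the same semaphore pattern:

    import threading

    sem = threading.Semaphore(0)
    all_slices_yielded = False

    def register_slice(last: bool) -> None:
        global all_slices_yielded
        sem.release()  # one release per yielded slice
        if last:
            all_slices_yielded = True

    def close_slice() -> None:
        sem.acquire()  # one acquire per processed slice
        if all_slices_yielded and sem._value == 0:
            print("global slice closed")

    for is_last in (False, False, True):
        register_slice(is_last)
        close_slice()  # prints only on the final iteration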
def generate_slices_from_partition( self, partition: airbyte_cdk.StreamSlice) -> Iterable[airbyte_cdk.StreamSlice]:
135    def generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[StreamSlice]:
136        slice_generator = (
137            StreamSlice(
138                partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields
139            )
140            for cursor_slice in self._stream_cursor.stream_slices()
141        )
142
143        yield from slice_generator
def register_slice(self, last: bool) -> None:
145    def register_slice(self, last: bool) -> None:
146        """
147        Tracks the processing of a stream slice.
148
149        Releases the semaphore for each slice. If it's the last slice (`last=True`),
150        sets `_all_slices_yielded` to `True` to indicate no more slices will be processed.
151
152        Args:
153            last (bool): True if the current slice is the last in the sequence.
154        """
155        self._slice_semaphore.release()
156        if last:
157            self._all_slices_yielded = True

Tracks the processing of a stream slice.

Releases the semaphore for each slice. If it's the last slice (last=True), sets _all_slices_yielded to True to indicate no more slices will be processed.

Arguments:
  • last (bool): True if the current slice is the last in the sequence.
def set_initial_state(self, stream_state: Mapping[str, Any]) -> None:
159    def set_initial_state(self, stream_state: StreamState) -> None:
160        """
161        Set the initial state for the cursors.
162
163        This method initializes the state for the global cursor using the provided stream state.
164
165        Additionally, it sets the parent state for partition routers that are based on parent streams. If a partition router
166        does not have parent streams, this step will be skipped due to the default PartitionRouter implementation.
167
168        Args:
169            stream_state (StreamState): The state of the streams to be set. The format of the stream state should be:
170                {
171                    "state": {
172                        "last_updated": "2023-05-27T00:00:00Z"
173                    },
174                    "parent_state": {
175                        "parent_stream_name": {
176                            "last_updated": "2023-05-27T00:00:00Z"
177                        }
178                    },
179                    "lookback_window": 132
180                }
181        """
182        if not stream_state:
183            return
184
185        if "lookback_window" in stream_state:
186            self._lookback_window = stream_state["lookback_window"]
187            self._inject_lookback_into_stream_cursor(stream_state["lookback_window"])
188
189        if "state" in stream_state:
190            self._stream_cursor.set_initial_state(stream_state["state"])
191        elif "states" not in stream_state:
192            # We assume that `stream_state` is in the old global format
193            # Example: {"global_state_format_key": "global_state_format_value"}
194            self._stream_cursor.set_initial_state(stream_state)
195
196        # Set parent state for partition routers based on parent streams
197        self._partition_router.set_initial_state(stream_state)

Set the initial state for the cursors.

This method initializes the state for the global cursor using the provided stream state.

Additionally, it sets the parent state for partition routers that are based on parent streams. If a partition router does not have parent streams, this step will be skipped due to the default PartitionRouter implementation.

Arguments:
  • stream_state (StreamState): The state of the streams to be set (a restore sketch follows this list). The format of the stream state should be:

        {
            "state": {
                "last_updated": "2023-05-27T00:00:00Z"
            },
            "parent_state": {
                "parent_stream_name": {
                    "last_updated": "2023-05-27T00:00:00Z"
                }
            },
            "lookback_window": 132
        }
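
A restore sketch matching the documented format, reusing the hypothetical global_cursor from the earlier sketch:

    global_cursor.set_initial_state({
        "state": {"last_updated": "2023-05-27T00:00:00Z"},
        "parent_state": {"parent_stream_name": {"last_updated": "2023-05-27T00:00:00Z"}},
        "lookback_window": 132,
    })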
def observe( self, stream_slice: airbyte_cdk.StreamSlice, record: airbyte_cdk.Record) -> None:
219    def observe(self, stream_slice: StreamSlice, record: Record) -> None:
220        self._stream_cursor.observe(
221            StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), record
222        )

Register a record with the cursor; the cursor instance can then use it to manage the state of the in-progress stream read.

Parameters
  • stream_slice: The current slice, which may or may not contain the most recently observed record
  • record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the stream state may need to be deferred depending on whether the source reliably orders records by the cursor field.
def close_slice( self, stream_slice: airbyte_cdk.StreamSlice, *args: Any) -> None:
224    def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
225        """
226        Close the current stream slice.
227
228        This method is called when a stream slice is completed. For the global parent cursor, we close the child cursor
229        only after reading all slices. This ensures that we do not miss any child records from a later parent record
230        if the child cursor is earlier than a record from the first parent record.
231
232        Args:
233            stream_slice (StreamSlice): The stream slice to be closed.
234            *args (Any): Additional arguments.
235        """
236        with self._lock:
237            self._slice_semaphore.acquire()
238            if self._all_slices_yielded and self._slice_semaphore._value == 0:
239                self._lookback_window = self._timer.finish()
240                self._stream_cursor.close_slice(
241                    StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), *args
242                )

Close the current stream slice.

This method is called when a stream slice is completed. For the global parent cursor, we close the child cursor only after reading all slices. This ensures that we do not miss any child records from a later parent record if the child cursor is earlier than a record from the first parent record.

Arguments:
  • stream_slice (StreamSlice): The stream slice to be closed.
  • *args (Any): Additional arguments.
def get_stream_state(self) -> Mapping[str, Any]:
244    def get_stream_state(self) -> StreamState:
245        state: dict[str, Any] = {"state": self._stream_cursor.get_stream_state()}
246
247        if self._parent_state:
248            state["parent_state"] = self._parent_state
249
250        if self._lookback_window is not None:
251            state["lookback_window"] = self._lookback_window
252
253        return state

Returns the current stream state. We would like to restrict its usage since it exposes the internals of the state. As of 2023-06-14, it is used for three things:

  • Interpolation of the requests
  • Transformation of records
  • Saving the state

For the first case, we are probably stuck with exposing the stream state. For the last, we can probably expose a method that allows for emitting the state to the platform.
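
An illustration of the composite shape assembled above:

    global_cursor.get_stream_state()
    # e.g. {"state": {"last_updated": "..."},
    #       "parent_state": {"parent_stream_name": {"last_updated": "..."}},
    #       "lookback_window": 132}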

def select_state( self, stream_slice: Optional[airbyte_cdk.StreamSlice] = None) -> Optional[Mapping[str, Any]]:
255    def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
256        # stream_slice is ignored as cursor is global
257        return self._stream_cursor.get_stream_state()

Get the state value of a specific stream_slice. For incremental or resumable full refresh cursors, which only manage state in a single dimension, this is the entire state object. For per-partition cursors used by substreams, this returns the state of a specific parent delineated by the incoming slice's partition object.

def get_request_params( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
259    def get_request_params(
260        self,
261        *,
262        stream_state: Optional[StreamState] = None,
263        stream_slice: Optional[StreamSlice] = None,
264        next_page_token: Optional[Mapping[str, Any]] = None,
265    ) -> Mapping[str, Any]:
266        if stream_slice:
267            return self._partition_router.get_request_params(  # type: ignore # this always returns a mapping
268                stream_state=stream_state,
269                stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
270                next_page_token=next_page_token,
271            ) | self._stream_cursor.get_request_params(
272                stream_state=stream_state,
273                stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
274                next_page_token=next_page_token,
275            )
276        else:
277            raise ValueError("A partition needs to be provided in order to get request params")

Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

E.g.: you might want to define query parameters for paging if next_page_token is not None.

def get_request_headers( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
279    def get_request_headers(
280        self,
281        *,
282        stream_state: Optional[StreamState] = None,
283        stream_slice: Optional[StreamSlice] = None,
284        next_page_token: Optional[Mapping[str, Any]] = None,
285    ) -> Mapping[str, Any]:
286        if stream_slice:
287            return self._partition_router.get_request_headers(  # type: ignore # this always returns a mapping
288                stream_state=stream_state,
289                stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
290                next_page_token=next_page_token,
291            ) | self._stream_cursor.get_request_headers(
292                stream_state=stream_state,
293                stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
294                next_page_token=next_page_token,
295            )
296        else:
297            raise ValueError("A partition needs to be provided in order to get request headers")

Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.

def get_request_body_data( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Union[Mapping[str, Any], str]:
299    def get_request_body_data(
300        self,
301        *,
302        stream_state: Optional[StreamState] = None,
303        stream_slice: Optional[StreamSlice] = None,
304        next_page_token: Optional[Mapping[str, Any]] = None,
305    ) -> Union[Mapping[str, Any], str]:
306        if stream_slice:
307            return self._partition_router.get_request_body_data(  # type: ignore # this always returns a mapping
308                stream_state=stream_state,
309                stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
310                next_page_token=next_page_token,
311            ) | self._stream_cursor.get_request_body_data(
312                stream_state=stream_state,
313                stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
314                next_page_token=next_page_token,
315            )
316        else:
317            raise ValueError("A partition needs to be provided in order to get request body data")

Specifies how to populate the body of the request with a non-JSON payload.

If it returns a string, it will be sent as is. If it returns a dict, it will be converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

Only one of the 'request_body_data' and 'request_body_json' functions can be overridden at a time.

def get_request_body_json( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
319    def get_request_body_json(
320        self,
321        *,
322        stream_state: Optional[StreamState] = None,
323        stream_slice: Optional[StreamSlice] = None,
324        next_page_token: Optional[Mapping[str, Any]] = None,
325    ) -> Mapping[str, Any]:
326        if stream_slice:
327            return self._partition_router.get_request_body_json(  # type: ignore # this always returns a mapping
328                stream_state=stream_state,
329                stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
330                next_page_token=next_page_token,
331            ) | self._stream_cursor.get_request_body_json(
332                stream_state=stream_state,
333                stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
334                next_page_token=next_page_token,
335            )
336        else:
337            raise ValueError("A partition needs to be provided in order to get request body json")

Specifies how to populate the body of the request with a JSON payload.

Only one of the 'request_body_data' and 'request_body_json' functions can be overridden at a time.

def should_be_synced(self, record: airbyte_cdk.Record) -> bool:
339    def should_be_synced(self, record: Record) -> bool:
340        return self._stream_cursor.should_be_synced(self._convert_record_to_cursor_record(record))

Evaluating whether a record should be synced allows for record filtering and for stop conditions on pagination.

def is_greater_than_or_equal( self, first: airbyte_cdk.Record, second: airbyte_cdk.Record) -> bool:
342    def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
343        return self._stream_cursor.is_greater_than_or_equal(
344            self._convert_record_to_cursor_record(first),
345            self._convert_record_to_cursor_record(second),
346        )

Evaluates which record is greater in terms of cursor value. This is used to avoid having to capture all the records in order to close a slice.

class PerPartitionCursor(airbyte_cdk.sources.declarative.incremental.DeclarativeCursor):
 28class PerPartitionCursor(DeclarativeCursor):
 29    """
 30    Manages state per partition when a stream has many partitions, to prevent data loss or duplication.
 31
 32    **Partition Limitation and Limit Reached Logic**
 33
 34    - **DEFAULT_MAX_PARTITIONS_NUMBER**: The maximum number of partitions to keep in memory (default is 10,000).
 35    - **_cursor_per_partition**: An ordered dictionary that stores cursors for each partition.
 36    - **_over_limit**: A counter that increments each time an oldest partition is removed when the limit is exceeded.
 37
 38    The class ensures that the number of partitions tracked does not exceed the `DEFAULT_MAX_PARTITIONS_NUMBER` to prevent excessive memory usage.
 39
 40    - When the number of partitions exceeds the limit, the oldest partitions are removed from `_cursor_per_partition`, and `_over_limit` is incremented accordingly.
 41    - The `limit_reached` method returns `True` when `_over_limit` exceeds `DEFAULT_MAX_PARTITIONS_NUMBER`, indicating that the global cursor should be used instead of per-partition cursors.
 42
 43    This approach avoids unnecessary switching to a global cursor due to temporary spikes in partition counts, ensuring that switching is only done when a sustained high number of partitions is observed.
 44    """
 45
 46    DEFAULT_MAX_PARTITIONS_NUMBER = 10000
 47    _NO_STATE: Mapping[str, Any] = {}
 48    _NO_CURSOR_STATE: Mapping[str, Any] = {}
 49    _KEY = 0
 50    _VALUE = 1
 51    _state_to_migrate_from: Mapping[str, Any] = {}
 52
 53    def __init__(self, cursor_factory: CursorFactory, partition_router: PartitionRouter):
 54        self._cursor_factory = cursor_factory
 55        self._partition_router = partition_router
 56        # The dict is ordered to ensure that once the maximum number of partitions is reached,
 57        # the oldest partitions can be efficiently removed, maintaining the most recent partitions.
 58        self._cursor_per_partition: OrderedDict[str, DeclarativeCursor] = OrderedDict()
 59        self._over_limit = 0
 60        self._partition_serializer = PerPartitionKeySerializer()
 61
 62    def stream_slices(self) -> Iterable[StreamSlice]:
 63        slices = self._partition_router.stream_slices()
 64        for partition in slices:
 65            yield from self.generate_slices_from_partition(partition)
 66
 67    def generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[StreamSlice]:
 68        # Ensure the maximum number of partitions is not exceeded
 69        self._ensure_partition_limit()
 70
 71        cursor = self._cursor_per_partition.get(self._to_partition_key(partition.partition))
 72        if not cursor:
 73            partition_state = (
 74                self._state_to_migrate_from
 75                if self._state_to_migrate_from
 76                else self._NO_CURSOR_STATE
 77            )
 78            cursor = self._create_cursor(partition_state)
 79            self._cursor_per_partition[self._to_partition_key(partition.partition)] = cursor
 80
 81        for cursor_slice in cursor.stream_slices():
 82            yield StreamSlice(
 83                partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields
 84            )
 85
 86    def _ensure_partition_limit(self) -> None:
 87        """
 88        Ensure the maximum number of partitions is not exceeded. If so, the oldest added partition will be dropped.
 89        """
 90        while len(self._cursor_per_partition) > self.DEFAULT_MAX_PARTITIONS_NUMBER - 1:
 91            self._over_limit += 1
 92            oldest_partition = self._cursor_per_partition.popitem(last=False)[
 93                0
 94            ]  # Remove the oldest partition
 95            logger.warning(
 96                f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}. Over limit: {self._over_limit}."
 97            )
 98
 99    def limit_reached(self) -> bool:
100        return self._over_limit > self.DEFAULT_MAX_PARTITIONS_NUMBER
101
102    def set_initial_state(self, stream_state: StreamState) -> None:
103        """
104        Set the initial state for the cursors.
105
106        This method initializes the state for each partition cursor using the provided stream state.
107        If a partition state is provided in the stream state, it will update the corresponding partition cursor with this state.
108
109        Additionally, it sets the parent state for partition routers that are based on parent streams. If a partition router
110        does not have parent streams, this step will be skipped due to the default PartitionRouter implementation.
111
112        Args:
113            stream_state (StreamState): The state of the streams to be set. The format of the stream state should be:
114                {
115                    "states": [
116                        {
117                            "partition": {
118                                "partition_key": "value"
119                            },
120                            "cursor": {
121                                "last_updated": "2023-05-27T00:00:00Z"
122                            }
123                        }
124                    ],
125                    "parent_state": {
126                        "parent_stream_name": {
127                            "last_updated": "2023-05-27T00:00:00Z"
128                        }
129                    }
130                }
131        """
132        if not stream_state:
133            return
134
135        if "states" not in stream_state:
136            # We assume that `stream_state` is in a global format that can be applied to all partitions.
137            # Example: {"global_state_format_key": "global_state_format_value"}
138            self._state_to_migrate_from = stream_state
139
140        else:
141            for state in stream_state["states"]:
142                self._cursor_per_partition[self._to_partition_key(state["partition"])] = (
143                    self._create_cursor(state["cursor"])
144                )
145
146            # set default state for missing partitions if it is per partition with fallback to global
147            if "state" in stream_state:
148                self._state_to_migrate_from = stream_state["state"]
149
150        # Set parent state for partition routers based on parent streams
151        self._partition_router.set_initial_state(stream_state)
152
153    def observe(self, stream_slice: StreamSlice, record: Record) -> None:
154        self._cursor_per_partition[self._to_partition_key(stream_slice.partition)].observe(
155            StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), record
156        )
157
158    def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
159        try:
160            self._cursor_per_partition[self._to_partition_key(stream_slice.partition)].close_slice(
161                StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), *args
162            )
163        except KeyError as exception:
164            raise ValueError(
165                f"Partition {str(exception)} could not be found in current state based on the record. This is unexpected because "
166                f"we should only update state for partitions that were emitted during `stream_slices`"
167            )
168
169    def get_stream_state(self) -> StreamState:
170        states = []
171        for partition_tuple, cursor in self._cursor_per_partition.items():
172            cursor_state = cursor.get_stream_state()
173            if cursor_state:
174                states.append(
175                    {
176                        "partition": self._to_dict(partition_tuple),
177                        "cursor": cursor_state,
178                    }
179                )
180        state: dict[str, Any] = {"states": states}
181
182        parent_state = self._partition_router.get_stream_state()
183        if parent_state:
184            state["parent_state"] = parent_state
185        return state
186
187    def _get_state_for_partition(self, partition: Mapping[str, Any]) -> Optional[StreamState]:
188        cursor = self._cursor_per_partition.get(self._to_partition_key(partition))
189        if cursor:
190            return cursor.get_stream_state()
191
192        return None
193
194    @staticmethod
195    def _is_new_state(stream_state: Mapping[str, Any]) -> bool:
196        return not bool(stream_state)
197
198    def _to_partition_key(self, partition: Mapping[str, Any]) -> str:
199        return self._partition_serializer.to_partition_key(partition)
200
201    def _to_dict(self, partition_key: str) -> Mapping[str, Any]:
202        return self._partition_serializer.to_partition(partition_key)
203
204    def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
205        if not stream_slice:
206            raise ValueError("A partition needs to be provided in order to extract a state")
207
211        return self._get_state_for_partition(stream_slice.partition)
212
213    def _create_cursor(self, cursor_state: Any) -> DeclarativeCursor:
214        cursor = self._cursor_factory.create()
215        cursor.set_initial_state(cursor_state)
216        return cursor
217
218    def get_request_params(
219        self,
220        *,
221        stream_state: Optional[StreamState] = None,
222        stream_slice: Optional[StreamSlice] = None,
223        next_page_token: Optional[Mapping[str, Any]] = None,
224    ) -> Mapping[str, Any]:
225        if stream_slice:
226            if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition:
227                self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition))
228            return self._partition_router.get_request_params(  # type: ignore # this always returns a mapping
229                stream_state=stream_state,
230                stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
231                next_page_token=next_page_token,
232            ) | self._cursor_per_partition[
233                self._to_partition_key(stream_slice.partition)
234            ].get_request_params(
235                stream_state=stream_state,
236                stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
237                next_page_token=next_page_token,
238            )
239        else:
240            raise ValueError("A partition needs to be provided in order to get request params")
241
242    def get_request_headers(
243        self,
244        *,
245        stream_state: Optional[StreamState] = None,
246        stream_slice: Optional[StreamSlice] = None,
247        next_page_token: Optional[Mapping[str, Any]] = None,
248    ) -> Mapping[str, Any]:
249        if stream_slice:
250            if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition:
251                self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition))
252            return self._partition_router.get_request_headers(  # type: ignore # this always returns a mapping
253                stream_state=stream_state,
254                stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
255                next_page_token=next_page_token,
256            ) | self._cursor_per_partition[
257                self._to_partition_key(stream_slice.partition)
258            ].get_request_headers(
259                stream_state=stream_state,
260                stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
261                next_page_token=next_page_token,
262            )
263        else:
264            raise ValueError("A partition needs to be provided in order to get request headers")
265
266    def get_request_body_data(
267        self,
268        *,
269        stream_state: Optional[StreamState] = None,
270        stream_slice: Optional[StreamSlice] = None,
271        next_page_token: Optional[Mapping[str, Any]] = None,
272    ) -> Union[Mapping[str, Any], str]:
273        if stream_slice:
274            if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition:
275                self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition))
276            return self._partition_router.get_request_body_data(  # type: ignore # this always returns a mapping
277                stream_state=stream_state,
278                stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
279                next_page_token=next_page_token,
280            ) | self._cursor_per_partition[
281                self._to_partition_key(stream_slice.partition)
282            ].get_request_body_data(
283                stream_state=stream_state,
284                stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
285                next_page_token=next_page_token,
286            )
287        else:
288            raise ValueError("A partition needs to be provided in order to get request body data")
289
290    def get_request_body_json(
291        self,
292        *,
293        stream_state: Optional[StreamState] = None,
294        stream_slice: Optional[StreamSlice] = None,
295        next_page_token: Optional[Mapping[str, Any]] = None,
296    ) -> Mapping[str, Any]:
297        if stream_slice:
298            if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition:
299                self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition))
300            return self._partition_router.get_request_body_json(  # type: ignore # this always returns a mapping
301                stream_state=stream_state,
302                stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
303                next_page_token=next_page_token,
304            ) | self._cursor_per_partition[
305                self._to_partition_key(stream_slice.partition)
306            ].get_request_body_json(
307                stream_state=stream_state,
308                stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
309                next_page_token=next_page_token,
310            )
311        else:
312            raise ValueError("A partition needs to be provided in order to get request body json")
313
314    def should_be_synced(self, record: Record) -> bool:
315        return self._get_cursor(record).should_be_synced(
316            self._convert_record_to_cursor_record(record)
317        )
318
319    def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
320        if not first.associated_slice or not second.associated_slice:
321            raise ValueError(
322                f"Both records should have an associated slice but got {first.associated_slice} and {second.associated_slice}"
323            )
324        if first.associated_slice.partition != second.associated_slice.partition:
325            raise ValueError(
326                f"To compare records, partition should be the same but got {first.associated_slice.partition} and {second.associated_slice.partition}"
327            )
328
329        return self._get_cursor(first).is_greater_than_or_equal(
330            self._convert_record_to_cursor_record(first),
331            self._convert_record_to_cursor_record(second),
332        )
333
334    @staticmethod
335    def _convert_record_to_cursor_record(record: Record) -> Record:
336        return Record(
337            data=record.data,
338            stream_name=record.stream_name,
339            associated_slice=StreamSlice(
340                partition={}, cursor_slice=record.associated_slice.cursor_slice
341            )
342            if record.associated_slice
343            else None,
344        )
345
346    def _get_cursor(self, record: Record) -> DeclarativeCursor:
347        if not record.associated_slice:
348            raise ValueError(
349                "Invalid state as stream slices that are emitted should refer to an existing cursor"
350            )
351        partition_key = self._to_partition_key(record.associated_slice.partition)
352        if partition_key not in self._cursor_per_partition:
353            self._create_cursor_for_partition(partition_key)
354        cursor = self._cursor_per_partition[partition_key]
355        return cursor
356
357    def _create_cursor_for_partition(self, partition_key: str) -> None:
358        """
359        Dynamically creates and initializes a cursor for the specified partition.
360
361        This method is required for `ConcurrentPerPartitionCursor`. For concurrent cursors,
362        stream_slices is executed only for the concurrent cursor, so cursors per partition
363        are not created for the declarative cursor. This method ensures that a cursor is available
364        to create requests for the specified partition. The cursor is initialized
365        with the per-partition state if present in the initial state, or with the global state
366        adjusted by the lookback window, or with the state to migrate from.
367
368        Note:
369            This is a temporary workaround and should be removed once the declarative cursor
370            is decoupled from the concurrent cursor implementation.
371
372        Args:
373            partition_key (str): The unique identifier for the partition for which the cursor
374            needs to be created.
375        """
376        partition_state = (
377            self._state_to_migrate_from if self._state_to_migrate_from else self._NO_CURSOR_STATE
378        )
379        cursor = self._create_cursor(partition_state)
380
381        self._cursor_per_partition[partition_key] = cursor

Manages state per partition when a stream has many partitions, to prevent data loss or duplication.

Partition Limitation and Limit Reached Logic

  • DEFAULT_MAX_PARTITIONS_NUMBER: The maximum number of partitions to keep in memory (default is 10,000).
  • _cursor_per_partition: An ordered dictionary that stores cursors for each partition.
  • _over_limit: A counter that increments each time an oldest partition is removed when the limit is exceeded.

The class ensures that the number of partitions tracked does not exceed the DEFAULT_MAX_PARTITIONS_NUMBER to prevent excessive memory usage.

  • When the number of partitions exceeds the limit, the oldest partitions are removed from _cursor_per_partition, and _over_limit is incremented accordingly.
  • The limit_reached method returns True when _over_limit exceeds DEFAULT_MAX_PARTITIONS_NUMBER, indicating that the global cursor should be used instead of per-partition cursors.

This approach avoids unnecessary switching to a global cursor due to temporary spikes in partition counts, ensuring that switching is only done when a sustained high number of partitions is observed.
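A minimal sketch of the eviction and limit logic described above, using a toy cap of 3 in place of `DEFAULT_MAX_PARTITIONS_NUMBER` (the variable and function names below are illustrative, not the CDK's internals):

```python
from collections import OrderedDict
from typing import Any, Dict

MAX_PARTITIONS = 3  # stand-in for DEFAULT_MAX_PARTITIONS_NUMBER (10,000)
cursor_per_partition: "OrderedDict[str, Dict[str, Any]]" = OrderedDict()
over_limit = 0

def ensure_partition_limit() -> None:
    """Drop the oldest partitions until there is room for one more."""
    global over_limit
    while len(cursor_per_partition) > MAX_PARTITIONS - 1:
        over_limit += 1
        oldest_partition, _ = cursor_per_partition.popitem(last=False)
        print(f"Dropping oldest partition: {oldest_partition} (over limit: {over_limit})")

for key in ["p1", "p2", "p3", "p4", "p5"]:
    ensure_partition_limit()
    cursor_per_partition[key] = {}  # a fresh cursor state for this partition

# The switch to a global cursor only triggers once the eviction count exceeds
# the cap itself, so a brief spike above the cap is tolerated.
print(over_limit > MAX_PARTITIONS)  # False: only 2 evictions against a cap of 3
```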

PerPartitionCursor( cursor_factory: CursorFactory, partition_router: airbyte_cdk.sources.declarative.partition_routers.PartitionRouter)
53    def __init__(self, cursor_factory: CursorFactory, partition_router: PartitionRouter):
54        self._cursor_factory = cursor_factory
55        self._partition_router = partition_router
56        # The dict is ordered to ensure that once the maximum number of partitions is reached,
57        # the oldest partitions can be efficiently removed, maintaining the most recent partitions.
58        self._cursor_per_partition: OrderedDict[str, DeclarativeCursor] = OrderedDict()
59        self._over_limit = 0
60        self._partition_serializer = PerPartitionKeySerializer()
DEFAULT_MAX_PARTITIONS_NUMBER = 10000
def stream_slices(self) -> Iterable[airbyte_cdk.StreamSlice]:
62    def stream_slices(self) -> Iterable[StreamSlice]:
63        slices = self._partition_router.stream_slices()
64        for partition in slices:
65            yield from self.generate_slices_from_partition(partition)

Defines stream slices

Returns

An iterable of stream slices
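For reference, each yielded slice pairs the router's partition with one cursor slice from that partition's cursor; an illustrative construction, assuming the top-level `StreamSlice` export shown in the signatures (field names and values are made up):

```python
from airbyte_cdk import StreamSlice

# One slice as yielded above: the partition comes from the partition router,
# the cursor_slice from that partition's own cursor.
example_slice = StreamSlice(
    partition={"partition_key": "p1"},
    cursor_slice={"start_time": "2023-01-01", "end_time": "2023-01-31"},
)
```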

def generate_slices_from_partition( self, partition: airbyte_cdk.StreamSlice) -> Iterable[airbyte_cdk.StreamSlice]:
67    def generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[StreamSlice]:
68        # Ensure the maximum number of partitions is not exceeded
69        self._ensure_partition_limit()
70
71        cursor = self._cursor_per_partition.get(self._to_partition_key(partition.partition))
72        if not cursor:
73            partition_state = (
74                self._state_to_migrate_from
75                if self._state_to_migrate_from
76                else self._NO_CURSOR_STATE
77            )
78            cursor = self._create_cursor(partition_state)
79            self._cursor_per_partition[self._to_partition_key(partition.partition)] = cursor
80
81        for cursor_slice in cursor.stream_slices():
82            yield StreamSlice(
83                partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields
84            )
def limit_reached(self) -> bool:
 99    def limit_reached(self) -> bool:
100        return self._over_limit > self.DEFAULT_MAX_PARTITIONS_NUMBER
def set_initial_state(self, stream_state: Mapping[str, Any]) -> None:
102    def set_initial_state(self, stream_state: StreamState) -> None:
103        """
104        Set the initial state for the cursors.
105
106        This method initializes the state for each partition cursor using the provided stream state.
107        If a partition state is provided in the stream state, it will update the corresponding partition cursor with this state.
108
109        Additionally, it sets the parent state for partition routers that are based on parent streams. If a partition router
110        does not have parent streams, this step will be skipped due to the default PartitionRouter implementation.
111
112        Args:
113            stream_state (StreamState): The state of the streams to be set. The format of the stream state should be:
114                {
115                    "states": [
116                        {
117                            "partition": {
118                                "partition_key": "value"
119                            },
120                            "cursor": {
121                                "last_updated": "2023-05-27T00:00:00Z"
122                            }
123                        }
124                    ],
125                    "parent_state": {
126                        "parent_stream_name": {
127                            "last_updated": "2023-05-27T00:00:00Z"
128                        }
129                    }
130                }
131        """
132        if not stream_state:
133            return
134
135        if "states" not in stream_state:
136            # We assume that `stream_state` is in a global format that can be applied to all partitions.
137            # Example: {"global_state_format_key": "global_state_format_value"}
138            self._state_to_migrate_from = stream_state
139
140        else:
141            for state in stream_state["states"]:
142                self._cursor_per_partition[self._to_partition_key(state["partition"])] = (
143                    self._create_cursor(state["cursor"])
144                )
145
146            # set default state for missing partitions if it is per partition with fallback to global
147            if "state" in stream_state:
148                self._state_to_migrate_from = stream_state["state"]
149
150        # Set parent state for partition routers based on parent streams
151        self._partition_router.set_initial_state(stream_state)

Set the initial state for the cursors.

This method initializes the state for each partition cursor using the provided stream state. If a partition state is provided in the stream state, it will update the corresponding partition cursor with this state.

Additionally, it sets the parent state for partition routers that are based on parent streams. If a partition router does not have parent streams, this step will be skipped due to the default PartitionRouter implementation.

Arguments:
  • stream_state (StreamState): The state of the streams to be set. The format of the stream state should be:

        {
            "states": [
                {
                    "partition": {
                        "partition_key": "value"
                    },
                    "cursor": {
                        "last_updated": "2023-05-27T00:00:00Z"
                    }
                }
            ],
            "parent_state": {
                "parent_stream_name": {
                    "last_updated": "2023-05-27T00:00:00Z"
                }
            }
        }
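Beyond the per-partition format above, the implementation also accepts two other shapes; a hedged sketch of the branching (keys follow the implementation, values are illustrative):

```python
# No "states" key: treated as a legacy/global format and kept as
# _state_to_migrate_from, later used to seed cursors for partitions
# that have no saved per-partition state yet.
global_format_state = {"last_updated": "2023-05-27T00:00:00Z"}

# Per-partition format with an additional top-level "state" key: the
# "state" value becomes the fallback for partitions missing from "states".
per_partition_with_fallback = {
    "states": [
        {
            "partition": {"partition_key": "value"},
            "cursor": {"last_updated": "2023-05-27T00:00:00Z"},
        }
    ],
    "state": {"last_updated": "2023-05-27T00:00:00Z"},
}
```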
def observe( self, stream_slice: airbyte_cdk.StreamSlice, record: airbyte_cdk.Record) -> None:
153    def observe(self, stream_slice: StreamSlice, record: Record) -> None:
154        self._cursor_per_partition[self._to_partition_key(stream_slice.partition)].observe(
155            StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), record
156        )

Register a record with the cursor; the cursor instance can then use it to manage the state of the in-progress stream read.

Parameters
  • stream_slice: The current slice, which may or may not contain the most recently observed record
  • record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the stream state may need to be deferred depending on whether the source reliably orders records by the cursor field.
def close_slice( self, stream_slice: airbyte_cdk.StreamSlice, *args: Any) -> None:
158    def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
159        try:
160            self._cursor_per_partition[self._to_partition_key(stream_slice.partition)].close_slice(
161                StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), *args
162            )
163        except KeyError as exception:
164            raise ValueError(
165                f"Partition {str(exception)} could not be found in current state based on the record. This is unexpected because "
166                f"we should only update state for partitions that were emitted during `stream_slices`"
167            )

Update state based on the stream slice. Note that stream_slice.cursor_slice and most_recent_record.associated_slice are expected to be the same but we make it explicit here that stream_slice should be leveraged to update the state. We do not pass in the latest record, since cursor instances should maintain the relevant internal state on their own.

Parameters
  • stream_slice: slice to close
def get_stream_state(self) -> Mapping[str, Any]:
169    def get_stream_state(self) -> StreamState:
170        states = []
171        for partition_tuple, cursor in self._cursor_per_partition.items():
172            cursor_state = cursor.get_stream_state()
173            if cursor_state:
174                states.append(
175                    {
176                        "partition": self._to_dict(partition_tuple),
177                        "cursor": cursor_state,
178                    }
179                )
180        state: dict[str, Any] = {"states": states}
181
182        parent_state = self._partition_router.get_stream_state()
183        if parent_state:
184            state["parent_state"] = parent_state
185        return state

Returns the current stream state. We would like to restrict its usage since it exposes the internals of the state. As of 2023-06-14, it is used for three things:

  • Interpolation of the requests
  • Transformation of records
  • Saving the state

For the first case, we are probably stuck with exposing the stream state. For the last, we can probably expose a method that allows for emitting the state to the platform.

def select_state( self, stream_slice: Optional[airbyte_cdk.StreamSlice] = None) -> Optional[Mapping[str, Any]]:
204    def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
205        if not stream_slice:
206            raise ValueError("A partition needs to be provided in order to extract a state")
207
211        return self._get_state_for_partition(stream_slice.partition)

Get the state value of a specific stream_slice. For incremental or resumable full refresh cursors which only manage state in a single dimension this is the entire state object. For per-partition cursors used by substreams, this returns the state of a specific parent delineated by the incoming slice's partition object.

def get_request_params( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
218    def get_request_params(
219        self,
220        *,
221        stream_state: Optional[StreamState] = None,
222        stream_slice: Optional[StreamSlice] = None,
223        next_page_token: Optional[Mapping[str, Any]] = None,
224    ) -> Mapping[str, Any]:
225        if stream_slice:
226            if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition:
227                self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition))
228            return self._partition_router.get_request_params(  # type: ignore # this always returns a mapping
229                stream_state=stream_state,
230                stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
231                next_page_token=next_page_token,
232            ) | self._cursor_per_partition[
233                self._to_partition_key(stream_slice.partition)
234            ].get_request_params(
235                stream_state=stream_state,
236                stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
237                next_page_token=next_page_token,
238            )
239        else:
240            raise ValueError("A partition needs to be provided in order to get request params")

Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

E.g., you might want to define query parameters for paging if next_page_token is not None.
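The implementation above merges the partition router's parameters with the per-partition cursor's parameters using the dict union operator (`|`, Python 3.9+), where the right-hand side wins on key collisions; a toy illustration with made-up keys:

```python
router_params = {"parent_id": "123", "page_size": "50"}
cursor_params = {"updated_since": "2023-05-27T00:00:00Z", "page_size": "100"}

# With `|`, values from the right operand override the left on collisions,
# so the cursor's page_size wins here.
merged = router_params | cursor_params
print(merged)
# {'parent_id': '123', 'page_size': '100', 'updated_since': '2023-05-27T00:00:00Z'}
```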

def get_request_headers( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
242    def get_request_headers(
243        self,
244        *,
245        stream_state: Optional[StreamState] = None,
246        stream_slice: Optional[StreamSlice] = None,
247        next_page_token: Optional[Mapping[str, Any]] = None,
248    ) -> Mapping[str, Any]:
249        if stream_slice:
250            if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition:
251                self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition))
252            return self._partition_router.get_request_headers(  # type: ignore # this always returns a mapping
253                stream_state=stream_state,
254                stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
255                next_page_token=next_page_token,
256            ) | self._cursor_per_partition[
257                self._to_partition_key(stream_slice.partition)
258            ].get_request_headers(
259                stream_state=stream_state,
260                stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
261                next_page_token=next_page_token,
262            )
263        else:
264            raise ValueError("A partition needs to be provided in order to get request headers")

Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.

def get_request_body_data( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Union[Mapping[str, Any], str]:
266    def get_request_body_data(
267        self,
268        *,
269        stream_state: Optional[StreamState] = None,
270        stream_slice: Optional[StreamSlice] = None,
271        next_page_token: Optional[Mapping[str, Any]] = None,
272    ) -> Union[Mapping[str, Any], str]:
273        if stream_slice:
274            if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition:
275                self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition))
276            return self._partition_router.get_request_body_data(  # type: ignore # this always returns a mapping
277                stream_state=stream_state,
278                stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
279                next_page_token=next_page_token,
280            ) | self._cursor_per_partition[
281                self._to_partition_key(stream_slice.partition)
282            ].get_request_body_data(
283                stream_state=stream_state,
284                stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
285                next_page_token=next_page_token,
286            )
287        else:
288            raise ValueError("A partition needs to be provided in order to get request body data")

Specifies how to populate the body of the request with a non-JSON payload.

If it returns a string, it will be sent as is. If it returns a dict, it will be converted to a URL-encoded form, e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2".

Only one of the 'request_body_data' and 'request_body_json' functions can be overridden at a time.
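The dict-to-form conversion described above matches the standard library's urlencode behavior:

```python
from urllib.parse import urlencode

print(urlencode({"key1": "value1", "key2": "value2"}))  # key1=value1&key2=value2
```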

def get_request_body_json( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
290    def get_request_body_json(
291        self,
292        *,
293        stream_state: Optional[StreamState] = None,
294        stream_slice: Optional[StreamSlice] = None,
295        next_page_token: Optional[Mapping[str, Any]] = None,
296    ) -> Mapping[str, Any]:
297        if stream_slice:
298            if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition:
299                self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition))
300            return self._partition_router.get_request_body_json(  # type: ignore # this always returns a mapping
301                stream_state=stream_state,
302                stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
303                next_page_token=next_page_token,
304            ) | self._cursor_per_partition[
305                self._to_partition_key(stream_slice.partition)
306            ].get_request_body_json(
307                stream_state=stream_state,
308                stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
309                next_page_token=next_page_token,
310            )
311        else:
312            raise ValueError("A partition needs to be provided in order to get request body json")

Specifies how to populate the body of the request with a JSON payload.

Only one of the 'request_body_data' and 'request_body_json' functions can be overridden at a time.

def should_be_synced(self, record: airbyte_cdk.Record) -> bool:
314    def should_be_synced(self, record: Record) -> bool:
315        return self._get_cursor(record).should_be_synced(
316            self._convert_record_to_cursor_record(record)
317        )

Evaluates whether a record should be synced. This allows for filtering records and provides a stop condition for pagination.

def is_greater_than_or_equal( self, first: airbyte_cdk.Record, second: airbyte_cdk.Record) -> bool:
319    def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
320        if not first.associated_slice or not second.associated_slice:
321            raise ValueError(
322                f"Both records should have an associated slice but got {first.associated_slice} and {second.associated_slice}"
323            )
324        if first.associated_slice.partition != second.associated_slice.partition:
325            raise ValueError(
326                f"To compare records, partition should be the same but got {first.associated_slice.partition} and {second.associated_slice.partition}"
327            )
328
329        return self._get_cursor(first).is_greater_than_or_equal(
330            self._convert_record_to_cursor_record(first),
331            self._convert_record_to_cursor_record(second),
332        )

Evaluates which record is greater in terms of the cursor. This is used to avoid having to capture all the records in order to close a slice.

class PerPartitionWithGlobalCursor(airbyte_cdk.sources.declarative.incremental.DeclarativeCursor):
 21class PerPartitionWithGlobalCursor(DeclarativeCursor):
 22    """
 23    Manages state for streams with multiple partitions, with an optional fallback to a global cursor when specific conditions are met.
 24
 25    This cursor handles partitioned streams by maintaining individual state per partition using `PerPartitionCursor`. If the number of partitions exceeds a defined limit, it switches to a global cursor (`GlobalSubstreamCursor`) to manage state more efficiently.
 26
 27    **Overview**
 28
 29    - **Partition-Based State**: Initially manages state per partition to ensure accurate processing of each partition's data.
 30    - **Global Fallback**: Switches to a global cursor when the partition limit is exceeded to handle state management more effectively.
 31
 32    **Switching Logic**
 33
 34    - Monitors the number of partitions.
 35    - If `PerPartitionCursor.limit_reached()` returns `True`, sets `_use_global_cursor` to `True`, activating the global cursor.
 36
 37    **Active Cursor Selection**
 38
 39    - Uses the `_get_active_cursor()` helper method to select the active cursor based on the `_use_global_cursor` flag.
 40    - This simplifies the logic and ensures consistent cursor usage across methods.
 41
 42    **State Structure Example**
 43
 44    ```json
 45    {
 46        "states": [
 47            {
 48                "partition": {"partition_key": "partition_1"},
 49                "cursor": {"cursor_field": "2021-01-15"}
 50            },
 51            {
 52                "partition": {"partition_key": "partition_2"},
 53                "cursor": {"cursor_field": "2021-02-14"}
 54            }
 55        ],
 56        "state": {
 57            "cursor_field": "2021-02-15"
 58        },
 59        "use_global_cursor": false
 60    }
 61    ```
 62
 63    In this example, the cursor is using partition-based state management (`"use_global_cursor": false`), maintaining separate cursor states for each partition.
 64
 65    **Usage Scenario**
 66
 67    Suitable for streams where the number of partitions may vary significantly, requiring dynamic switching between per-partition and global state management to ensure data consistency and efficient synchronization.
 68    """
 69
 70    def __init__(
 71        self,
 72        cursor_factory: CursorFactory,
 73        partition_router: PartitionRouter,
 74        stream_cursor: DatetimeBasedCursor,
 75    ):
 76        self._partition_router = partition_router
 77        self._per_partition_cursor = PerPartitionCursor(cursor_factory, partition_router)
 78        self._global_cursor = GlobalSubstreamCursor(stream_cursor, partition_router)
 79        self._use_global_cursor = False
 80        self._current_partition: Optional[Mapping[str, Any]] = None
 81        self._last_slice: bool = False
 82        self._parent_state: Optional[Mapping[str, Any]] = None
 83
 84    def _get_active_cursor(self) -> Union[PerPartitionCursor, GlobalSubstreamCursor]:
 85        return self._global_cursor if self._use_global_cursor else self._per_partition_cursor
 86
 87    def stream_slices(self) -> Iterable[StreamSlice]:
 88        self._global_cursor.start_slices_generation()
 89
 90        # Iterate through partitions and process slices
 91        for partition, is_last_partition, parent_state in iterate_with_last_flag_and_state(
 92            self._partition_router.stream_slices(), self._partition_router.get_stream_state
 93        ):
 94            # Generate slices for the current cursor and handle the last slice using the flag
 95            self._parent_state = parent_state
 96            for slice, is_last_slice, _ in iterate_with_last_flag_and_state(
 97                self._get_active_cursor().generate_slices_from_partition(partition=partition),
 98                lambda: None,
 99            ):
100                self._global_cursor.register_slice(is_last_slice and is_last_partition)
101                yield slice
102        self._parent_state = self._partition_router.get_stream_state()
103
104    def set_initial_state(self, stream_state: StreamState) -> None:
105        """
106        Set the initial state for the cursors.
107        """
108        self._use_global_cursor = stream_state.get("use_global_cursor", False)
109
110        self._parent_state = stream_state.get("parent_state", {})
111
112        self._global_cursor.set_initial_state(stream_state)
113        if not self._use_global_cursor:
114            self._per_partition_cursor.set_initial_state(stream_state)
115
116    def observe(self, stream_slice: StreamSlice, record: Record) -> None:
117        if not self._use_global_cursor and self._per_partition_cursor.limit_reached():
118            self._use_global_cursor = True
119
120        if not self._use_global_cursor:
121            self._per_partition_cursor.observe(stream_slice, record)
122        self._global_cursor.observe(stream_slice, record)
123
124    def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
125        if not self._use_global_cursor:
126            self._per_partition_cursor.close_slice(stream_slice, *args)
127        self._global_cursor.close_slice(stream_slice, *args)
128
129    def get_stream_state(self) -> StreamState:
130        final_state: MutableMapping[str, Any] = {"use_global_cursor": self._use_global_cursor}
131
132        final_state.update(self._global_cursor.get_stream_state())
133        if not self._use_global_cursor:
134            final_state.update(self._per_partition_cursor.get_stream_state())
135
136        final_state["parent_state"] = self._parent_state
137        if not final_state.get("parent_state"):
138            del final_state["parent_state"]
139
140        return final_state
141
142    def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
143        return self._get_active_cursor().select_state(stream_slice)
144
145    def get_request_params(
146        self,
147        *,
148        stream_state: Optional[StreamState] = None,
149        stream_slice: Optional[StreamSlice] = None,
150        next_page_token: Optional[Mapping[str, Any]] = None,
151    ) -> Mapping[str, Any]:
152        return self._get_active_cursor().get_request_params(
153            stream_state=stream_state,
154            stream_slice=stream_slice,
155            next_page_token=next_page_token,
156        )
157
158    def get_request_headers(
159        self,
160        *,
161        stream_state: Optional[StreamState] = None,
162        stream_slice: Optional[StreamSlice] = None,
163        next_page_token: Optional[Mapping[str, Any]] = None,
164    ) -> Mapping[str, Any]:
165        return self._get_active_cursor().get_request_headers(
166            stream_state=stream_state,
167            stream_slice=stream_slice,
168            next_page_token=next_page_token,
169        )
170
171    def get_request_body_data(
172        self,
173        *,
174        stream_state: Optional[StreamState] = None,
175        stream_slice: Optional[StreamSlice] = None,
176        next_page_token: Optional[Mapping[str, Any]] = None,
177    ) -> Union[Mapping[str, Any], str]:
178        return self._get_active_cursor().get_request_body_data(
179            stream_state=stream_state,
180            stream_slice=stream_slice,
181            next_page_token=next_page_token,
182        )
183
184    def get_request_body_json(
185        self,
186        *,
187        stream_state: Optional[StreamState] = None,
188        stream_slice: Optional[StreamSlice] = None,
189        next_page_token: Optional[Mapping[str, Any]] = None,
190    ) -> Mapping[str, Any]:
191        return self._get_active_cursor().get_request_body_json(
192            stream_state=stream_state,
193            stream_slice=stream_slice,
194            next_page_token=next_page_token,
195        )
196
197    def should_be_synced(self, record: Record) -> bool:
198        return self._get_active_cursor().should_be_synced(record)
199
200    def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
201        return self._global_cursor.is_greater_than_or_equal(first, second)

Manages state for streams with multiple partitions, with an optional fallback to a global cursor when specific conditions are met.

This cursor handles partitioned streams by maintaining individual state per partition using PerPartitionCursor. If the number of partitions exceeds a defined limit, it switches to a global cursor (GlobalSubstreamCursor) to manage state more efficiently.

Overview

  • Partition-Based State: Initially manages state per partition to ensure accurate processing of each partition's data.
  • Global Fallback: Switches to a global cursor when the partition limit is exceeded to handle state management more effectively.

Switching Logic

  • Monitors the number of partitions.
  • If PerPartitionCursor.limit_reached() returns True, sets _use_global_cursor to True, activating the global cursor.

Active Cursor Selection

  • Uses the _get_active_cursor() helper method to select the active cursor based on the _use_global_cursor flag.
  • This simplifies the logic and ensures consistent cursor usage across methods.

State Structure Example

{
    "states": [
        {
            "partition": {"partition_key": "partition_1"},
            "cursor": {"cursor_field": "2021-01-15"}
        },
        {
            "partition": {"partition_key": "partition_2"},
            "cursor": {"cursor_field": "2021-02-14"}
        }
    ],
    "state": {
        "cursor_field": "2021-02-15"
    },
    "use_global_cursor": false
}

In this example, the cursor is using partition-based state management ("use_global_cursor": false), maintaining separate cursor states for each partition.

Usage Scenario

Suitable for streams where the number of partitions may vary significantly, requiring dynamic switching between per-partition and global state management to ensure data consistency and efficient synchronization.
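A hedged sketch of the switching behavior, using a stand-in stub rather than the real cursors (the stub, cap, and counts below are illustrative):

```python
class StubPerPartitionCursor:
    """Stand-in that mimics limit_reached(): True once evictions exceed the cap."""

    def __init__(self, cap: int) -> None:
        self._cap = cap
        self._evictions = 0

    def evict_oldest(self) -> None:
        self._evictions += 1

    def limit_reached(self) -> bool:
        return self._evictions > self._cap


use_global_cursor = False
per_partition = StubPerPartitionCursor(cap=2)

for _ in range(4):  # pretend each observed record forced one eviction
    per_partition.evict_oldest()
    # Mirrors observe() above: flip to the global cursor once the limit is
    # reached, and never flip back for the remainder of the sync.
    if not use_global_cursor and per_partition.limit_reached():
        use_global_cursor = True

print(use_global_cursor)  # True: the 3rd eviction exceeded the cap of 2
```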

PerPartitionWithGlobalCursor( cursor_factory: CursorFactory, partition_router: airbyte_cdk.sources.declarative.partition_routers.PartitionRouter, stream_cursor: DatetimeBasedCursor)
70    def __init__(
71        self,
72        cursor_factory: CursorFactory,
73        partition_router: PartitionRouter,
74        stream_cursor: DatetimeBasedCursor,
75    ):
76        self._partition_router = partition_router
77        self._per_partition_cursor = PerPartitionCursor(cursor_factory, partition_router)
78        self._global_cursor = GlobalSubstreamCursor(stream_cursor, partition_router)
79        self._use_global_cursor = False
80        self._current_partition: Optional[Mapping[str, Any]] = None
81        self._last_slice: bool = False
82        self._parent_state: Optional[Mapping[str, Any]] = None
def stream_slices(self) -> Iterable[airbyte_cdk.StreamSlice]:
 87    def stream_slices(self) -> Iterable[StreamSlice]:
 88        self._global_cursor.start_slices_generation()
 89
 90        # Iterate through partitions and process slices
 91        for partition, is_last_partition, parent_state in iterate_with_last_flag_and_state(
 92            self._partition_router.stream_slices(), self._partition_router.get_stream_state
 93        ):
 94            # Generate slices for the current cursor and handle the last slice using the flag
 95            self._parent_state = parent_state
 96            for slice, is_last_slice, _ in iterate_with_last_flag_and_state(
 97                self._get_active_cursor().generate_slices_from_partition(partition=partition),
 98                lambda: None,
 99            ):
100                self._global_cursor.register_slice(is_last_slice and is_last_partition)
101                yield slice
102        self._parent_state = self._partition_router.get_stream_state()

Defines stream slices

Returns

An iterable of stream slices

def set_initial_state(self, stream_state: Mapping[str, Any]) -> None:
104    def set_initial_state(self, stream_state: StreamState) -> None:
105        """
106        Set the initial state for the cursors.
107        """
108        self._use_global_cursor = stream_state.get("use_global_cursor", False)
109
110        self._parent_state = stream_state.get("parent_state", {})
111
112        self._global_cursor.set_initial_state(stream_state)
113        if not self._use_global_cursor:
114            self._per_partition_cursor.set_initial_state(stream_state)

Set the initial state for the cursors.

def observe( self, stream_slice: airbyte_cdk.StreamSlice, record: airbyte_cdk.Record) -> None:
116    def observe(self, stream_slice: StreamSlice, record: Record) -> None:
117        if not self._use_global_cursor and self._per_partition_cursor.limit_reached():
118            self._use_global_cursor = True
119
120        if not self._use_global_cursor:
121            self._per_partition_cursor.observe(stream_slice, record)
122        self._global_cursor.observe(stream_slice, record)

Register a record with the cursor; the cursor instance can then use it to manage the state of the in-progress stream read.

Parameters
  • stream_slice: The current slice, which may or may not contain the most recently observed record
  • record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the stream state may need to be deferred depending on whether the source reliably orders records by the cursor field.
def close_slice( self, stream_slice: airbyte_cdk.StreamSlice, *args: Any) -> None:
124    def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
125        if not self._use_global_cursor:
126            self._per_partition_cursor.close_slice(stream_slice, *args)
127        self._global_cursor.close_slice(stream_slice, *args)

Update state based on the stream slice. Note that stream_slice.cursor_slice and most_recent_record.associated_slice are expected to be the same but we make it explicit here that stream_slice should be leveraged to update the state. We do not pass in the latest record, since cursor instances should maintain the relevant internal state on their own.

Parameters
  • stream_slice: slice to close
def get_stream_state(self) -> Mapping[str, Any]:
129    def get_stream_state(self) -> StreamState:
130        final_state: MutableMapping[str, Any] = {"use_global_cursor": self._use_global_cursor}
131
132        final_state.update(self._global_cursor.get_stream_state())
133        if not self._use_global_cursor:
134            final_state.update(self._per_partition_cursor.get_stream_state())
135
136        final_state["parent_state"] = self._parent_state
137        if not final_state.get("parent_state"):
138            del final_state["parent_state"]
139
140        return final_state

Returns the current stream state. We would like to restrict its usage since it exposes the internals of the state. As of 2023-06-14, it is used for three things:

  • Interpolation of the requests
  • Transformation of records
  • Saving the state

For the first case, we are probably stuck with exposing the stream state. For the last, we can probably expose a method that allows for emitting the state to the platform.

def select_state( self, stream_slice: Optional[airbyte_cdk.StreamSlice] = None) -> Optional[Mapping[str, Any]]:
142    def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
143        return self._get_active_cursor().select_state(stream_slice)

Get the state value of a specific stream_slice. For incremental or resumable full refresh cursors which only manage state in a single dimension this is the entire state object. For per-partition cursors used by substreams, this returns the state of a specific parent delineated by the incoming slice's partition object.

def get_request_params( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
145    def get_request_params(
146        self,
147        *,
148        stream_state: Optional[StreamState] = None,
149        stream_slice: Optional[StreamSlice] = None,
150        next_page_token: Optional[Mapping[str, Any]] = None,
151    ) -> Mapping[str, Any]:
152        return self._get_active_cursor().get_request_params(
153            stream_state=stream_state,
154            stream_slice=stream_slice,
155            next_page_token=next_page_token,
156        )

Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

E.g., you might want to define query parameters for paging if next_page_token is not None.

def get_request_headers( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
158    def get_request_headers(
159        self,
160        *,
161        stream_state: Optional[StreamState] = None,
162        stream_slice: Optional[StreamSlice] = None,
163        next_page_token: Optional[Mapping[str, Any]] = None,
164    ) -> Mapping[str, Any]:
165        return self._get_active_cursor().get_request_headers(
166            stream_state=stream_state,
167            stream_slice=stream_slice,
168            next_page_token=next_page_token,
169        )

Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.

def get_request_body_data( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Union[Mapping[str, Any], str]:
171    def get_request_body_data(
172        self,
173        *,
174        stream_state: Optional[StreamState] = None,
175        stream_slice: Optional[StreamSlice] = None,
176        next_page_token: Optional[Mapping[str, Any]] = None,
177    ) -> Union[Mapping[str, Any], str]:
178        return self._get_active_cursor().get_request_body_data(
179            stream_state=stream_state,
180            stream_slice=stream_slice,
181            next_page_token=next_page_token,
182        )

Specifies how to populate the body of the request with a non-JSON payload.

If it returns a string, it will be sent as is. If it returns a dict, it will be converted to a URL-encoded form, e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2".

Only one of the 'request_body_data' and 'request_body_json' functions can be overridden at a time.

def get_request_body_json( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
184    def get_request_body_json(
185        self,
186        *,
187        stream_state: Optional[StreamState] = None,
188        stream_slice: Optional[StreamSlice] = None,
189        next_page_token: Optional[Mapping[str, Any]] = None,
190    ) -> Mapping[str, Any]:
191        return self._get_active_cursor().get_request_body_json(
192            stream_state=stream_state,
193            stream_slice=stream_slice,
194            next_page_token=next_page_token,
195        )

Specifies how to populate the body of the request with a JSON payload.

Only one of the 'request_body_data' and 'request_body_json' functions can be overridden at a time.

def should_be_synced(self, record: airbyte_cdk.Record) -> bool:
197    def should_be_synced(self, record: Record) -> bool:
198        return self._get_active_cursor().should_be_synced(record)

Evaluates whether a record should be synced. This allows for filtering records and provides a stop condition for pagination.

def is_greater_than_or_equal( self, first: airbyte_cdk.Record, second: airbyte_cdk.Record) -> bool:
200    def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
201        return self._global_cursor.is_greater_than_or_equal(first, second)

Evaluates which record is greater in terms of the cursor. This is used to avoid having to capture all the records in order to close a slice.

@dataclass
class ResumableFullRefreshCursor(airbyte_cdk.sources.declarative.incremental.DeclarativeCursor):
 12@dataclass
 13class ResumableFullRefreshCursor(DeclarativeCursor):
 14    parameters: InitVar[Mapping[str, Any]]
 15
 16    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 17        self._cursor: StreamState = {}
 18
 19    def get_stream_state(self) -> StreamState:
 20        return self._cursor
 21
 22    def set_initial_state(self, stream_state: StreamState) -> None:
 23        self._cursor = stream_state
 24
 25    def observe(self, stream_slice: StreamSlice, record: Record) -> None:
 26        """
 27        Resumable full refresh manages state using a page number so it does not need to update state by observing incoming records.
 28        """
 29        pass
 30
 31    def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
 32        # The ResumableFullRefreshCursor doesn't support nested streams yet so receiving a partition is unexpected
 33        if stream_slice.partition:
 34            raise ValueError(
 35                f"Stream slice {stream_slice} should not have a partition. Got {stream_slice.partition}."
 36            )
 37        self._cursor = stream_slice.cursor_slice
 38
 39    def should_be_synced(self, record: Record) -> bool:
 40        """
 41        Unlike date-based cursors which filter out records outside slice boundaries, resumable full refresh records exist within pages
 42        that don't have filterable bounds. We should always return them.
 43        """
 44        return True
 45
 46    def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
 47        """
 48        RFR records don't have an ordering to be compared to one another.
 49        """
 50        return False
 51
 52    def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
 53        # A top-level RFR cursor only manages the state of a single partition
 54        return self._cursor
 55
 56    def stream_slices(self) -> Iterable[StreamSlice]:
 57        """
 58        Resumable full refresh cursors only return a single slice and can't perform partitioning because iteration is done per-page
 59        along an unbounded set.
 60        """
 61        yield from [StreamSlice(cursor_slice=self._cursor, partition={})]
 62
 63    # This is an interesting pattern that might not seem obvious at first glance. This cursor itself has no functional need to
 64    # inject any request values into the outbound request because the up-to-date pagination state is already loaded and
 65    # maintained by the paginator component
 66    def get_request_params(
 67        self,
 68        *,
 69        stream_state: Optional[StreamState] = None,
 70        stream_slice: Optional[StreamSlice] = None,
 71        next_page_token: Optional[Mapping[str, Any]] = None,
 72    ) -> Mapping[str, Any]:
 73        return {}
 74
 75    def get_request_headers(
 76        self,
 77        *,
 78        stream_state: Optional[StreamState] = None,
 79        stream_slice: Optional[StreamSlice] = None,
 80        next_page_token: Optional[Mapping[str, Any]] = None,
 81    ) -> Mapping[str, Any]:
 82        return {}
 83
 84    def get_request_body_data(
 85        self,
 86        *,
 87        stream_state: Optional[StreamState] = None,
 88        stream_slice: Optional[StreamSlice] = None,
 89        next_page_token: Optional[Mapping[str, Any]] = None,
 90    ) -> Mapping[str, Any]:
 91        return {}
 92
 93    def get_request_body_json(
 94        self,
 95        *,
 96        stream_state: Optional[StreamState] = None,
 97        stream_slice: Optional[StreamSlice] = None,
 98        next_page_token: Optional[Mapping[str, Any]] = None,
 99    ) -> Mapping[str, Any]:
100        return {}
ResumableFullRefreshCursor(parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]])
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
def get_stream_state(self) -> Mapping[str, Any]:
19    def get_stream_state(self) -> StreamState:
20        return self._cursor

Returns the current stream state. We would like to restrict its usage since it exposes internals of the state. As of 2023-06-14, it is used for three things:

  • Interpolation of the requests
  • Transformation of records
  • Saving the state

For the first case, we are probably stuck with exposing the stream state. For the last case, we could probably expose a method that allows for emitting the state to the platform instead.

def set_initial_state(self, stream_state: Mapping[str, Any]) -> None:
22    def set_initial_state(self, stream_state: StreamState) -> None:
23        self._cursor = stream_state

Cursors are not initialized with their state. As state is needed for the cursor to function properly, this method should be called before calling anything else. A minimal state round-trip sketch follows the parameter list below.

Parameters
  • stream_state: The state of the stream as returned by get_stream_state
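A minimal sketch of the state round trip, assuming the import paths from the top of this module and a purely illustrative "next_page_token" state key:

from airbyte_cdk.sources.declarative.incremental import ResumableFullRefreshCursor

# Create the cursor; `parameters` is an InitVar consumed by __post_init__.
cursor = ResumableFullRefreshCursor(parameters={})

# Seed the cursor with the state persisted from a previous attempt
# before calling anything else.
cursor.set_initial_state({"next_page_token": 3})

# get_stream_state simply echoes the stored cursor value.
print(cursor.get_stream_state())  # {'next_page_token': 3}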
def observe( self, stream_slice: airbyte_cdk.StreamSlice, record: airbyte_cdk.Record) -> None:
25    def observe(self, stream_slice: StreamSlice, record: Record) -> None:
26        """
27        Resumable full refresh manages state using a page number so it does not need to update state by observing incoming records.
28        """
29        pass

Resumable full refresh manages state using a page number so it does not need to update state by observing incoming records.

def close_slice( self, stream_slice: airbyte_cdk.StreamSlice, *args: Any) -> None:
31    def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
32        # The ResumableFullRefreshCursor doesn't support nested streams yet so receiving a partition is unexpected
33        if stream_slice.partition:
34            raise ValueError(
35                f"Stream slice {stream_slice} should not have a partition. Got {stream_slice.partition}."
36            )
37        self._cursor = stream_slice.cursor_slice

Update state based on the stream slice. Note that stream_slice.cursor_slice and most_recent_record.associated_slice are expected to be the same, but we make it explicit here that stream_slice should be leveraged to update the state. We do not pass in the latest record, since cursor instances should maintain the relevant internal state on their own. A short sketch follows the parameter list below.

Parameters
  • stream_slice: slice to close
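A short sketch of closing a slice, under the same assumptions as above (the "next_page_token" key is again hypothetical):

from airbyte_cdk import StreamSlice
from airbyte_cdk.sources.declarative.incremental import ResumableFullRefreshCursor

cursor = ResumableFullRefreshCursor(parameters={})

# Closing a slice stores the slice's cursor_slice as the new state.
cursor.close_slice(StreamSlice(cursor_slice={"next_page_token": 4}, partition={}))
print(cursor.get_stream_state())  # {'next_page_token': 4}

# A non-empty partition is unexpected for this top-level cursor:
# cursor.close_slice(StreamSlice(cursor_slice={}, partition={"id": 1}))  # raises ValueError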
def should_be_synced(self, record: airbyte_cdk.Record) -> bool:
39    def should_be_synced(self, record: Record) -> bool:
40        """
41        Unlike date-based cursors which filter out records outside slice boundaries, resumable full refresh records exist within pages
42        that don't have filterable bounds. We should always return them.
43        """
44        return True

Unlike date-based cursors which filter out records outside slice boundaries, resumable full refresh records exist within pages that don't have filterable bounds. We should always return them.

def is_greater_than_or_equal( self, first: airbyte_cdk.Record, second: airbyte_cdk.Record) -> bool:
46    def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
47        """
 48        RFR records don't have an ordering that can be compared between one another.
49        """
50        return False

RFR records don't have an ordering that can be compared between one another.

def select_state( self, stream_slice: Optional[airbyte_cdk.StreamSlice] = None) -> Optional[Mapping[str, Any]]:
52    def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
53        # A top-level RFR cursor only manages the state of a single partition
54        return self._cursor

Get the state value of a specific stream_slice. For incremental or resumable full refresh cursors which only manage state in a single dimension this is the entire state object. For per-partition cursors used by substreams, this returns the state of a specific parent delineated by the incoming slice's partition object.
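Since this is a top-level single-partition cursor, select_state is effectively an alias for the whole state. A quick sketch, under the same assumptions as the earlier examples:

from airbyte_cdk.sources.declarative.incremental import ResumableFullRefreshCursor

cursor = ResumableFullRefreshCursor(parameters={})
cursor.set_initial_state({"next_page_token": 5})  # illustrative key

# select_state ignores the slice argument and returns the entire state object.
assert cursor.select_state() == cursor.get_stream_state()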

def stream_slices(self) -> Iterable[airbyte_cdk.StreamSlice]:
56    def stream_slices(self) -> Iterable[StreamSlice]:
57        """
58        Resumable full refresh cursors only return a single slice and can't perform partitioning because iteration is done per-page
59        along an unbounded set.
60        """
61        yield from [StreamSlice(cursor_slice=self._cursor, partition={})]

Resumable full refresh cursors only return a single slice and can't perform partitioning because iteration is done per-page along an unbounded set.
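A sketch of the single-slice behaviour, under the same assumptions as above:

from airbyte_cdk.sources.declarative.incremental import ResumableFullRefreshCursor

cursor = ResumableFullRefreshCursor(parameters={})
cursor.set_initial_state({"next_page_token": 2})  # illustrative key

slices = list(cursor.stream_slices())
assert len(slices) == 1          # always exactly one slice
print(slices[0].cursor_slice)    # carries the current cursor state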

def get_request_params( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
66    def get_request_params(
67        self,
68        *,
69        stream_state: Optional[StreamState] = None,
70        stream_slice: Optional[StreamSlice] = None,
71        next_page_token: Optional[Mapping[str, Any]] = None,
72    ) -> Mapping[str, Any]:
73        return {}

Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

E.g., you might want to define query parameters for paging if next_page_token is not None.
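For this cursor specifically, all four request-option methods are no-ops, since the paginator component maintains the pagination state (per the comment in the source above). A quick sketch:

from airbyte_cdk.sources.declarative.incremental import ResumableFullRefreshCursor

cursor = ResumableFullRefreshCursor(parameters={})

# No request options are injected by the cursor itself.
assert cursor.get_request_params() == {}
assert cursor.get_request_headers() == {}
assert cursor.get_request_body_data() == {}
assert cursor.get_request_body_json() == {}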

def get_request_headers( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
75    def get_request_headers(
76        self,
77        *,
78        stream_state: Optional[StreamState] = None,
79        stream_slice: Optional[StreamSlice] = None,
80        next_page_token: Optional[Mapping[str, Any]] = None,
81    ) -> Mapping[str, Any]:
82        return {}

Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.

def get_request_body_data( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
84    def get_request_body_data(
85        self,
86        *,
87        stream_state: Optional[StreamState] = None,
88        stream_slice: Optional[StreamSlice] = None,
89        next_page_token: Optional[Mapping[str, Any]] = None,
90    ) -> Mapping[str, Any]:
91        return {}

Specifies how to populate the body of the request with a non-JSON payload.

If it returns a string, the payload will be sent as-is. If it returns a dict, it will be converted to a URL-encoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

Note that only one of the 'request_body_data' and 'request_body_json' functions can be overridden at a time.
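For reference, the dict-to-form conversion described above is standard URL encoding; the conversion itself happens in the HTTP layer rather than in this cursor. A minimal illustration with Python's standard library:

from urllib.parse import urlencode

# {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
print(urlencode({"key1": "value1", "key2": "value2"}))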

def get_request_body_json( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
 93    def get_request_body_json(
 94        self,
 95        *,
 96        stream_state: Optional[StreamState] = None,
 97        stream_slice: Optional[StreamSlice] = None,
 98        next_page_token: Optional[Mapping[str, Any]] = None,
 99    ) -> Mapping[str, Any]:
100        return {}

Specifies how to populate the body of the request with a JSON payload.

Note that only one of the 'request_body_data' and 'request_body_json' functions can be overridden at a time.

@dataclass
class ChildPartitionResumableFullRefreshCursor(airbyte_cdk.sources.declarative.incremental.ResumableFullRefreshCursor):
103@dataclass
104class ChildPartitionResumableFullRefreshCursor(ResumableFullRefreshCursor):
105    """
106    The Sub-stream Resumable Cursor for Full-Refresh substreams.
107    Follows the parent type `ResumableFullRefreshCursor` with a small override,
108    to provide the ability to close the substream's slice once it has finished processing.
109
110    Check the `close_slice` method override for more info about the actual behaviour of this cursor.
111    """
112
113    def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
114        """
115        Once the current slice has finished syncing:
116         - paginator returns None
117         - no more slices to process
118
119        we assume that the records are processed and emitted already,
120        thus we have to set the cursor to ` __ab_full_refresh_sync_complete: true `,
121        otherwise there is a risk of an infinite loop processing the same slice.
122        """
123        self._cursor = FULL_REFRESH_COMPLETE_STATE

The Sub-stream Resumable Cursor for Full-Refresh substreams. Follows the parent type ResumableFullRefreshCursor with a small override that provides the ability to close the substream's slice once it has finished processing.

Check the close_slice method override for more info about the actual behaviour of this cursor.

ChildPartitionResumableFullRefreshCursor(parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]])
def close_slice( self, stream_slice: airbyte_cdk.StreamSlice, *args: Any) -> None:
113    def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
114        """
115        Once the current slice has finished syncing:
116         - paginator returns None
117         - no more slices to process
118
119        we assume that the records are processed and emitted already,
120        thus we have to set the cursor to ` __ab_full_refresh_sync_complete: true `,
121        otherwise there is a risk of an infinite loop processing the same slice.
122        """
123        self._cursor = FULL_REFRESH_COMPLETE_STATE
Once the current slice has finished syncing:
  • the paginator returns None
  • there are no more slices to process

we assume that the records have already been processed and emitted, so we have to set the cursor to __ab_full_refresh_sync_complete: true; otherwise there is a risk of an infinite loop processing the same slice.
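A closing sketch of this terminal-state behaviour, under the same import assumptions as the earlier examples:

from airbyte_cdk import StreamSlice
from airbyte_cdk.sources.declarative.incremental import (
    ChildPartitionResumableFullRefreshCursor,
)

cursor = ChildPartitionResumableFullRefreshCursor(parameters={})

# Regardless of the slice contents, closing it marks the substream partition done.
cursor.close_slice(StreamSlice(cursor_slice={}, partition={}))

# Expected: {'__ab_full_refresh_sync_complete': True}, i.e. the
# FULL_REFRESH_COMPLETE_STATE marker described in the docstring above.
print(cursor.get_stream_state())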