airbyte_cdk.legacy.sources.declarative.incremental

 1# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
 2
 3from airbyte_cdk.legacy.sources.declarative.incremental.datetime_based_cursor import (
 4    DatetimeBasedCursor,
 5)
 6from airbyte_cdk.legacy.sources.declarative.incremental.declarative_cursor import DeclarativeCursor
 7from airbyte_cdk.legacy.sources.declarative.incremental.global_substream_cursor import (
 8    GlobalSubstreamCursor,
 9)
10from airbyte_cdk.legacy.sources.declarative.incremental.per_partition_cursor import (
11    CursorFactory,
12    PerPartitionCursor,
13)
14from airbyte_cdk.legacy.sources.declarative.incremental.per_partition_with_global import (
15    PerPartitionWithGlobalCursor,
16)
17from airbyte_cdk.legacy.sources.declarative.incremental.resumable_full_refresh_cursor import (
18    ChildPartitionResumableFullRefreshCursor,
19    ResumableFullRefreshCursor,
20)
21
22__all__ = [
23    "CursorFactory",
24    "DatetimeBasedCursor",
25    "DeclarativeCursor",
26    "GlobalSubstreamCursor",
27    "PerPartitionCursor",
28    "PerPartitionWithGlobalCursor",
29    "ResumableFullRefreshCursor",
30    "ChildPartitionResumableFullRefreshCursor",
31]
class CursorFactory:
20class CursorFactory:
21    def __init__(self, create_function: Callable[[], DeclarativeCursor]):
22        self._create_function = create_function
23
24    def create(self) -> DeclarativeCursor:
25        return self._create_function()
CursorFactory( create_function: Callable[[], DeclarativeCursor])
21    def __init__(self, create_function: Callable[[], DeclarativeCursor]):
22        self._create_function = create_function
def create( self) -> DeclarativeCursor:
24    def create(self) -> DeclarativeCursor:
25        return self._create_function()
@dataclass
class DatetimeBasedCursor(airbyte_cdk.legacy.sources.declarative.incremental.DeclarativeCursor):
 28@dataclass
 29class DatetimeBasedCursor(DeclarativeCursor):
 30    """
 31    Slices the stream over a datetime range and creates a state with format {<cursor_field>: <datetime> }
 32
 33    Given a start time, end time, a step function, and an optional lookback window,
 34    the stream slicer will partition the date range from start time - lookback window to end time.
 35
 36    The step function is defined as a string of the form ISO8601 duration
 37
 38    The timestamp format accepts the same format codes as datetime.strptime, which are
 39    all the format codes required by the 1989 C standard.
 40    Full list of accepted format codes: https://man7.org/linux/man-pages/man3/strftime.3.html
 41
 42    Attributes:
 43        start_datetime (Union[MinMaxDatetime, str]): the datetime that determines the earliest record that should be synced
 44        end_datetime (Optional[Union[MinMaxDatetime, str]]): the datetime that determines the last record that should be synced
 45        cursor_field (Union[InterpolatedString, str]): record's cursor field
 46        datetime_format (str): format of the datetime
 47        step (Optional[str]): size of the timewindow (ISO8601 duration)
 48        cursor_granularity (Optional[str]): smallest increment the datetime_format has (ISO 8601 duration) that will be used to ensure that the start of a slice does not overlap with the end of the previous one
 49        config (Config): connection config
 50        start_time_option (Optional[RequestOption]): request option for start time
 51        end_time_option (Optional[RequestOption]): request option for end time
 52        partition_field_start (Optional[str]): partition start time field
 53        partition_field_end (Optional[str]): stream slice end time field
 54        lookback_window (Optional[InterpolatedString]): how many days before start_datetime to read data for (ISO8601 duration)
 55    """
 56
 57    start_datetime: Union[MinMaxDatetime, str]
 58    cursor_field: Union[InterpolatedString, str]
 59    datetime_format: str
 60    config: Config
 61    parameters: InitVar[Mapping[str, Any]]
 62    _highest_observed_cursor_field_value: Optional[str] = field(
 63        repr=False, default=None
 64    )  # tracks the latest observed datetime, which may not be safe to emit in the case of out-of-order records
 65    _cursor: Optional[str] = field(
 66        repr=False, default=None
 67    )  # tracks the latest observed datetime that is appropriate to emit as stream state
 68    end_datetime: Optional[Union[MinMaxDatetime, str]] = None
 69    step: Optional[Union[InterpolatedString, str]] = None
 70    cursor_granularity: Optional[str] = None
 71    start_time_option: Optional[RequestOption] = None
 72    end_time_option: Optional[RequestOption] = None
 73    partition_field_start: Optional[str] = None
 74    partition_field_end: Optional[str] = None
 75    lookback_window: Optional[Union[InterpolatedString, str]] = None
 76    message_repository: Optional[MessageRepository] = None
 77    is_compare_strictly: Optional[bool] = False
 78    cursor_datetime_formats: List[str] = field(default_factory=lambda: [])
 79
 80    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 81        if (self.step and not self.cursor_granularity) or (
 82            not self.step and self.cursor_granularity
 83        ):
 84            raise ValueError(
 85                f"If step is defined, cursor_granularity should be as well and vice-versa. "
 86                f"Right now, step is `{self.step}` and cursor_granularity is `{self.cursor_granularity}`"
 87            )
 88        self._start_datetime = MinMaxDatetime.create(self.start_datetime, parameters)
 89        self._end_datetime = (
 90            None if not self.end_datetime else MinMaxDatetime.create(self.end_datetime, parameters)
 91        )
 92
 93        self._timezone = datetime.timezone.utc
 94        self._interpolation = JinjaInterpolation()
 95
 96        self._step = (
 97            self._parse_timedelta(
 98                InterpolatedString.create(self.step, parameters=parameters).eval(self.config)
 99            )
100            if self.step
101            else datetime.timedelta.max
102        )
103        self._cursor_granularity = self._parse_timedelta(self.cursor_granularity)
104        self.cursor_field = InterpolatedString.create(self.cursor_field, parameters=parameters)
105        self._lookback_window = (
106            InterpolatedString.create(self.lookback_window, parameters=parameters)
107            if self.lookback_window
108            else None
109        )
110        self._partition_field_start = InterpolatedString.create(
111            self.partition_field_start or "start_time", parameters=parameters
112        )
113        self._partition_field_end = InterpolatedString.create(
114            self.partition_field_end or "end_time", parameters=parameters
115        )
116        self._parser = DatetimeParser()
117
118        # If datetime format is not specified then start/end datetime should inherit it from the stream slicer
119        if not self._start_datetime.datetime_format:
120            self._start_datetime.datetime_format = self.datetime_format
121        if self._end_datetime and not self._end_datetime.datetime_format:
122            self._end_datetime.datetime_format = self.datetime_format
123
124        if not self.cursor_datetime_formats:
125            self.cursor_datetime_formats = [self.datetime_format]
126
127        _validate_component_request_option_paths(
128            self.config, self.start_time_option, self.end_time_option
129        )
130
131    def get_stream_state(self) -> StreamState:
132        return {self.cursor_field.eval(self.config): self._cursor} if self._cursor else {}  # type: ignore  # cursor_field is converted to an InterpolatedString in __post_init__
133
134    def set_initial_state(self, stream_state: StreamState) -> None:
135        """
136        Cursors are not initialized with their state. As state is needed in order to function properly, this method should be called
137        before calling anything else
138
139        :param stream_state: The state of the stream as returned by get_stream_state
140        """
141        self._cursor = (
142            stream_state.get(self.cursor_field.eval(self.config)) if stream_state else None  # type: ignore [union-attr]
143        )
144
145    def observe(self, stream_slice: StreamSlice, record: Record) -> None:
146        """
147        Register a record with the cursor; the cursor instance can then use it to manage the state of the in-progress stream read.
148
149        :param stream_slice: The current slice, which may or may not contain the most recently observed record
150        :param record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the
151          stream state may need to be deferred depending on whether the source reliably orders records by the cursor field.
152        """
153        record_cursor_value = record.get(self.cursor_field.eval(self.config))  # type: ignore  # cursor_field is converted to an InterpolatedString in __post_init__
154        # if the current record has no cursor value, we cannot meaningfully update the state based on it, so there is nothing more to do
155        if not record_cursor_value:
156            return
157
158        start_field = self._partition_field_start.eval(self.config)
159        end_field = self._partition_field_end.eval(self.config)
160        is_highest_observed_cursor_value = (
161            not self._highest_observed_cursor_field_value
162            or self.parse_date(record_cursor_value)
163            > self.parse_date(self._highest_observed_cursor_field_value)
164        )
165        if (
166            self._is_within_daterange_boundaries(
167                record,
168                stream_slice.get(start_field),  # type: ignore [arg-type]
169                stream_slice.get(end_field),  # type: ignore [arg-type]
170            )
171            and is_highest_observed_cursor_value
172        ):
173            self._highest_observed_cursor_field_value = record_cursor_value
174
175    def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
176        if stream_slice.partition:
177            raise ValueError(
178                f"Stream slice {stream_slice} should not have a partition. Got {stream_slice.partition}."
179            )
180        cursor_value_str_by_cursor_value_datetime = dict(
181            map(
182                # we need to ensure the cursor value is preserved as is in the state else the CATs might complain of something like
183                # 2023-01-04T17:30:19.000Z' <= '2023-01-04T17:30:19.000000Z'
184                lambda datetime_str: (self.parse_date(datetime_str), datetime_str),  # type: ignore # because of the filter on the next line, this will only be called with a str
185                filter(
186                    lambda item: item, [self._cursor, self._highest_observed_cursor_field_value]
187                ),
188            )
189        )
190        self._cursor = (
191            cursor_value_str_by_cursor_value_datetime[
192                max(cursor_value_str_by_cursor_value_datetime.keys())
193            ]
194            if cursor_value_str_by_cursor_value_datetime
195            else None
196        )
197
198    def stream_slices(self) -> Iterable[StreamSlice]:
199        """
200        Partition the daterange into slices of size = step.
201
202        The start of the window is the minimum datetime between start_datetime - lookback_window and the stream_state's datetime
203        The end of the window is the minimum datetime between the start of the window and end_datetime.
204
205        :return:
206        """
207        end_datetime = self.select_best_end_datetime()
208        start_datetime = self._calculate_earliest_possible_value(self.select_best_end_datetime())
209        return self._partition_daterange(start_datetime, end_datetime, self._step)
210
211    def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
212        # Datetime based cursors operate over slices made up of datetime ranges. Stream state is based on the progress
213        # through each slice and does not belong to a specific slice. We just return stream state as it is.
214        return self.get_stream_state()
215
216    def _calculate_earliest_possible_value(
217        self, end_datetime: datetime.datetime
218    ) -> datetime.datetime:
219        lookback_delta = self._parse_timedelta(
220            self._lookback_window.eval(self.config) if self._lookback_window else "P0D"
221        )
222        earliest_possible_start_datetime = min(
223            self._start_datetime.get_datetime(self.config), end_datetime
224        )
225        try:
226            cursor_datetime = (
227                self._calculate_cursor_datetime_from_state(self.get_stream_state()) - lookback_delta
228            )
229        except OverflowError:
230            # cursor_datetime defers to the minimum date if it does not exist in the state. Trying to subtract
231            # a timedelta from the minimum datetime results in an OverflowError
232            cursor_datetime = self._calculate_cursor_datetime_from_state(self.get_stream_state())
233        return max(earliest_possible_start_datetime, cursor_datetime)
234
235    def select_best_end_datetime(self) -> datetime.datetime:
236        """
237        Returns the optimal end datetime.
238        This method compares the current datetime with a pre-configured end datetime
239        and returns the earlier of the two. If no pre-configured end datetime is set,
240        the current datetime is returned.
241
242        :return datetime.datetime: The best end datetime, which is either the current datetime or the pre-configured end datetime, whichever is earlier.
243        """
244        now = datetime.datetime.now(tz=self._timezone)
245        if not self._end_datetime:
246            return now
247        return min(self._end_datetime.get_datetime(self.config), now)
248
249    def _calculate_cursor_datetime_from_state(
250        self, stream_state: Mapping[str, Any]
251    ) -> datetime.datetime:
252        if self.cursor_field.eval(self.config, stream_state=stream_state) in stream_state:  # type: ignore  # cursor_field is converted to an InterpolatedString in __post_init__
253            return self.parse_date(stream_state[self.cursor_field.eval(self.config)])  # type: ignore  # cursor_field is converted to an InterpolatedString in __post_init__
254        return datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)
255
256    def _format_datetime(self, dt: datetime.datetime) -> str:
257        return self._parser.format(dt, self.datetime_format)
258
259    def _partition_daterange(
260        self,
261        start: datetime.datetime,
262        end: datetime.datetime,
263        step: Union[datetime.timedelta, Duration],
264    ) -> List[StreamSlice]:
265        start_field = self._partition_field_start.eval(self.config)
266        end_field = self._partition_field_end.eval(self.config)
267        dates = []
268
269        while self._is_within_date_range(start, end):
270            next_start = self._evaluate_next_start_date_safely(start, step)
271            end_date = self._get_date(next_start - self._cursor_granularity, end, min)
272            dates.append(
273                StreamSlice(
274                    partition={},
275                    cursor_slice={
276                        start_field: self._format_datetime(start),
277                        end_field: self._format_datetime(end_date),
278                    },
279                )
280            )
281            start = next_start
282        return dates
283
284    def _is_within_date_range(self, start: datetime.datetime, end: datetime.datetime) -> bool:
285        if self.is_compare_strictly:
286            return start < end
287        return start <= end
288
289    def _evaluate_next_start_date_safely(
290        self, start: datetime.datetime, step: datetime.timedelta
291    ) -> datetime.datetime:
292        """
293        Given that we set the default step at datetime.timedelta.max, we will generate an OverflowError when evaluating the next start_date
294        This method assumes that users would never enter a step that would generate an overflow. Given that would be the case, the code
295        would have broken anyway.
296        """
297        try:
298            return start + step
299        except OverflowError:
300            return datetime.datetime.max.replace(tzinfo=datetime.timezone.utc)
301
302    def _get_date(
303        self,
304        cursor_value: datetime.datetime,
305        default_date: datetime.datetime,
306        comparator: Callable[[datetime.datetime, datetime.datetime], datetime.datetime],
307    ) -> datetime.datetime:
308        cursor_date = cursor_value or default_date
309        return comparator(cursor_date, default_date)
310
311    def parse_date(self, date: str) -> datetime.datetime:
312        for datetime_format in self.cursor_datetime_formats + [self.datetime_format]:
313            try:
314                return self._parser.parse(date, datetime_format)
315            except ValueError:
316                pass
317        raise ValueError(f"No format in {self.cursor_datetime_formats} matching {date}")
318
319    @classmethod
320    def _parse_timedelta(cls, time_str: Optional[str]) -> Union[datetime.timedelta, Duration]:
321        """
322        :return Parses an ISO 8601 durations into datetime.timedelta or Duration objects.
323        """
324        if not time_str:
325            return datetime.timedelta(0)
326        return parse_duration(time_str)
327
328    def get_request_params(
329        self,
330        *,
331        stream_state: Optional[StreamState] = None,
332        stream_slice: Optional[StreamSlice] = None,
333        next_page_token: Optional[Mapping[str, Any]] = None,
334    ) -> Mapping[str, Any]:
335        return self._get_request_options(RequestOptionType.request_parameter, stream_slice)
336
337    def get_request_headers(
338        self,
339        *,
340        stream_state: Optional[StreamState] = None,
341        stream_slice: Optional[StreamSlice] = None,
342        next_page_token: Optional[Mapping[str, Any]] = None,
343    ) -> Mapping[str, Any]:
344        return self._get_request_options(RequestOptionType.header, stream_slice)
345
346    def get_request_body_data(
347        self,
348        *,
349        stream_state: Optional[StreamState] = None,
350        stream_slice: Optional[StreamSlice] = None,
351        next_page_token: Optional[Mapping[str, Any]] = None,
352    ) -> Mapping[str, Any]:
353        return self._get_request_options(RequestOptionType.body_data, stream_slice)
354
355    def get_request_body_json(
356        self,
357        *,
358        stream_state: Optional[StreamState] = None,
359        stream_slice: Optional[StreamSlice] = None,
360        next_page_token: Optional[Mapping[str, Any]] = None,
361    ) -> Mapping[str, Any]:
362        return self._get_request_options(RequestOptionType.body_json, stream_slice)
363
364    def request_kwargs(self) -> Mapping[str, Any]:
365        # Never update kwargs
366        return {}
367
368    def _get_request_options(
369        self, option_type: RequestOptionType, stream_slice: Optional[StreamSlice]
370    ) -> Mapping[str, Any]:
371        options: MutableMapping[str, Any] = {}
372        if not stream_slice:
373            return options
374
375        if self.start_time_option and self.start_time_option.inject_into == option_type:
376            start_time_value = stream_slice.get(self._partition_field_start.eval(self.config))
377            self.start_time_option.inject_into_request(options, start_time_value, self.config)
378
379        if self.end_time_option and self.end_time_option.inject_into == option_type:
380            end_time_value = stream_slice.get(self._partition_field_end.eval(self.config))
381            self.end_time_option.inject_into_request(options, end_time_value, self.config)
382
383        return options
384
385    def should_be_synced(self, record: Record) -> bool:
386        cursor_field = self.cursor_field.eval(self.config)  # type: ignore  # cursor_field is converted to an InterpolatedString in __post_init__
387        record_cursor_value = record.get(cursor_field)
388        if not record_cursor_value:
389            self._send_log(
390                Level.WARN,
391                f"Could not find cursor field `{cursor_field}` in record. The incremental sync will assume it needs to be synced",
392            )
393            return True
394        latest_possible_cursor_value = self.select_best_end_datetime()
395        earliest_possible_cursor_value = self._calculate_earliest_possible_value(
396            latest_possible_cursor_value
397        )
398        return self._is_within_daterange_boundaries(
399            record, earliest_possible_cursor_value, latest_possible_cursor_value
400        )
401
402    def _is_within_daterange_boundaries(
403        self,
404        record: Record,
405        start_datetime_boundary: Union[datetime.datetime, str],
406        end_datetime_boundary: Union[datetime.datetime, str],
407    ) -> bool:
408        cursor_field = self.cursor_field.eval(self.config)  # type: ignore  # cursor_field is converted to an InterpolatedString in __post_init__
409        record_cursor_value = record.get(cursor_field)
410        if not record_cursor_value:
411            self._send_log(
412                Level.WARN,
413                f"Could not find cursor field `{cursor_field}` in record. The record will not be considered when emitting sync state",
414            )
415            return False
416        if isinstance(start_datetime_boundary, str):
417            start_datetime_boundary = self.parse_date(start_datetime_boundary)
418        if isinstance(end_datetime_boundary, str):
419            end_datetime_boundary = self.parse_date(end_datetime_boundary)
420        return (
421            start_datetime_boundary <= self.parse_date(record_cursor_value) <= end_datetime_boundary
422        )
423
424    def _send_log(self, level: Level, message: str) -> None:
425        if self.message_repository:
426            self.message_repository.emit_message(
427                AirbyteMessage(
428                    type=Type.LOG,
429                    log=AirbyteLogMessage(level=level, message=message),
430                )
431            )
432
433    def set_runtime_lookback_window(self, lookback_window_in_seconds: int) -> None:
434        """
435        Updates the lookback window based on a given number of seconds if the new duration
436        is greater than the currently configured lookback window.
437
438        :param lookback_window_in_seconds: The lookback duration in seconds to potentially update to.
439        """
440        runtime_lookback_window = duration_isoformat(timedelta(seconds=lookback_window_in_seconds))
441        config_lookback = parse_duration(
442            self._lookback_window.eval(self.config) if self._lookback_window else "P0D"
443        )
444
445        # Check if the new runtime lookback window is greater than the current config lookback
446        if parse_duration(runtime_lookback_window) > config_lookback:
447            self._lookback_window = InterpolatedString.create(
448                runtime_lookback_window, parameters={}
449            )

Slices the stream over a datetime range and creates a state with format {<cursor_field>: <datetime>}

Given a start time, end time, a step function, and an optional lookback window, the stream slicer will partition the date range from start time - lookback window to end time.

The step function is defined as a string of the form ISO8601 duration

The timestamp format accepts the same format codes as datetime.strptime, which are all the format codes required by the 1989 C standard. Full list of accepted format codes: https://man7.org/linux/man-pages/man3/strftime.3.html

Attributes:
  • start_datetime (Union[MinMaxDatetime, str]): the datetime that determines the earliest record that should be synced
  • end_datetime (Optional[Union[MinMaxDatetime, str]]): the datetime that determines the last record that should be synced
  • cursor_field (Union[InterpolatedString, str]): record's cursor field
  • datetime_format (str): format of the datetime
  • step (Optional[str]): size of the timewindow (ISO8601 duration)
  • cursor_granularity (Optional[str]): smallest increment the datetime_format has (ISO 8601 duration) that will be used to ensure that the start of a slice does not overlap with the end of the previous one
  • config (Config): connection config
  • start_time_option (Optional[RequestOption]): request option for start time
  • end_time_option (Optional[RequestOption]): request option for end time
  • partition_field_start (Optional[str]): partition start time field
  • partition_field_end (Optional[str]): stream slice end time field
  • lookback_window (Optional[InterpolatedString]): how many days before start_datetime to read data for (ISO8601 duration)
DatetimeBasedCursor( start_datetime: Union[airbyte_cdk.MinMaxDatetime, str], cursor_field: Union[airbyte_cdk.InterpolatedString, str], datetime_format: str, config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], _highest_observed_cursor_field_value: Optional[str] = None, _cursor: Optional[str] = None, end_datetime: Union[airbyte_cdk.MinMaxDatetime, str, NoneType] = None, step: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, cursor_granularity: Optional[str] = None, start_time_option: Optional[airbyte_cdk.RequestOption] = None, end_time_option: Optional[airbyte_cdk.RequestOption] = None, partition_field_start: Optional[str] = None, partition_field_end: Optional[str] = None, lookback_window: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, message_repository: Optional[airbyte_cdk.MessageRepository] = None, is_compare_strictly: Optional[bool] = False, cursor_datetime_formats: List[str] = <factory>)
start_datetime: Union[airbyte_cdk.MinMaxDatetime, str]
cursor_field: Union[airbyte_cdk.InterpolatedString, str]
datetime_format: str
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
end_datetime: Union[airbyte_cdk.MinMaxDatetime, str, NoneType] = None
step: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
cursor_granularity: Optional[str] = None
start_time_option: Optional[airbyte_cdk.RequestOption] = None
end_time_option: Optional[airbyte_cdk.RequestOption] = None
partition_field_start: Optional[str] = None
partition_field_end: Optional[str] = None
lookback_window: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
message_repository: Optional[airbyte_cdk.MessageRepository] = None
is_compare_strictly: Optional[bool] = False
cursor_datetime_formats: List[str]
def get_stream_state(self) -> Mapping[str, Any]:
131    def get_stream_state(self) -> StreamState:
132        return {self.cursor_field.eval(self.config): self._cursor} if self._cursor else {}  # type: ignore  # cursor_field is converted to an InterpolatedString in __post_init__

Returns the current stream state. We would like to restrict its usage since it exposes internals of the state. As of 2023-06-14, it is used for three things:

  • Interpolation of the requests
  • Transformation of records
  • Saving the state

For the first case, we are probably stuck with exposing the stream state. For the others, we can probably expose a method that allows for emitting the state to the platform.

def set_initial_state(self, stream_state: Mapping[str, Any]) -> None:
134    def set_initial_state(self, stream_state: StreamState) -> None:
135        """
136        Cursors are not initialized with their state. As state is needed in order to function properly, this method should be called
137        before calling anything else
138
139        :param stream_state: The state of the stream as returned by get_stream_state
140        """
141        self._cursor = (
142            stream_state.get(self.cursor_field.eval(self.config)) if stream_state else None  # type: ignore [union-attr]
143        )

Cursors are not initialized with their state. As state is needed in order to function properly, this method should be called before calling anything else

Parameters
  • stream_state: The state of the stream as returned by get_stream_state
def observe( self, stream_slice: airbyte_cdk.StreamSlice, record: airbyte_cdk.Record) -> None:
145    def observe(self, stream_slice: StreamSlice, record: Record) -> None:
146        """
147        Register a record with the cursor; the cursor instance can then use it to manage the state of the in-progress stream read.
148
149        :param stream_slice: The current slice, which may or may not contain the most recently observed record
150        :param record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the
151          stream state may need to be deferred depending on whether the source reliably orders records by the cursor field.
152        """
153        record_cursor_value = record.get(self.cursor_field.eval(self.config))  # type: ignore  # cursor_field is converted to an InterpolatedString in __post_init__
154        # if the current record has no cursor value, we cannot meaningfully update the state based on it, so there is nothing more to do
155        if not record_cursor_value:
156            return
157
158        start_field = self._partition_field_start.eval(self.config)
159        end_field = self._partition_field_end.eval(self.config)
160        is_highest_observed_cursor_value = (
161            not self._highest_observed_cursor_field_value
162            or self.parse_date(record_cursor_value)
163            > self.parse_date(self._highest_observed_cursor_field_value)
164        )
165        if (
166            self._is_within_daterange_boundaries(
167                record,
168                stream_slice.get(start_field),  # type: ignore [arg-type]
169                stream_slice.get(end_field),  # type: ignore [arg-type]
170            )
171            and is_highest_observed_cursor_value
172        ):
173            self._highest_observed_cursor_field_value = record_cursor_value

Register a record with the cursor; the cursor instance can then use it to manage the state of the in-progress stream read.

Parameters
  • stream_slice: The current slice, which may or may not contain the most recently observed record
  • record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the stream state may need to be deferred depending on whether the source reliably orders records by the cursor field.
def close_slice( self, stream_slice: airbyte_cdk.StreamSlice, *args: Any) -> None:
175    def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
176        if stream_slice.partition:
177            raise ValueError(
178                f"Stream slice {stream_slice} should not have a partition. Got {stream_slice.partition}."
179            )
180        cursor_value_str_by_cursor_value_datetime = dict(
181            map(
182                # we need to ensure the cursor value is preserved as is in the state else the CATs might complain of something like
183                # 2023-01-04T17:30:19.000Z' <= '2023-01-04T17:30:19.000000Z'
184                lambda datetime_str: (self.parse_date(datetime_str), datetime_str),  # type: ignore # because of the filter on the next line, this will only be called with a str
185                filter(
186                    lambda item: item, [self._cursor, self._highest_observed_cursor_field_value]
187                ),
188            )
189        )
190        self._cursor = (
191            cursor_value_str_by_cursor_value_datetime[
192                max(cursor_value_str_by_cursor_value_datetime.keys())
193            ]
194            if cursor_value_str_by_cursor_value_datetime
195            else None
196        )

Update state based on the stream slice. Note that stream_slice.cursor_slice and most_recent_record.associated_slice are expected to be the same but we make it explicit here that stream_slice should be leveraged to update the state. We do not pass in the latest record, since cursor instances should maintain the relevant internal state on their own.

Parameters
  • stream_slice: slice to close
def stream_slices(self) -> Iterable[airbyte_cdk.StreamSlice]:
198    def stream_slices(self) -> Iterable[StreamSlice]:
199        """
200        Partition the daterange into slices of size = step.
201
202        The start of the window is the minimum datetime between start_datetime - lookback_window and the stream_state's datetime
203        The end of the window is the minimum datetime between the start of the window and end_datetime.
204
205        :return:
206        """
207        end_datetime = self.select_best_end_datetime()
208        start_datetime = self._calculate_earliest_possible_value(self.select_best_end_datetime())
209        return self._partition_daterange(start_datetime, end_datetime, self._step)

Partition the daterange into slices of size = step.

The start of the window is the minimum datetime between start_datetime - lookback_window and the stream_state's datetime. The end of the window is the minimum datetime between the start of the window and end_datetime.

Returns
def select_state( self, stream_slice: Optional[airbyte_cdk.StreamSlice] = None) -> Optional[Mapping[str, Any]]:
211    def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
212        # Datetime based cursors operate over slices made up of datetime ranges. Stream state is based on the progress
213        # through each slice and does not belong to a specific slice. We just return stream state as it is.
214        return self.get_stream_state()

Get the state value of a specific stream_slice. For incremental or resumable full refresh cursors which only manage state in a single dimension, this is the entire state object. For per-partition cursors used by substreams, this returns the state of a specific parent delineated by the incoming slice's partition object.

def select_best_end_datetime(self) -> datetime.datetime:
235    def select_best_end_datetime(self) -> datetime.datetime:
236        """
237        Returns the optimal end datetime.
238        This method compares the current datetime with a pre-configured end datetime
239        and returns the earlier of the two. If no pre-configured end datetime is set,
240        the current datetime is returned.
241
242        :return datetime.datetime: The best end datetime, which is either the current datetime or the pre-configured end datetime, whichever is earlier.
243        """
244        now = datetime.datetime.now(tz=self._timezone)
245        if not self._end_datetime:
246            return now
247        return min(self._end_datetime.get_datetime(self.config), now)

Returns the optimal end datetime. This method compares the current datetime with a pre-configured end datetime and returns the earlier of the two. If no pre-configured end datetime is set, the current datetime is returned.

Returns

The best end datetime, which is either the current datetime or the pre-configured end datetime, whichever is earlier.

def parse_date(self, date: str) -> datetime.datetime:
311    def parse_date(self, date: str) -> datetime.datetime:
312        for datetime_format in self.cursor_datetime_formats + [self.datetime_format]:
313            try:
314                return self._parser.parse(date, datetime_format)
315            except ValueError:
316                pass
317        raise ValueError(f"No format in {self.cursor_datetime_formats} matching {date}")
def get_request_params( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
328    def get_request_params(
329        self,
330        *,
331        stream_state: Optional[StreamState] = None,
332        stream_slice: Optional[StreamSlice] = None,
333        next_page_token: Optional[Mapping[str, Any]] = None,
334    ) -> Mapping[str, Any]:
335        return self._get_request_options(RequestOptionType.request_parameter, stream_slice)

Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

E.g: you might want to define query parameters for paging if next_page_token is not None.

def get_request_headers( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
337    def get_request_headers(
338        self,
339        *,
340        stream_state: Optional[StreamState] = None,
341        stream_slice: Optional[StreamSlice] = None,
342        next_page_token: Optional[Mapping[str, Any]] = None,
343    ) -> Mapping[str, Any]:
344        return self._get_request_options(RequestOptionType.header, stream_slice)

Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.

def get_request_body_data( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
346    def get_request_body_data(
347        self,
348        *,
349        stream_state: Optional[StreamState] = None,
350        stream_slice: Optional[StreamSlice] = None,
351        next_page_token: Optional[Mapping[str, Any]] = None,
352    ) -> Mapping[str, Any]:
353        return self._get_request_options(RequestOptionType.body_data, stream_slice)

Specifies how to populate the body of the request with a non-JSON payload.

If it returns plain text, it will be sent as is. If it returns a dict, it will be converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

def get_request_body_json( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
355    def get_request_body_json(
356        self,
357        *,
358        stream_state: Optional[StreamState] = None,
359        stream_slice: Optional[StreamSlice] = None,
360        next_page_token: Optional[Mapping[str, Any]] = None,
361    ) -> Mapping[str, Any]:
362        return self._get_request_options(RequestOptionType.body_json, stream_slice)

Specifies how to populate the body of the request with a JSON payload.

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

def request_kwargs(self) -> Mapping[str, Any]:
364    def request_kwargs(self) -> Mapping[str, Any]:
365        # Never update kwargs
366        return {}
def should_be_synced(self, record: airbyte_cdk.Record) -> bool:
385    def should_be_synced(self, record: Record) -> bool:
386        cursor_field = self.cursor_field.eval(self.config)  # type: ignore  # cursor_field is converted to an InterpolatedString in __post_init__
387        record_cursor_value = record.get(cursor_field)
388        if not record_cursor_value:
389            self._send_log(
390                Level.WARN,
391                f"Could not find cursor field `{cursor_field}` in record. The incremental sync will assume it needs to be synced",
392            )
393            return True
394        latest_possible_cursor_value = self.select_best_end_datetime()
395        earliest_possible_cursor_value = self._calculate_earliest_possible_value(
396            latest_possible_cursor_value
397        )
398        return self._is_within_daterange_boundaries(
399            record, earliest_possible_cursor_value, latest_possible_cursor_value
400        )

Evaluating whether a record should be synced allows for filtering and stop conditions on pagination

def set_runtime_lookback_window(self, lookback_window_in_seconds: int) -> None:
433    def set_runtime_lookback_window(self, lookback_window_in_seconds: int) -> None:
434        """
435        Updates the lookback window based on a given number of seconds if the new duration
436        is greater than the currently configured lookback window.
437
438        :param lookback_window_in_seconds: The lookback duration in seconds to potentially update to.
439        """
440        runtime_lookback_window = duration_isoformat(timedelta(seconds=lookback_window_in_seconds))
441        config_lookback = parse_duration(
442            self._lookback_window.eval(self.config) if self._lookback_window else "P0D"
443        )
444
445        # Check if the new runtime lookback window is greater than the current config lookback
446        if parse_duration(runtime_lookback_window) > config_lookback:
447            self._lookback_window = InterpolatedString.create(
448                runtime_lookback_window, parameters={}
449            )

Updates the lookback window based on a given number of seconds if the new duration is greater than the currently configured lookback window.

Parameters
  • lookback_window_in_seconds: The lookback duration in seconds to potentially update to.
10class DeclarativeCursor(Cursor, StreamSlicer, ABC):
11    """
12    DeclarativeCursors are components that allow for checkpointing syncs. In addition to managing the fetching and updating of
13    state, declarative cursors also manage stream slicing and injecting slice values into outbound requests.
14    """

DeclarativeCursors are components that allow for checkpointing syncs. In addition to managing the fetching and updating of state, declarative cursors also manage stream slicing and injecting slice values into outbound requests.

 74class GlobalSubstreamCursor(DeclarativeCursor):
 75    """
 76    The GlobalSubstreamCursor is designed to track the state of substreams using a single global cursor.
 77    This class is beneficial for streams with many partitions, as it allows the state to be managed globally
 78    instead of per partition, simplifying state management and reducing the size of state messages.
 79
 80    This cursor is activated by setting the `global_substream_cursor` parameter for incremental sync.
 81
 82    Warnings:
 83    - This class enforces a minimal lookback window for substream based on the duration of the previous sync to avoid losing records. This lookback ensures that any records added or updated during the sync are captured in subsequent syncs.
 84    - The global cursor is updated only at the end of the sync. If the sync ends prematurely (e.g., due to an exception), the state will not be updated.
 85    - When using the `incremental_dependency` option, the sync will progress through parent records, preventing the sync from getting infinitely stuck. However, it is crucial to understand the requirements for both the `global_substream_cursor` and `incremental_dependency` options to avoid data loss.
 86    """
 87
 88    def __init__(self, stream_cursor: DatetimeBasedCursor, partition_router: PartitionRouter):
 89        self._stream_cursor = stream_cursor
 90        self._partition_router = partition_router
 91        self._timer = Timer()
 92        self._lock = threading.Lock()
 93        self._slice_semaphore = threading.Semaphore(
 94            0
 95        )  # Start with 0, indicating no slices being tracked
 96        self._all_slices_yielded = False
 97        self._lookback_window: Optional[int] = None
 98        self._current_partition: Optional[Mapping[str, Any]] = None
 99        self._last_slice: bool = False
100        self._parent_state: Optional[Mapping[str, Any]] = None
101
102    def start_slices_generation(self) -> None:
103        self._timer.start()
104
105    def stream_slices(self) -> Iterable[StreamSlice]:
106        """
107        Generates stream slices, ensuring the last slice is properly flagged and processed.
108
109        This method creates a sequence of stream slices by iterating over partitions and cursor slices.
110        It holds onto one slice in memory to set `_all_slices_yielded` to `True` before yielding the
111        final slice. A semaphore is used to track the processing of slices, ensuring that `close_slice`
112        is called only after all slices have been processed.
113
114        We expect the following events:
115        * Yields all the slices except the last one. At this point, `close_slice` won't actually close the global slice as `self._all_slices_yielded == False`
116        * Release the semaphore one last time before setting `self._all_slices_yielded = True`. This will cause `close_slice` to know about all the slices before we indicate that all slices have been yielded so the left side of `if self._all_slices_yielded and self._slice_semaphore._value == 0` will be false if not everything is closed
117        * Setting `self._all_slices_yielded = True`. We do that before actually yielding the last slice as the caller of `stream_slices` might stop iterating at any point and hence the code after `yield` might not be executed
118        * Yield the last slice. At that point, once there are as many slices yielded as closes, the global slice will be closed too
119        """
120        slice_generator = (
121            StreamSlice(
122                partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields
123            )
124            for partition in self._partition_router.stream_slices()
125            for cursor_slice in self._stream_cursor.stream_slices()
126        )
127
128        self.start_slices_generation()
129        for slice, last, state in iterate_with_last_flag_and_state(
130            slice_generator, self._partition_router.get_stream_state
131        ):
132            self._parent_state = state
133            self.register_slice(last)
134            yield slice
135        self._parent_state = self._partition_router.get_stream_state()
136
137    def generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[StreamSlice]:
138        slice_generator = (
139            StreamSlice(
140                partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields
141            )
142            for cursor_slice in self._stream_cursor.stream_slices()
143        )
144
145        yield from slice_generator
146
147    def register_slice(self, last: bool) -> None:
148        """
149        Tracks the processing of a stream slice.
150
151        Releases the semaphore for each slice. If it's the last slice (`last=True`),
152        sets `_all_slices_yielded` to `True` to indicate no more slices will be processed.
153
154        Args:
155            last (bool): True if the current slice is the last in the sequence.
156        """
157        self._slice_semaphore.release()
158        if last:
159            self._all_slices_yielded = True
160
161    def set_initial_state(self, stream_state: StreamState) -> None:
162        """
163        Set the initial state for the cursors.
164
165        This method initializes the state for the global cursor using the provided stream state.
166
167        Additionally, it sets the parent state for partition routers that are based on parent streams. If a partition router
168        does not have parent streams, this step will be skipped due to the default PartitionRouter implementation.
169
170        Args:
171            stream_state (StreamState): The state of the streams to be set. The format of the stream state should be:
172                {
173                    "state": {
174                        "last_updated": "2023-05-27T00:00:00Z"
175                    },
176                    "parent_state": {
177                        "parent_stream_name": {
178                            "last_updated": "2023-05-27T00:00:00Z"
179                        }
180                    },
181                    "lookback_window": 132
182                }
183        """
184        if not stream_state:
185            return
186
187        if "lookback_window" in stream_state:
188            self._lookback_window = stream_state["lookback_window"]
189            self._inject_lookback_into_stream_cursor(stream_state["lookback_window"])
190
191        if "state" in stream_state:
192            self._stream_cursor.set_initial_state(stream_state["state"])
193        elif "states" not in stream_state:
194            # We assume that `stream_state` is in the old global format
195            # Example: {"global_state_format_key": "global_state_format_value"}
196            self._stream_cursor.set_initial_state(stream_state)
197
198        # We used to set the parent state through this method but since moving the SubstreamPartitionRouter to the
199        # Concurrent CDK/AbstractStream, the state is passed at the __init__ stage and this does not need to be called.
200        # We are still keeping this line as a comment to be explicit about the past behavior.
201        # self._partition_router.set_initial_state(stream_state)
202
203    def _inject_lookback_into_stream_cursor(self, lookback_window: int) -> None:
204        """
205        Modifies the stream cursor's lookback window based on the duration of the previous sync.
206        This adjustment ensures the cursor is set to the minimal lookback window necessary for
207        avoiding missing data.
208
209        Parameters:
210            lookback_window (int): The lookback duration in seconds to be set, derived from
211                                   the previous sync.
212
213        Raises:
214            ValueError: If the cursor does not support dynamic lookback window adjustments.
215        """
216        if hasattr(self._stream_cursor, "set_runtime_lookback_window"):
217            self._stream_cursor.set_runtime_lookback_window(lookback_window)
218        else:
219            raise ValueError(
220                "The cursor class for Global Substream Cursor does not have a set_runtime_lookback_window method"
221            )
222
223    def observe(self, stream_slice: StreamSlice, record: Record) -> None:
224        self._stream_cursor.observe(
225            StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), record
226        )
227
228    def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
229        """
230        Close the current stream slice.
231
232        This method is called when a stream slice is completed. For the global parent cursor, we close the child cursor
233        only after reading all slices. This ensures that we do not miss any child records from a later parent record
234        if the child cursor is earlier than a record from the first parent record.
235
236        Args:
237            stream_slice (StreamSlice): The stream slice to be closed.
238            *args (Any): Additional arguments.
239        """
240        with self._lock:
241            self._slice_semaphore.acquire()
242            if self._all_slices_yielded and self._slice_semaphore._value == 0:
243                self._lookback_window = self._timer.finish()
244                self._stream_cursor.close_slice(
245                    StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), *args
246                )
247
248    def get_stream_state(self) -> StreamState:
249        state: dict[str, Any] = {"state": self._stream_cursor.get_stream_state()}
250
251        if self._parent_state:
252            state["parent_state"] = self._parent_state
253
254        if self._lookback_window is not None:
255            state["lookback_window"] = self._lookback_window
256
257        return state
258
259    def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
260        # stream_slice is ignored as cursor is global
261        return self._stream_cursor.get_stream_state()
262
263    def get_request_params(
264        self,
265        *,
266        stream_state: Optional[StreamState] = None,
267        stream_slice: Optional[StreamSlice] = None,
268        next_page_token: Optional[Mapping[str, Any]] = None,
269    ) -> Mapping[str, Any]:
270        if stream_slice:
271            return self._partition_router.get_request_params(  # type: ignore # this always returns a mapping
272                stream_state=stream_state,
273                stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
274                next_page_token=next_page_token,
275            ) | self._stream_cursor.get_request_params(
276                stream_state=stream_state,
277                stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
278                next_page_token=next_page_token,
279            )
280        else:
281            raise ValueError("A partition needs to be provided in order to get request params")
282
283    def get_request_headers(
284        self,
285        *,
286        stream_state: Optional[StreamState] = None,
287        stream_slice: Optional[StreamSlice] = None,
288        next_page_token: Optional[Mapping[str, Any]] = None,
289    ) -> Mapping[str, Any]:
290        if stream_slice:
291            return self._partition_router.get_request_headers(  # type: ignore # this always returns a mapping
292                stream_state=stream_state,
293                stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
294                next_page_token=next_page_token,
295            ) | self._stream_cursor.get_request_headers(
296                stream_state=stream_state,
297                stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
298                next_page_token=next_page_token,
299            )
300        else:
301            raise ValueError("A partition needs to be provided in order to get request headers")
302
303    def get_request_body_data(
304        self,
305        *,
306        stream_state: Optional[StreamState] = None,
307        stream_slice: Optional[StreamSlice] = None,
308        next_page_token: Optional[Mapping[str, Any]] = None,
309    ) -> Union[Mapping[str, Any], str]:
310        if stream_slice:
311            return self._partition_router.get_request_body_data(  # type: ignore # this always returns a mapping
312                stream_state=stream_state,
313                stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
314                next_page_token=next_page_token,
315            ) | self._stream_cursor.get_request_body_data(
316                stream_state=stream_state,
317                stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
318                next_page_token=next_page_token,
319            )
320        else:
321            raise ValueError("A partition needs to be provided in order to get request body data")
322
323    def get_request_body_json(
324        self,
325        *,
326        stream_state: Optional[StreamState] = None,
327        stream_slice: Optional[StreamSlice] = None,
328        next_page_token: Optional[Mapping[str, Any]] = None,
329    ) -> Mapping[str, Any]:
330        if stream_slice:
331            return self._partition_router.get_request_body_json(  # type: ignore # this always returns a mapping
332                stream_state=stream_state,
333                stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
334                next_page_token=next_page_token,
335            ) | self._stream_cursor.get_request_body_json(
336                stream_state=stream_state,
337                stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
338                next_page_token=next_page_token,
339            )
340        else:
341            raise ValueError("A partition needs to be provided in order to get request body json")
342
343    def should_be_synced(self, record: Record) -> bool:
344        return self._stream_cursor.should_be_synced(self._convert_record_to_cursor_record(record))
345
346    @staticmethod
347    def _convert_record_to_cursor_record(record: Record) -> Record:
348        return Record(
349            data=record.data,
350            stream_name=record.stream_name,
351            associated_slice=StreamSlice(
352                partition={}, cursor_slice=record.associated_slice.cursor_slice
353            )
354            if record.associated_slice
355            else None,
356        )

The GlobalSubstreamCursor is designed to track the state of substreams using a single global cursor. This class is beneficial for streams with many partitions, as it allows the state to be managed globally instead of per partition, simplifying state management and reducing the size of state messages.

This cursor is activated by setting the global_substream_cursor parameter for incremental sync.

Warnings:

  • This class enforces a minimal lookback window for substreams based on the duration of the previous sync to avoid losing records. This lookback ensures that any records added or updated during the sync are captured in subsequent syncs.
  • The global cursor is updated only at the end of the sync. If the sync ends prematurely (e.g., due to an exception), the state will not be updated.
  • When using the incremental_dependency option, the sync will progress through parent records, preventing the sync from getting infinitely stuck. However, it is crucial to understand the requirements for both the global_substream_cursor and incremental_dependency options to avoid data loss.
GlobalSubstreamCursor( stream_cursor: DatetimeBasedCursor, partition_router: airbyte_cdk.sources.declarative.partition_routers.PartitionRouter)
 88    def __init__(self, stream_cursor: DatetimeBasedCursor, partition_router: PartitionRouter):
 89        self._stream_cursor = stream_cursor
 90        self._partition_router = partition_router
 91        self._timer = Timer()
 92        self._lock = threading.Lock()
 93        self._slice_semaphore = threading.Semaphore(
 94            0
 95        )  # Start with 0, indicating no slices being tracked
 96        self._all_slices_yielded = False
 97        self._lookback_window: Optional[int] = None
 98        self._current_partition: Optional[Mapping[str, Any]] = None
 99        self._last_slice: bool = False
100        self._parent_state: Optional[Mapping[str, Any]] = None
def start_slices_generation(self) -> None:
102    def start_slices_generation(self) -> None:
103        self._timer.start()
def stream_slices(self) -> Iterable[airbyte_cdk.StreamSlice]:
105    def stream_slices(self) -> Iterable[StreamSlice]:
106        """
107        Generates stream slices, ensuring the last slice is properly flagged and processed.
108
109        This method creates a sequence of stream slices by iterating over partitions and cursor slices.
110        It holds onto one slice in memory to set `_all_slices_yielded` to `True` before yielding the
111        final slice. A semaphore is used to track the processing of slices, ensuring that `close_slice`
112        is called only after all slices have been processed.
113
114        We expect the following events:
115        * Yields all the slices except the last one. At this point, `close_slice` won't actually close the global slice as `self._all_slices_yielded == False`
116        * Release the semaphore one last time before setting `self._all_slices_yielded = True`. This will cause `close_slice` to know about all the slices before we indicate that all slices have been yielded so the left side of `if self._all_slices_yielded and self._slice_semaphore._value == 0` will be false if not everything is closed
117        * Setting `self._all_slices_yielded = True`. We do that before actually yielding the last slice as the caller of `stream_slices` might stop iterating at any point and hence the code after `yield` might not be executed
118        * Yield the last slice. At that point, once there are as many slices yielded as closes, the global slice will be closed too
119        """
120        slice_generator = (
121            StreamSlice(
122                partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields
123            )
124            for partition in self._partition_router.stream_slices()
125            for cursor_slice in self._stream_cursor.stream_slices()
126        )
127
128        self.start_slices_generation()
129        for slice, last, state in iterate_with_last_flag_and_state(
130            slice_generator, self._partition_router.get_stream_state
131        ):
132            self._parent_state = state
133            self.register_slice(last)
134            yield slice
135        self._parent_state = self._partition_router.get_stream_state()

Generates stream slices, ensuring the last slice is properly flagged and processed.

This method creates a sequence of stream slices by iterating over partitions and cursor slices. It holds onto one slice in memory to set _all_slices_yielded to True before yielding the final slice. A semaphore is used to track the processing of slices, ensuring that close_slice is called only after all slices have been processed.

We expect the following events:

  • Yields all the slices except the last one. At this point, close_slice won't actually close the global slice as self._all_slices_yielded == False
  • Release the semaphore one last time before setting self._all_slices_yielded = True. This will cause close_slice to know about all the slices before we indicate that all slices have been yielded so the left side of if self._all_slices_yielded and self._slice_semaphore._value == 0 will be false if not everything is closed
  • Setting self._all_slices_yielded = True. We do that before actually yielding the last slice as the caller of stream_slices might stop iterating at any point and hence the code after yield might not be executed
  • Yield the last slice. At that point, once there are as many slices yielded as closes, the global slice will be closed too
def generate_slices_from_partition( self, partition: airbyte_cdk.StreamSlice) -> Iterable[airbyte_cdk.StreamSlice]:
137    def generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[StreamSlice]:
138        slice_generator = (
139            StreamSlice(
140                partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields
141            )
142            for cursor_slice in self._stream_cursor.stream_slices()
143        )
144
145        yield from slice_generator
def register_slice(self, last: bool) -> None:
147    def register_slice(self, last: bool) -> None:
148        """
149        Tracks the processing of a stream slice.
150
151        Releases the semaphore for each slice. If it's the last slice (`last=True`),
152        sets `_all_slices_yielded` to `True` to indicate no more slices will be processed.
153
154        Args:
155            last (bool): True if the current slice is the last in the sequence.
156        """
157        self._slice_semaphore.release()
158        if last:
159            self._all_slices_yielded = True

Tracks the processing of a stream slice.

Releases the semaphore for each slice. If it's the last slice (last=True), sets _all_slices_yielded to True to indicate no more slices will be processed.

Arguments:
  • last (bool): True if the current slice is the last in the sequence.
def set_initial_state(self, stream_state: Mapping[str, Any]) -> None:
161    def set_initial_state(self, stream_state: StreamState) -> None:
162        """
163        Set the initial state for the cursors.
164
165        This method initializes the state for the global cursor using the provided stream state.
166
167        Additionally, it sets the parent state for partition routers that are based on parent streams. If a partition router
168        does not have parent streams, this step will be skipped due to the default PartitionRouter implementation.
169
170        Args:
171            stream_state (StreamState): The state of the streams to be set. The format of the stream state should be:
172                {
173                    "state": {
174                        "last_updated": "2023-05-27T00:00:00Z"
175                    },
176                    "parent_state": {
177                        "parent_stream_name": {
178                            "last_updated": "2023-05-27T00:00:00Z"
179                        }
180                    },
181                    "lookback_window": 132
182                }
183        """
184        if not stream_state:
185            return
186
187        if "lookback_window" in stream_state:
188            self._lookback_window = stream_state["lookback_window"]
189            self._inject_lookback_into_stream_cursor(stream_state["lookback_window"])
190
191        if "state" in stream_state:
192            self._stream_cursor.set_initial_state(stream_state["state"])
193        elif "states" not in stream_state:
194            # We assume that `stream_state` is in the old global format
195            # Example: {"global_state_format_key": "global_state_format_value"}
196            self._stream_cursor.set_initial_state(stream_state)
197
198        # We used to set the parent state through this method but since moving the SubstreamPartitionRouter to the
199        # Concurrent CDK/AbstractStream, the state is passed at the __init__ stage and this does not need to be called.
200        # We are still keeping this line as a comment to be explicit about the past behavior.
201        # self._partition_router.set_initial_state(stream_state)

Set the initial state for the cursors.

This method initializes the state for the global cursor using the provided stream state.

Additionally, it sets the parent state for partition routers that are based on parent streams. If a partition router does not have parent streams, this step will be skipped due to the default PartitionRouter implementation.

Arguments:
  • stream_state (StreamState): The state of the streams to be set. The format of the stream state should be: { "state": { "last_updated": "2023-05-27T00:00:00Z" }, "parent_state": { "parent_stream_name": { "last_updated": "2023-05-27T00:00:00Z" } }, "lookback_window": 132 }
def observe( self, stream_slice: airbyte_cdk.StreamSlice, record: airbyte_cdk.Record) -> None:
223    def observe(self, stream_slice: StreamSlice, record: Record) -> None:
224        self._stream_cursor.observe(
225            StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), record
226        )

Register a record with the cursor; the cursor instance can then use it to manage the state of the in-progress stream read.

Parameters
  • stream_slice: The current slice, which may or may not contain the most recently observed record
  • record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the stream state may need to be deferred depending on whether the source reliably orders records by the cursor field.
def close_slice( self, stream_slice: airbyte_cdk.StreamSlice, *args: Any) -> None:
228    def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
229        """
230        Close the current stream slice.
231
232        This method is called when a stream slice is completed. For the global parent cursor, we close the child cursor
233        only after reading all slices. This ensures that we do not miss any child records from a later parent record
234        if the child cursor is earlier than a record from the first parent record.
235
236        Args:
237            stream_slice (StreamSlice): The stream slice to be closed.
238            *args (Any): Additional arguments.
239        """
240        with self._lock:
241            self._slice_semaphore.acquire()
242            if self._all_slices_yielded and self._slice_semaphore._value == 0:
243                self._lookback_window = self._timer.finish()
244                self._stream_cursor.close_slice(
245                    StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), *args
246                )

Close the current stream slice.

This method is called when a stream slice is completed. For the global parent cursor, we close the child cursor only after reading all slices. This ensures that we do not miss any child records from a later parent record if the child cursor is earlier than a record from the first parent record.

Arguments:
  • stream_slice (StreamSlice): The stream slice to be closed.
  • *args (Any): Additional arguments.
def get_stream_state(self) -> Mapping[str, Any]:
248    def get_stream_state(self) -> StreamState:
249        state: dict[str, Any] = {"state": self._stream_cursor.get_stream_state()}
250
251        if self._parent_state:
252            state["parent_state"] = self._parent_state
253
254        if self._lookback_window is not None:
255            state["lookback_window"] = self._lookback_window
256
257        return state

Returns the current stream state. We would like to restrict its usage since it exposes internals of the state. As of 2023-06-14, it is used for three things:

  • Interpolation of the requests
  • Transformation of records
  • Saving the state

For the first case, we are probably stuck with exposing the stream state. For the second, we can probably expose a method that allows for emitting the state to the platform.

def select_state( self, stream_slice: Optional[airbyte_cdk.StreamSlice] = None) -> Optional[Mapping[str, Any]]:
259    def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
260        # stream_slice is ignored as cursor is global
261        return self._stream_cursor.get_stream_state()

Get the state value of a specific stream_slice. For incremental or resumable full refresh cursors which only manage state in a single dimension this is the entire state object. For per-partition cursors used by substreams, this returns the state of a specific parent delineated by the incoming slice's partition object.

def get_request_params( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
263    def get_request_params(
264        self,
265        *,
266        stream_state: Optional[StreamState] = None,
267        stream_slice: Optional[StreamSlice] = None,
268        next_page_token: Optional[Mapping[str, Any]] = None,
269    ) -> Mapping[str, Any]:
270        if stream_slice:
271            return self._partition_router.get_request_params(  # type: ignore # this always returns a mapping
272                stream_state=stream_state,
273                stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
274                next_page_token=next_page_token,
275            ) | self._stream_cursor.get_request_params(
276                stream_state=stream_state,
277                stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
278                next_page_token=next_page_token,
279            )
280        else:
281            raise ValueError("A partition needs to be provided in order to get request params")

Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

E.g: you might want to define query parameters for paging if next_page_token is not None.

def get_request_headers( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
283    def get_request_headers(
284        self,
285        *,
286        stream_state: Optional[StreamState] = None,
287        stream_slice: Optional[StreamSlice] = None,
288        next_page_token: Optional[Mapping[str, Any]] = None,
289    ) -> Mapping[str, Any]:
290        if stream_slice:
291            return self._partition_router.get_request_headers(  # type: ignore # this always returns a mapping
292                stream_state=stream_state,
293                stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
294                next_page_token=next_page_token,
295            ) | self._stream_cursor.get_request_headers(
296                stream_state=stream_state,
297                stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
298                next_page_token=next_page_token,
299            )
300        else:
301            raise ValueError("A partition needs to be provided in order to get request headers")

Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.

def get_request_body_data( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Union[Mapping[str, Any], str]:
303    def get_request_body_data(
304        self,
305        *,
306        stream_state: Optional[StreamState] = None,
307        stream_slice: Optional[StreamSlice] = None,
308        next_page_token: Optional[Mapping[str, Any]] = None,
309    ) -> Union[Mapping[str, Any], str]:
310        if stream_slice:
311            return self._partition_router.get_request_body_data(  # type: ignore # this always returns a mapping
312                stream_state=stream_state,
313                stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
314                next_page_token=next_page_token,
315            ) | self._stream_cursor.get_request_body_data(
316                stream_state=stream_state,
317                stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
318                next_page_token=next_page_token,
319            )
320        else:
321            raise ValueError("A partition needs to be provided in order to get request body data")

Specifies how to populate the body of the request with a non-JSON payload.

If it returns plain text, that text will be sent as is. If it returns a dict, the dict will be converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

Only one of the 'request_body_data' and 'request_body_json' functions can be overridden at a time.

def get_request_body_json( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
323    def get_request_body_json(
324        self,
325        *,
326        stream_state: Optional[StreamState] = None,
327        stream_slice: Optional[StreamSlice] = None,
328        next_page_token: Optional[Mapping[str, Any]] = None,
329    ) -> Mapping[str, Any]:
330        if stream_slice:
331            return self._partition_router.get_request_body_json(  # type: ignore # this always returns a mapping
332                stream_state=stream_state,
333                stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
334                next_page_token=next_page_token,
335            ) | self._stream_cursor.get_request_body_json(
336                stream_state=stream_state,
337                stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
338                next_page_token=next_page_token,
339            )
340        else:
341            raise ValueError("A partition needs to be provided in order to get request body json")

Specifies how to populate the body of the request with a JSON payload.

Only one of the 'request_body_data' and 'request_body_json' functions can be overridden at a time.

def should_be_synced(self, record: airbyte_cdk.Record) -> bool:
343    def should_be_synced(self, record: Record) -> bool:
344        return self._stream_cursor.should_be_synced(self._convert_record_to_cursor_record(record))

Evaluating whether a record should be synced allows for filtering and for stop conditions on pagination.

 28class PerPartitionCursor(DeclarativeCursor):
 29    """
 30    Manages state per partition when a stream has many partitions, to prevent data loss or duplication.
 31
 32    **Partition Limitation and Limit Reached Logic**
 33
 34    - **DEFAULT_MAX_PARTITIONS_NUMBER**: The maximum number of partitions to keep in memory (default is 10,000).
 35    - **_cursor_per_partition**: An ordered dictionary that stores cursors for each partition.
 36    - **_over_limit**: A counter that increments each time an oldest partition is removed when the limit is exceeded.
 37
 38    The class ensures that the number of partitions tracked does not exceed the `DEFAULT_MAX_PARTITIONS_NUMBER` to prevent excessive memory usage.
 39
 40    - When the number of partitions exceeds the limit, the oldest partitions are removed from `_cursor_per_partition`, and `_over_limit` is incremented accordingly.
 41    - The `limit_reached` method returns `True` when `_over_limit` exceeds `DEFAULT_MAX_PARTITIONS_NUMBER`, indicating that the global cursor should be used instead of per-partition cursors.
 42
 43    This approach avoids unnecessary switching to a global cursor due to temporary spikes in partition counts, ensuring that switching is only done when a sustained high number of partitions is observed.
 44    """
 45
 46    DEFAULT_MAX_PARTITIONS_NUMBER = 10000
 47    _NO_STATE: Mapping[str, Any] = {}
 48    _NO_CURSOR_STATE: Mapping[str, Any] = {}
 49    _KEY = 0
 50    _VALUE = 1
 51    _state_to_migrate_from: Mapping[str, Any] = {}
 52
 53    def __init__(self, cursor_factory: CursorFactory, partition_router: PartitionRouter):
 54        self._cursor_factory = cursor_factory
 55        self._partition_router = partition_router
 56        # The dict is ordered to ensure that once the maximum number of partitions is reached,
 57        # the oldest partitions can be efficiently removed, maintaining the most recent partitions.
 58        self._cursor_per_partition: OrderedDict[str, DeclarativeCursor] = OrderedDict()
 59        self._over_limit = 0
 60        self._partition_serializer = PerPartitionKeySerializer()
 61
 62    def stream_slices(self) -> Iterable[StreamSlice]:
 63        slices = self._partition_router.stream_slices()
 64        for partition in slices:
 65            yield from self.generate_slices_from_partition(partition)
 66
 67    def generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[StreamSlice]:
 68        # Ensure the maximum number of partitions is not exceeded
 69        self._ensure_partition_limit()
 70
 71        cursor = self._cursor_per_partition.get(self._to_partition_key(partition.partition))
 72        if not cursor:
 73            partition_state = (
 74                self._state_to_migrate_from
 75                if self._state_to_migrate_from
 76                else self._NO_CURSOR_STATE
 77            )
 78            cursor = self._create_cursor(partition_state)
 79            self._cursor_per_partition[self._to_partition_key(partition.partition)] = cursor
 80
 81        for cursor_slice in cursor.stream_slices():
 82            yield StreamSlice(
 83                partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields
 84            )
 85
 86    def _ensure_partition_limit(self) -> None:
 87        """
 88        Ensure the maximum number of partitions is not exceeded. If so, the oldest added partition will be dropped.
 89        """
 90        while len(self._cursor_per_partition) > self.DEFAULT_MAX_PARTITIONS_NUMBER - 1:
 91            self._over_limit += 1
 92            oldest_partition = self._cursor_per_partition.popitem(last=False)[
 93                0
 94            ]  # Remove the oldest partition
 95            logger.warning(
 96                f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}. Over limit: {self._over_limit}."
 97            )
 98
 99    def limit_reached(self) -> bool:
100        return self._over_limit > self.DEFAULT_MAX_PARTITIONS_NUMBER
101
102    def set_initial_state(self, stream_state: StreamState) -> None:
103        """
104        Set the initial state for the cursors.
105
106        This method initializes the state for each partition cursor using the provided stream state.
107        If a partition state is provided in the stream state, it will update the corresponding partition cursor with this state.
108
109        Additionally, it sets the parent state for partition routers that are based on parent streams. If a partition router
110        does not have parent streams, this step will be skipped due to the default PartitionRouter implementation.
111
112        Args:
113            stream_state (StreamState): The state of the streams to be set. The format of the stream state should be:
114                {
115                    "states": [
116                        {
117                            "partition": {
118                                "partition_key": "value"
119                            },
120                            "cursor": {
121                                "last_updated": "2023-05-27T00:00:00Z"
122                            }
123                        }
124                    ],
125                    "parent_state": {
126                        "parent_stream_name": {
127                            "last_updated": "2023-05-27T00:00:00Z"
128                        }
129                    }
130                }
131        """
132        if not stream_state:
133            return
134
135        if "states" not in stream_state:
136            # We assume that `stream_state` is in a global format that can be applied to all partitions.
137            # Example: {"global_state_format_key": "global_state_format_value"}
138            self._state_to_migrate_from = stream_state
139
140        else:
141            for state in stream_state["states"]:
142                self._cursor_per_partition[self._to_partition_key(state["partition"])] = (
143                    self._create_cursor(state["cursor"])
144                )
145
146            # set default state for missing partitions if it is per partition with fallback to global
147            if "state" in stream_state:
148                self._state_to_migrate_from = stream_state["state"]
149
150        # We used to set the parent state through this method but since moving the SubstreamPartitionRouter to the
151        # Concurrent CDK/AbstractStream, the state is passed at the __init__ stage and this does not need to be called.
152        # We are still keeping this line as a comment to be explicit about the past behavior.
153        # self._partition_router.set_initial_state(stream_state)
154
155    def observe(self, stream_slice: StreamSlice, record: Record) -> None:
156        self._cursor_per_partition[self._to_partition_key(stream_slice.partition)].observe(
157            StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), record
158        )
159
160    def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
161        try:
162            self._cursor_per_partition[self._to_partition_key(stream_slice.partition)].close_slice(
163                StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), *args
164            )
165        except KeyError as exception:
166            raise ValueError(
167                f"Partition {str(exception)} could not be found in current state based on the record. This is unexpected because "
168                f"we should only update state for partitions that were emitted during `stream_slices`"
169            )
170
171    def get_stream_state(self) -> StreamState:
172        states = []
173        for partition_tuple, cursor in self._cursor_per_partition.items():
174            cursor_state = cursor.get_stream_state()
175            if cursor_state:
176                states.append(
177                    {
178                        "partition": self._to_dict(partition_tuple),
179                        "cursor": cursor_state,
180                    }
181                )
182        state: dict[str, Any] = {"states": states}
183
184        parent_state = self._partition_router.get_stream_state()
185        if parent_state:
186            state["parent_state"] = parent_state
187        return state
188
189    def _get_state_for_partition(self, partition: Mapping[str, Any]) -> Optional[StreamState]:
190        cursor = self._cursor_per_partition.get(self._to_partition_key(partition))
191        if cursor:
192            return cursor.get_stream_state()
193
194        return None
195
196    @staticmethod
197    def _is_new_state(stream_state: Mapping[str, Any]) -> bool:
198        return not bool(stream_state)
199
200    def _to_partition_key(self, partition: Mapping[str, Any]) -> str:
201        return self._partition_serializer.to_partition_key(partition)
202
203    def _to_dict(self, partition_key: str) -> Mapping[str, Any]:
204        return self._partition_serializer.to_partition(partition_key)
205
206    def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
207        if not stream_slice:
208            raise ValueError("A partition needs to be provided in order to extract a state")
209
210        if not stream_slice:
211            return None
212
213        return self._get_state_for_partition(stream_slice.partition)
214
215    def _create_cursor(self, cursor_state: Any) -> DeclarativeCursor:
216        cursor = self._cursor_factory.create()
217        cursor.set_initial_state(cursor_state)
218        return cursor
219
220    def get_request_params(
221        self,
222        *,
223        stream_state: Optional[StreamState] = None,
224        stream_slice: Optional[StreamSlice] = None,
225        next_page_token: Optional[Mapping[str, Any]] = None,
226    ) -> Mapping[str, Any]:
227        if stream_slice:
228            if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition:
229                self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition))
230            return self._partition_router.get_request_params(  # type: ignore # this always returns a mapping
231                stream_state=stream_state,
232                stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
233                next_page_token=next_page_token,
234            ) | self._cursor_per_partition[
235                self._to_partition_key(stream_slice.partition)
236            ].get_request_params(
237                stream_state=stream_state,
238                stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
239                next_page_token=next_page_token,
240            )
241        else:
242            raise ValueError("A partition needs to be provided in order to get request params")
243
244    def get_request_headers(
245        self,
246        *,
247        stream_state: Optional[StreamState] = None,
248        stream_slice: Optional[StreamSlice] = None,
249        next_page_token: Optional[Mapping[str, Any]] = None,
250    ) -> Mapping[str, Any]:
251        if stream_slice:
252            if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition:
253                self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition))
254            return self._partition_router.get_request_headers(  # type: ignore # this always returns a mapping
255                stream_state=stream_state,
256                stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
257                next_page_token=next_page_token,
258            ) | self._cursor_per_partition[
259                self._to_partition_key(stream_slice.partition)
260            ].get_request_headers(
261                stream_state=stream_state,
262                stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
263                next_page_token=next_page_token,
264            )
265        else:
266            raise ValueError("A partition needs to be provided in order to get request headers")
267
268    def get_request_body_data(
269        self,
270        *,
271        stream_state: Optional[StreamState] = None,
272        stream_slice: Optional[StreamSlice] = None,
273        next_page_token: Optional[Mapping[str, Any]] = None,
274    ) -> Union[Mapping[str, Any], str]:
275        if stream_slice:
276            if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition:
277                self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition))
278            return self._partition_router.get_request_body_data(  # type: ignore # this always returns a mapping
279                stream_state=stream_state,
280                stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
281                next_page_token=next_page_token,
282            ) | self._cursor_per_partition[
283                self._to_partition_key(stream_slice.partition)
284            ].get_request_body_data(
285                stream_state=stream_state,
286                stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
287                next_page_token=next_page_token,
288            )
289        else:
290            raise ValueError("A partition needs to be provided in order to get request body data")
291
292    def get_request_body_json(
293        self,
294        *,
295        stream_state: Optional[StreamState] = None,
296        stream_slice: Optional[StreamSlice] = None,
297        next_page_token: Optional[Mapping[str, Any]] = None,
298    ) -> Mapping[str, Any]:
299        if stream_slice:
300            if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition:
301                self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition))
302            return self._partition_router.get_request_body_json(  # type: ignore # this always returns a mapping
303                stream_state=stream_state,
304                stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
305                next_page_token=next_page_token,
306            ) | self._cursor_per_partition[
307                self._to_partition_key(stream_slice.partition)
308            ].get_request_body_json(
309                stream_state=stream_state,
310                stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
311                next_page_token=next_page_token,
312            )
313        else:
314            raise ValueError("A partition needs to be provided in order to get request body json")
315
316    def should_be_synced(self, record: Record) -> bool:
317        return self._get_cursor(record).should_be_synced(
318            self._convert_record_to_cursor_record(record)
319        )
320
321    @staticmethod
322    def _convert_record_to_cursor_record(record: Record) -> Record:
323        return Record(
324            data=record.data,
325            stream_name=record.stream_name,
326            associated_slice=StreamSlice(
327                partition={}, cursor_slice=record.associated_slice.cursor_slice
328            )
329            if record.associated_slice
330            else None,
331        )
332
333    def _get_cursor(self, record: Record) -> DeclarativeCursor:
334        if not record.associated_slice:
335            raise ValueError(
336                "Invalid state as stream slices that are emitted should refer to an existing cursor"
337            )
338        partition_key = self._to_partition_key(record.associated_slice.partition)
339        if partition_key not in self._cursor_per_partition:
340            self._create_cursor_for_partition(partition_key)
341        cursor = self._cursor_per_partition[partition_key]
342        return cursor
343
344    def _create_cursor_for_partition(self, partition_key: str) -> None:
345        """
346        Dynamically creates and initializes a cursor for the specified partition.
347
348        This method is required for `ConcurrentPerPartitionCursor`. For concurrent cursors,
349        stream_slices is executed only for the concurrent cursor, so cursors per partition
350        are not created for the declarative cursor. This method ensures that a cursor is available
351        to create requests for the specified partition. The cursor is initialized
352        with the per-partition state if present in the initial state, or with the global state
353        adjusted by the lookback window, or with the state to migrate from.
354
355        Note:
356            This is a temporary workaround and should be removed once the declarative cursor
357            is decoupled from the concurrent cursor implementation.
358
359        Args:
360            partition_key (str): The unique identifier for the partition for which the cursor
361            needs to be created.
362        """
363        partition_state = (
364            self._state_to_migrate_from if self._state_to_migrate_from else self._NO_CURSOR_STATE
365        )
366        cursor = self._create_cursor(partition_state)
367
368        self._cursor_per_partition[partition_key] = cursor

Manages state per partition when a stream has many partitions, to prevent data loss or duplication.

Partition Limitation and Limit Reached Logic

  • DEFAULT_MAX_PARTITIONS_NUMBER: The maximum number of partitions to keep in memory (default is 10,000).
  • _cursor_per_partition: An ordered dictionary that stores cursors for each partition.
  • _over_limit: A counter that increments each time an oldest partition is removed when the limit is exceeded.

The class ensures that the number of partitions tracked does not exceed the DEFAULT_MAX_PARTITIONS_NUMBER to prevent excessive memory usage.

  • When the number of partitions exceeds the limit, the oldest partitions are removed from _cursor_per_partition, and _over_limit is incremented accordingly.
  • The limit_reached method returns True when _over_limit exceeds DEFAULT_MAX_PARTITIONS_NUMBER, indicating that the global cursor should be used instead of per-partition cursors.

This approach avoids unnecessary switching to a global cursor due to temporary spikes in partition counts, ensuring that switching is only done when a sustained high number of partitions is observed.

PerPartitionCursor( cursor_factory: CursorFactory, partition_router: airbyte_cdk.sources.declarative.partition_routers.PartitionRouter)
53    def __init__(self, cursor_factory: CursorFactory, partition_router: PartitionRouter):
54        self._cursor_factory = cursor_factory
55        self._partition_router = partition_router
56        # The dict is ordered to ensure that once the maximum number of partitions is reached,
57        # the oldest partitions can be efficiently removed, maintaining the most recent partitions.
58        self._cursor_per_partition: OrderedDict[str, DeclarativeCursor] = OrderedDict()
59        self._over_limit = 0
60        self._partition_serializer = PerPartitionKeySerializer()
DEFAULT_MAX_PARTITIONS_NUMBER = 10000
def stream_slices(self) -> Iterable[airbyte_cdk.StreamSlice]:
62    def stream_slices(self) -> Iterable[StreamSlice]:
63        slices = self._partition_router.stream_slices()
64        for partition in slices:
65            yield from self.generate_slices_from_partition(partition)

Defines stream slices

Returns

An iterable of stream slices

def generate_slices_from_partition( self, partition: airbyte_cdk.StreamSlice) -> Iterable[airbyte_cdk.StreamSlice]:
67    def generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[StreamSlice]:
68        # Ensure the maximum number of partitions is not exceeded
69        self._ensure_partition_limit()
70
71        cursor = self._cursor_per_partition.get(self._to_partition_key(partition.partition))
72        if not cursor:
73            partition_state = (
74                self._state_to_migrate_from
75                if self._state_to_migrate_from
76                else self._NO_CURSOR_STATE
77            )
78            cursor = self._create_cursor(partition_state)
79            self._cursor_per_partition[self._to_partition_key(partition.partition)] = cursor
80
81        for cursor_slice in cursor.stream_slices():
82            yield StreamSlice(
83                partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields
84            )
def limit_reached(self) -> bool:
 99    def limit_reached(self) -> bool:
100        return self._over_limit > self.DEFAULT_MAX_PARTITIONS_NUMBER
def set_initial_state(self, stream_state: Mapping[str, Any]) -> None:
102    def set_initial_state(self, stream_state: StreamState) -> None:
103        """
104        Set the initial state for the cursors.
105
106        This method initializes the state for each partition cursor using the provided stream state.
107        If a partition state is provided in the stream state, it will update the corresponding partition cursor with this state.
108
109        Additionally, it sets the parent state for partition routers that are based on parent streams. If a partition router
110        does not have parent streams, this step will be skipped due to the default PartitionRouter implementation.
111
112        Args:
113            stream_state (StreamState): The state of the streams to be set. The format of the stream state should be:
114                {
115                    "states": [
116                        {
117                            "partition": {
118                                "partition_key": "value"
119                            },
120                            "cursor": {
121                                "last_updated": "2023-05-27T00:00:00Z"
122                            }
123                        }
124                    ],
125                    "parent_state": {
126                        "parent_stream_name": {
127                            "last_updated": "2023-05-27T00:00:00Z"
128                        }
129                    }
130                }
131        """
132        if not stream_state:
133            return
134
135        if "states" not in stream_state:
136            # We assume that `stream_state` is in a global format that can be applied to all partitions.
137            # Example: {"global_state_format_key": "global_state_format_value"}
138            self._state_to_migrate_from = stream_state
139
140        else:
141            for state in stream_state["states"]:
142                self._cursor_per_partition[self._to_partition_key(state["partition"])] = (
143                    self._create_cursor(state["cursor"])
144                )
145
146            # set default state for missing partitions if it is per partition with fallback to global
147            if "state" in stream_state:
148                self._state_to_migrate_from = stream_state["state"]
149
150        # We used to set the parent state through this method but since moving the SubstreamPartitionRouter to the
151        # Concurrent CDK/AbstractStream, the state is passed at the __init__ stage and this does not need to be called.
152        # We are still keeping this line as a comment to be explicit about the past behavior.
153        # self._partition_router.set_initial_state(stream_state)

Set the initial state for the cursors.

This method initializes the state for each partition cursor using the provided stream state. If a partition state is provided in the stream state, it will update the corresponding partition cursor with this state.

Additionally, it sets the parent state for partition routers that are based on parent streams. If a partition router does not have parent streams, this step will be skipped due to the default PartitionRouter implementation.

Arguments:
  • stream_state (StreamState): The state of the streams to be set. The format of the stream state should be: { "states": [ { "partition": { "partition_key": "value" }, "cursor": { "last_updated": "2023-05-27T00:00:00Z" } } ], "parent_state": { "parent_stream_name": { "last_updated": "2023-05-27T00:00:00Z" } } }
def observe( self, stream_slice: airbyte_cdk.StreamSlice, record: airbyte_cdk.Record) -> None:
155    def observe(self, stream_slice: StreamSlice, record: Record) -> None:
156        self._cursor_per_partition[self._to_partition_key(stream_slice.partition)].observe(
157            StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), record
158        )

Register a record with the cursor; the cursor instance can then use it to manage the state of the in-progress stream read.

Parameters
  • stream_slice: The current slice, which may or may not contain the most recently observed record
  • record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the stream state may need to be deferred depending on whether the source reliably orders records by the cursor field.
def close_slice( self, stream_slice: airbyte_cdk.StreamSlice, *args: Any) -> None:
160    def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
161        try:
162            self._cursor_per_partition[self._to_partition_key(stream_slice.partition)].close_slice(
163                StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), *args
164            )
165        except KeyError as exception:
166            raise ValueError(
167                f"Partition {str(exception)} could not be found in current state based on the record. This is unexpected because "
168                f"we should only update state for partitions that were emitted during `stream_slices`"
169            )

Update state based on the stream slice. Note that stream_slice.cursor_slice and most_recent_record.associated_slice are expected to be the same but we make it explicit here that stream_slice should be leveraged to update the state. We do not pass in the latest record, since cursor instances should maintain the relevant internal state on their own.

Parameters
  • stream_slice: slice to close
def get_stream_state(self) -> Mapping[str, Any]:
171    def get_stream_state(self) -> StreamState:
172        states = []
173        for partition_tuple, cursor in self._cursor_per_partition.items():
174            cursor_state = cursor.get_stream_state()
175            if cursor_state:
176                states.append(
177                    {
178                        "partition": self._to_dict(partition_tuple),
179                        "cursor": cursor_state,
180                    }
181                )
182        state: dict[str, Any] = {"states": states}
183
184        parent_state = self._partition_router.get_stream_state()
185        if parent_state:
186            state["parent_state"] = parent_state
187        return state

Returns the current stream state. We would like to restrict its usage since it does expose internals of the state. As of 2023-06-14, it is used for the following things:

  • Interpolation of the requests
  • Transformation of records
  • Saving the state

For the first case, we are probably stuck with exposing the stream state. For the second, we can probably expose a method that allows for emitting the state to the platform.

def select_state( self, stream_slice: Optional[airbyte_cdk.StreamSlice] = None) -> Optional[Mapping[str, Any]]:
206    def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
207        if not stream_slice:
208            raise ValueError("A partition needs to be provided in order to extract a state")
209
210        if not stream_slice:
211            return None
212
213        return self._get_state_for_partition(stream_slice.partition)

Get the state value of a specific stream_slice. For incremental or resumable full refresh cursors which only manage state in a single dimension this is the entire state object. For per-partition cursors used by substreams, this returns the state of a specific parent delineated by the incoming slice's partition object.

def get_request_params( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
220    def get_request_params(
221        self,
222        *,
223        stream_state: Optional[StreamState] = None,
224        stream_slice: Optional[StreamSlice] = None,
225        next_page_token: Optional[Mapping[str, Any]] = None,
226    ) -> Mapping[str, Any]:
227        if stream_slice:
228            if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition:
229                self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition))
230            return self._partition_router.get_request_params(  # type: ignore # this always returns a mapping
231                stream_state=stream_state,
232                stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
233                next_page_token=next_page_token,
234            ) | self._cursor_per_partition[
235                self._to_partition_key(stream_slice.partition)
236            ].get_request_params(
237                stream_state=stream_state,
238                stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
239                next_page_token=next_page_token,
240            )
241        else:
242            raise ValueError("A partition needs to be provided in order to get request params")

Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

E.g: you might want to define query parameters for paging if next_page_token is not None.

def get_request_headers( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
244    def get_request_headers(
245        self,
246        *,
247        stream_state: Optional[StreamState] = None,
248        stream_slice: Optional[StreamSlice] = None,
249        next_page_token: Optional[Mapping[str, Any]] = None,
250    ) -> Mapping[str, Any]:
251        if stream_slice:
252            if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition:
253                self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition))
254            return self._partition_router.get_request_headers(  # type: ignore # this always returns a mapping
255                stream_state=stream_state,
256                stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
257                next_page_token=next_page_token,
258            ) | self._cursor_per_partition[
259                self._to_partition_key(stream_slice.partition)
260            ].get_request_headers(
261                stream_state=stream_state,
262                stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
263                next_page_token=next_page_token,
264            )
265        else:
266            raise ValueError("A partition needs to be provided in order to get request headers")

Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.

def get_request_body_data( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Union[Mapping[str, Any], str]:
268    def get_request_body_data(
269        self,
270        *,
271        stream_state: Optional[StreamState] = None,
272        stream_slice: Optional[StreamSlice] = None,
273        next_page_token: Optional[Mapping[str, Any]] = None,
274    ) -> Union[Mapping[str, Any], str]:
275        if stream_slice:
276            if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition:
277                self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition))
278            return self._partition_router.get_request_body_data(  # type: ignore # this always returns a mapping
279                stream_state=stream_state,
280                stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
281                next_page_token=next_page_token,
282            ) | self._cursor_per_partition[
283                self._to_partition_key(stream_slice.partition)
284            ].get_request_body_data(
285                stream_state=stream_state,
286                stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
287                next_page_token=next_page_token,
288            )
289        else:
290            raise ValueError("A partition needs to be provided in order to get request body data")

Specifies how to populate the body of the request with a non-JSON payload.

If returns a ready text that it will be sent as is. If returns a dict that it will be converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

def get_request_body_json( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
292    def get_request_body_json(
293        self,
294        *,
295        stream_state: Optional[StreamState] = None,
296        stream_slice: Optional[StreamSlice] = None,
297        next_page_token: Optional[Mapping[str, Any]] = None,
298    ) -> Mapping[str, Any]:
299        if stream_slice:
300            if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition:
301                self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition))
302            return self._partition_router.get_request_body_json(  # type: ignore # this always returns a mapping
303                stream_state=stream_state,
304                stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
305                next_page_token=next_page_token,
306            ) | self._cursor_per_partition[
307                self._to_partition_key(stream_slice.partition)
308            ].get_request_body_json(
309                stream_state=stream_state,
310                stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
311                next_page_token=next_page_token,
312            )
313        else:
314            raise ValueError("A partition needs to be provided in order to get request body json")

Specifies how to populate the body of the request with a JSON payload.

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

def should_be_synced(self, record: airbyte_cdk.Record) -> bool:
316    def should_be_synced(self, record: Record) -> bool:
317        return self._get_cursor(record).should_be_synced(
318            self._convert_record_to_cursor_record(record)
319        )

Evaluating if a record should be synced allows for filtering and stop condition on pagination

class PerPartitionWithGlobalCursor(airbyte_cdk.legacy.sources.declarative.incremental.DeclarativeCursor):
 23class PerPartitionWithGlobalCursor(DeclarativeCursor):
 24    """
 25    Manages state for streams with multiple partitions, with an optional fallback to a global cursor when specific conditions are met.
 26
 27    This cursor handles partitioned streams by maintaining individual state per partition using `PerPartitionCursor`. If the number of partitions exceeds a defined limit, it switches to a global cursor (`GlobalSubstreamCursor`) to manage state more efficiently.
 28
 29    **Overview**
 30
 31    - **Partition-Based State**: Initially manages state per partition to ensure accurate processing of each partition's data.
 32    - **Global Fallback**: Switches to a global cursor when the partition limit is exceeded to handle state management more effectively.
 33
 34    **Switching Logic**
 35
 36    - Monitors the number of partitions.
 37    - If `PerPartitionCursor.limit_reached()` returns `True`, sets `_use_global_cursor` to `True`, activating the global cursor.
 38
 39    **Active Cursor Selection**
 40
 41    - Uses the `_get_active_cursor()` helper method to select the active cursor based on the `_use_global_cursor` flag.
 42    - This simplifies the logic and ensures consistent cursor usage across methods.
 43
 44    **State Structure Example**
 45
 46    ```json
 47    {
 48        "states": [
 49            {
 50                "partition": {"partition_key": "partition_1"},
 51                "cursor": {"cursor_field": "2021-01-15"}
 52            },
 53            {
 54                "partition": {"partition_key": "partition_2"},
 55                "cursor": {"cursor_field": "2021-02-14"}
 56            }
 57        ],
 58        "state": {
 59            "cursor_field": "2021-02-15"
 60        },
 61        "use_global_cursor": false
 62    }
 63    ```
 64
 65    In this example, the cursor is using partition-based state management (`"use_global_cursor": false`), maintaining separate cursor states for each partition.
 66
 67    **Usage Scenario**
 68
 69    Suitable for streams where the number of partitions may vary significantly, requiring dynamic switching between per-partition and global state management to ensure data consistency and efficient synchronization.
 70    """
 71
 72    def __init__(
 73        self,
 74        cursor_factory: CursorFactory,
 75        partition_router: PartitionRouter,
 76        stream_cursor: DatetimeBasedCursor,
 77    ):
 78        self._partition_router = partition_router
 79        self._per_partition_cursor = PerPartitionCursor(cursor_factory, partition_router)
 80        self._global_cursor = GlobalSubstreamCursor(stream_cursor, partition_router)
 81        self._use_global_cursor = False
 82        self._current_partition: Optional[Mapping[str, Any]] = None
 83        self._last_slice: bool = False
 84        self._parent_state: Optional[Mapping[str, Any]] = None
 85
 86    def _get_active_cursor(self) -> Union[PerPartitionCursor, GlobalSubstreamCursor]:
 87        return self._global_cursor if self._use_global_cursor else self._per_partition_cursor
 88
 89    def stream_slices(self) -> Iterable[StreamSlice]:
 90        self._global_cursor.start_slices_generation()
 91
 92        # Iterate through partitions and process slices
 93        for partition, is_last_partition, parent_state in iterate_with_last_flag_and_state(
 94            self._partition_router.stream_slices(), self._partition_router.get_stream_state
 95        ):
 96            # Generate slices for the current cursor and handle the last slice using the flag
 97            self._parent_state = parent_state
 98            for slice, is_last_slice, _ in iterate_with_last_flag_and_state(
 99                self._get_active_cursor().generate_slices_from_partition(partition=partition),
100                lambda: None,
101            ):
102                self._global_cursor.register_slice(is_last_slice and is_last_partition)
103                yield slice
104        self._parent_state = self._partition_router.get_stream_state()
105
106    def set_initial_state(self, stream_state: StreamState) -> None:
107        """
108        Set the initial state for the cursors.
109        """
110        self._use_global_cursor = stream_state.get("use_global_cursor", False)
111
112        self._parent_state = stream_state.get("parent_state", {})
113
114        self._global_cursor.set_initial_state(stream_state)
115        if not self._use_global_cursor:
116            self._per_partition_cursor.set_initial_state(stream_state)
117
118    def observe(self, stream_slice: StreamSlice, record: Record) -> None:
119        if not self._use_global_cursor and self._per_partition_cursor.limit_reached():
120            self._use_global_cursor = True
121
122        if not self._use_global_cursor:
123            self._per_partition_cursor.observe(stream_slice, record)
124        self._global_cursor.observe(stream_slice, record)
125
126    def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
127        if not self._use_global_cursor:
128            self._per_partition_cursor.close_slice(stream_slice, *args)
129        self._global_cursor.close_slice(stream_slice, *args)
130
131    def get_stream_state(self) -> StreamState:
132        final_state: MutableMapping[str, Any] = {"use_global_cursor": self._use_global_cursor}
133
134        final_state.update(self._global_cursor.get_stream_state())
135        if not self._use_global_cursor:
136            final_state.update(self._per_partition_cursor.get_stream_state())
137
138        final_state["parent_state"] = self._parent_state
139        if not final_state.get("parent_state"):
140            del final_state["parent_state"]
141
142        return final_state
143
144    def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
145        return self._get_active_cursor().select_state(stream_slice)
146
147    def get_request_params(
148        self,
149        *,
150        stream_state: Optional[StreamState] = None,
151        stream_slice: Optional[StreamSlice] = None,
152        next_page_token: Optional[Mapping[str, Any]] = None,
153    ) -> Mapping[str, Any]:
154        return self._get_active_cursor().get_request_params(
155            stream_state=stream_state,
156            stream_slice=stream_slice,
157            next_page_token=next_page_token,
158        )
159
160    def get_request_headers(
161        self,
162        *,
163        stream_state: Optional[StreamState] = None,
164        stream_slice: Optional[StreamSlice] = None,
165        next_page_token: Optional[Mapping[str, Any]] = None,
166    ) -> Mapping[str, Any]:
167        return self._get_active_cursor().get_request_headers(
168            stream_state=stream_state,
169            stream_slice=stream_slice,
170            next_page_token=next_page_token,
171        )
172
173    def get_request_body_data(
174        self,
175        *,
176        stream_state: Optional[StreamState] = None,
177        stream_slice: Optional[StreamSlice] = None,
178        next_page_token: Optional[Mapping[str, Any]] = None,
179    ) -> Union[Mapping[str, Any], str]:
180        return self._get_active_cursor().get_request_body_data(
181            stream_state=stream_state,
182            stream_slice=stream_slice,
183            next_page_token=next_page_token,
184        )
185
186    def get_request_body_json(
187        self,
188        *,
189        stream_state: Optional[StreamState] = None,
190        stream_slice: Optional[StreamSlice] = None,
191        next_page_token: Optional[Mapping[str, Any]] = None,
192    ) -> Mapping[str, Any]:
193        return self._get_active_cursor().get_request_body_json(
194            stream_state=stream_state,
195            stream_slice=stream_slice,
196            next_page_token=next_page_token,
197        )
198
199    def should_be_synced(self, record: Record) -> bool:
200        return self._get_active_cursor().should_be_synced(record)

Manages state for streams with multiple partitions, with an optional fallback to a global cursor when specific conditions are met.

This cursor handles partitioned streams by maintaining individual state per partition using PerPartitionCursor. If the number of partitions exceeds a defined limit, it switches to a global cursor (GlobalSubstreamCursor) to manage state more efficiently.

Overview

  • Partition-Based State: Initially manages state per partition to ensure accurate processing of each partition's data.
  • Global Fallback: Switches to a global cursor when the partition limit is exceeded to handle state management more effectively.

Switching Logic

  • Monitors the number of partitions.
  • If PerPartitionCursor.limit_reached() returns True, sets _use_global_cursor to True, activating the global cursor.

Active Cursor Selection

  • Uses the _get_active_cursor() helper method to select the active cursor based on the _use_global_cursor flag.
  • This simplifies the logic and ensures consistent cursor usage across methods.

State Structure Example

{
    "states": [
        {
            "partition": {"partition_key": "partition_1"},
            "cursor": {"cursor_field": "2021-01-15"}
        },
        {
            "partition": {"partition_key": "partition_2"},
            "cursor": {"cursor_field": "2021-02-14"}
        }
    ],
    "state": {
        "cursor_field": "2021-02-15"
    },
    "use_global_cursor": false
}

In this example, the cursor is using partition-based state management ("use_global_cursor": false), maintaining separate cursor states for each partition.

Usage Scenario

Suitable for streams where the number of partitions may vary significantly, requiring dynamic switching between per-partition and global state management to ensure data consistency and efficient synchronization.

PerPartitionWithGlobalCursor( cursor_factory: CursorFactory, partition_router: airbyte_cdk.sources.declarative.partition_routers.PartitionRouter, stream_cursor: DatetimeBasedCursor)
72    def __init__(
73        self,
74        cursor_factory: CursorFactory,
75        partition_router: PartitionRouter,
76        stream_cursor: DatetimeBasedCursor,
77    ):
78        self._partition_router = partition_router
79        self._per_partition_cursor = PerPartitionCursor(cursor_factory, partition_router)
80        self._global_cursor = GlobalSubstreamCursor(stream_cursor, partition_router)
81        self._use_global_cursor = False
82        self._current_partition: Optional[Mapping[str, Any]] = None
83        self._last_slice: bool = False
84        self._parent_state: Optional[Mapping[str, Any]] = None
def stream_slices(self) -> Iterable[airbyte_cdk.StreamSlice]:
 89    def stream_slices(self) -> Iterable[StreamSlice]:
 90        self._global_cursor.start_slices_generation()
 91
 92        # Iterate through partitions and process slices
 93        for partition, is_last_partition, parent_state in iterate_with_last_flag_and_state(
 94            self._partition_router.stream_slices(), self._partition_router.get_stream_state
 95        ):
 96            # Generate slices for the current cursor and handle the last slice using the flag
 97            self._parent_state = parent_state
 98            for slice, is_last_slice, _ in iterate_with_last_flag_and_state(
 99                self._get_active_cursor().generate_slices_from_partition(partition=partition),
100                lambda: None,
101            ):
102                self._global_cursor.register_slice(is_last_slice and is_last_partition)
103                yield slice
104        self._parent_state = self._partition_router.get_stream_state()

Defines stream slices

Returns

An iterable of stream slices

def set_initial_state(self, stream_state: Mapping[str, Any]) -> None:
106    def set_initial_state(self, stream_state: StreamState) -> None:
107        """
108        Set the initial state for the cursors.
109        """
110        self._use_global_cursor = stream_state.get("use_global_cursor", False)
111
112        self._parent_state = stream_state.get("parent_state", {})
113
114        self._global_cursor.set_initial_state(stream_state)
115        if not self._use_global_cursor:
116            self._per_partition_cursor.set_initial_state(stream_state)

Set the initial state for the cursors.

def observe( self, stream_slice: airbyte_cdk.StreamSlice, record: airbyte_cdk.Record) -> None:
118    def observe(self, stream_slice: StreamSlice, record: Record) -> None:
119        if not self._use_global_cursor and self._per_partition_cursor.limit_reached():
120            self._use_global_cursor = True
121
122        if not self._use_global_cursor:
123            self._per_partition_cursor.observe(stream_slice, record)
124        self._global_cursor.observe(stream_slice, record)

Register a record with the cursor; the cursor instance can then use it to manage the state of the in-progress stream read.

Parameters
  • stream_slice: The current slice, which may or may not contain the most recently observed record
  • record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the stream state may need to be deferred depending on whether the source reliably orders records by the cursor field.
def close_slice( self, stream_slice: airbyte_cdk.StreamSlice, *args: Any) -> None:
126    def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
127        if not self._use_global_cursor:
128            self._per_partition_cursor.close_slice(stream_slice, *args)
129        self._global_cursor.close_slice(stream_slice, *args)

Update state based on the stream slice. Note that stream_slice.cursor_slice and most_recent_record.associated_slice are expected to be the same but we make it explicit here that stream_slice should be leveraged to update the state. We do not pass in the latest record, since cursor instances should maintain the relevant internal state on their own.

Parameters
  • stream_slice: slice to close
def get_stream_state(self) -> Mapping[str, Any]:
131    def get_stream_state(self) -> StreamState:
132        final_state: MutableMapping[str, Any] = {"use_global_cursor": self._use_global_cursor}
133
134        final_state.update(self._global_cursor.get_stream_state())
135        if not self._use_global_cursor:
136            final_state.update(self._per_partition_cursor.get_stream_state())
137
138        final_state["parent_state"] = self._parent_state
139        if not final_state.get("parent_state"):
140            del final_state["parent_state"]
141
142        return final_state

Returns the current stream state. We would like to restrict its usage since it does expose internals of the state. As of 2023-06-14, it is used for the following things:

  • Interpolation of the requests
  • Transformation of records
  • Saving the state

For the first case, we are probably stuck with exposing the stream state. For the second, we can probably expose a method that allows for emitting the state to the platform.

def select_state( self, stream_slice: Optional[airbyte_cdk.StreamSlice] = None) -> Optional[Mapping[str, Any]]:
144    def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
145        return self._get_active_cursor().select_state(stream_slice)

Get the state value of a specific stream_slice. For incremental or resumable full refresh cursors which only manage state in a single dimension this is the entire state object. For per-partition cursors used by substreams, this returns the state of a specific parent delineated by the incoming slice's partition object.

def get_request_params( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
147    def get_request_params(
148        self,
149        *,
150        stream_state: Optional[StreamState] = None,
151        stream_slice: Optional[StreamSlice] = None,
152        next_page_token: Optional[Mapping[str, Any]] = None,
153    ) -> Mapping[str, Any]:
154        return self._get_active_cursor().get_request_params(
155            stream_state=stream_state,
156            stream_slice=stream_slice,
157            next_page_token=next_page_token,
158        )

Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

E.g: you might want to define query parameters for paging if next_page_token is not None.

def get_request_headers( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
160    def get_request_headers(
161        self,
162        *,
163        stream_state: Optional[StreamState] = None,
164        stream_slice: Optional[StreamSlice] = None,
165        next_page_token: Optional[Mapping[str, Any]] = None,
166    ) -> Mapping[str, Any]:
167        return self._get_active_cursor().get_request_headers(
168            stream_state=stream_state,
169            stream_slice=stream_slice,
170            next_page_token=next_page_token,
171        )

Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.

def get_request_body_data( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Union[Mapping[str, Any], str]:
173    def get_request_body_data(
174        self,
175        *,
176        stream_state: Optional[StreamState] = None,
177        stream_slice: Optional[StreamSlice] = None,
178        next_page_token: Optional[Mapping[str, Any]] = None,
179    ) -> Union[Mapping[str, Any], str]:
180        return self._get_active_cursor().get_request_body_data(
181            stream_state=stream_state,
182            stream_slice=stream_slice,
183            next_page_token=next_page_token,
184        )

Specifies how to populate the body of the request with a non-JSON payload.

If it returns plain text, it will be sent as is. If it returns a dict, it will be converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

def get_request_body_json( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
186    def get_request_body_json(
187        self,
188        *,
189        stream_state: Optional[StreamState] = None,
190        stream_slice: Optional[StreamSlice] = None,
191        next_page_token: Optional[Mapping[str, Any]] = None,
192    ) -> Mapping[str, Any]:
193        return self._get_active_cursor().get_request_body_json(
194            stream_state=stream_state,
195            stream_slice=stream_slice,
196            next_page_token=next_page_token,
197        )

Specifies how to populate the body of the request with a JSON payload.

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

def should_be_synced(self, record: airbyte_cdk.Record) -> bool:
199    def should_be_synced(self, record: Record) -> bool:
200        return self._get_active_cursor().should_be_synced(record)

Evaluating whether a record should be synced allows for filtering records and for defining a stop condition on pagination

@dataclass
class ResumableFullRefreshCursor(airbyte_cdk.legacy.sources.declarative.incremental.DeclarativeCursor):
12@dataclass
13class ResumableFullRefreshCursor(DeclarativeCursor):
14    parameters: InitVar[Mapping[str, Any]]
15
16    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
17        self._cursor: StreamState = {}
18
19    def get_stream_state(self) -> StreamState:
20        return self._cursor
21
22    def set_initial_state(self, stream_state: StreamState) -> None:
23        self._cursor = stream_state
24
25    def observe(self, stream_slice: StreamSlice, record: Record) -> None:
26        """
27        Resumable full refresh manages state using a page number so it does not need to update state by observing incoming records.
28        """
29        pass
30
31    def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
32        # The ResumableFullRefreshCursor doesn't support nested streams yet so receiving a partition is unexpected
33        if stream_slice.partition:
34            raise ValueError(
35                f"Stream slice {stream_slice} should not have a partition. Got {stream_slice.partition}."
36            )
37        self._cursor = stream_slice.cursor_slice
38
39    def should_be_synced(self, record: Record) -> bool:
40        """
41        Unlike date-based cursors which filter out records outside slice boundaries, resumable full refresh records exist within pages
42        that don't have filterable bounds. We should always return them.
43        """
44        return True
45
46    def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
47        # A top-level RFR cursor only manages the state of a single partition
48        return self._cursor
49
50    def stream_slices(self) -> Iterable[StreamSlice]:
51        """
52        Resumable full refresh cursors only return a single slice and can't perform partitioning because iteration is done per-page
53        along an unbounded set.
54        """
55        yield from [StreamSlice(cursor_slice=self._cursor, partition={})]
56
57    # This is an interesting pattern that might not seem obvious at first glance. This cursor itself has no functional need to
58    # inject any request values into the outbound response because the up-to-date pagination state is already loaded and
59    # maintained by the paginator component
60    def get_request_params(
61        self,
62        *,
63        stream_state: Optional[StreamState] = None,
64        stream_slice: Optional[StreamSlice] = None,
65        next_page_token: Optional[Mapping[str, Any]] = None,
66    ) -> Mapping[str, Any]:
67        return {}
68
69    def get_request_headers(
70        self,
71        *,
72        stream_state: Optional[StreamState] = None,
73        stream_slice: Optional[StreamSlice] = None,
74        next_page_token: Optional[Mapping[str, Any]] = None,
75    ) -> Mapping[str, Any]:
76        return {}
77
78    def get_request_body_data(
79        self,
80        *,
81        stream_state: Optional[StreamState] = None,
82        stream_slice: Optional[StreamSlice] = None,
83        next_page_token: Optional[Mapping[str, Any]] = None,
84    ) -> Mapping[str, Any]:
85        return {}
86
87    def get_request_body_json(
88        self,
89        *,
90        stream_state: Optional[StreamState] = None,
91        stream_slice: Optional[StreamSlice] = None,
92        next_page_token: Optional[Mapping[str, Any]] = None,
93    ) -> Mapping[str, Any]:
94        return {}
ResumableFullRefreshCursor(parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]])
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
def get_stream_state(self) -> Mapping[str, Any]:
19    def get_stream_state(self) -> StreamState:
20        return self._cursor

Returns the current stream state. We would like to restrict its usage since it exposes internals of the state. As of 2023-06-14, it is used for a few things:

  • Interpolation of the requests
  • Transformation of records
  • Saving the state

For the first case, we are probably stuck with exposing the stream state. For the second, we can probably expose a method that allows for emitting the state to the platform.

def set_initial_state(self, stream_state: Mapping[str, Any]) -> None:
22    def set_initial_state(self, stream_state: StreamState) -> None:
23        self._cursor = stream_state

Cursors are not initialized with their state. As state is needed in order to function properly, this method should be called before calling anything else

Parameters
  • stream_state: The state of the stream as returned by get_stream_state
def observe( self, stream_slice: airbyte_cdk.StreamSlice, record: airbyte_cdk.Record) -> None:
25    def observe(self, stream_slice: StreamSlice, record: Record) -> None:
26        """
27        Resumable full refresh manages state using a page number so it does not need to update state by observing incoming records.
28        """
29        pass

Resumable full refresh manages state using a page number so it does not need to update state by observing incoming records.

def close_slice( self, stream_slice: airbyte_cdk.StreamSlice, *args: Any) -> None:
31    def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
32        # The ResumableFullRefreshCursor doesn't support nested streams yet so receiving a partition is unexpected
33        if stream_slice.partition:
34            raise ValueError(
35                f"Stream slice {stream_slice} should not have a partition. Got {stream_slice.partition}."
36            )
37        self._cursor = stream_slice.cursor_slice

Update state based on the stream slice. Note that stream_slice.cursor_slice and most_recent_record.associated_slice are expected to be the same but we make it explicit here that stream_slice should be leveraged to update the state. We do not pass in the latest record, since cursor instances should maintain the relevant internal state on their own.

Parameters
  • stream_slice: slice to close
def should_be_synced(self, record: airbyte_cdk.Record) -> bool:
39    def should_be_synced(self, record: Record) -> bool:
40        """
41        Unlike date-based cursors which filter out records outside slice boundaries, resumable full refresh records exist within pages
42        that don't have filterable bounds. We should always return them.
43        """
44        return True

Unlike date-based cursors which filter out records outside slice boundaries, resumable full refresh records exist within pages that don't have filterable bounds. We should always return them.

def select_state( self, stream_slice: Optional[airbyte_cdk.StreamSlice] = None) -> Optional[Mapping[str, Any]]:
46    def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
47        # A top-level RFR cursor only manages the state of a single partition
48        return self._cursor

Get the state value of a specific stream_slice. For incremental or resumable full refresh cursors which only manage state in a single dimension this is the entire state object. For per-partition cursors used by substreams, this returns the state of a specific parent delineated by the incoming slice's partition object.

def stream_slices(self) -> Iterable[airbyte_cdk.StreamSlice]:
50    def stream_slices(self) -> Iterable[StreamSlice]:
51        """
52        Resumable full refresh cursors only return a single slice and can't perform partitioning because iteration is done per-page
53        along an unbounded set.
54        """
55        yield from [StreamSlice(cursor_slice=self._cursor, partition={})]

Resumable full refresh cursors only return a single slice and can't perform partitioning because iteration is done per-page along an unbounded set.

def get_request_params( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
60    def get_request_params(
61        self,
62        *,
63        stream_state: Optional[StreamState] = None,
64        stream_slice: Optional[StreamSlice] = None,
65        next_page_token: Optional[Mapping[str, Any]] = None,
66    ) -> Mapping[str, Any]:
67        return {}

Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

E.g: you might want to define query parameters for paging if next_page_token is not None.

def get_request_headers( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
69    def get_request_headers(
70        self,
71        *,
72        stream_state: Optional[StreamState] = None,
73        stream_slice: Optional[StreamSlice] = None,
74        next_page_token: Optional[Mapping[str, Any]] = None,
75    ) -> Mapping[str, Any]:
76        return {}

Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.

def get_request_body_data( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
78    def get_request_body_data(
79        self,
80        *,
81        stream_state: Optional[StreamState] = None,
82        stream_slice: Optional[StreamSlice] = None,
83        next_page_token: Optional[Mapping[str, Any]] = None,
84    ) -> Mapping[str, Any]:
85        return {}

Specifies how to populate the body of the request with a non-JSON payload.

If it returns plain text, it will be sent as is. If it returns a dict, it will be converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

def get_request_body_json( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
87    def get_request_body_json(
88        self,
89        *,
90        stream_state: Optional[StreamState] = None,
91        stream_slice: Optional[StreamSlice] = None,
92        next_page_token: Optional[Mapping[str, Any]] = None,
93    ) -> Mapping[str, Any]:
94        return {}

Specifies how to populate the body of the request with a JSON payload.

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

@dataclass
class ChildPartitionResumableFullRefreshCursor(airbyte_cdk.legacy.sources.declarative.incremental.ResumableFullRefreshCursor):
 97@dataclass
 98class ChildPartitionResumableFullRefreshCursor(ResumableFullRefreshCursor):
 99    """
100    The Sub-stream Resumable Cursor for Full-Refresh substreams.
101    Follows the parent type `ResumableFullRefreshCursor` with a small override,
102    to provide the ability to close the substream's slice once it has finished processing.
103
104    Check the `close_slice` method override for more info about the actual behaviour of this cursor.
105    """
106
107    def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
108        """
109        Once the current slice has finished syncing:
110         - paginator returns None
111         - no more slices to process
112
113        we assume that the records are processed and emitted already,
114        thus we have to set the cursor to ` __ab_full_refresh_sync_complete: true `,
115        otherwise there is a risk of an infinite loop processing the same slice.
116        """
117        self._cursor = FULL_REFRESH_COMPLETE_STATE

The Sub-stream Resumable Cursor for Full-Refresh substreams. Follows the parent type ResumableFullRefreshCursor with a small override, to provide the ability to close the substream's slice once it has finished processing.

Check the close_slice method override for more info about the actual behaviour of this cursor.

ChildPartitionResumableFullRefreshCursor(parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]])
def close_slice( self, stream_slice: airbyte_cdk.StreamSlice, *args: Any) -> None:
107    def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
108        """
109        Once the current slice has finished syncing:
110         - paginator returns None
111         - no more slices to process
112
113        we assume that the records are processed and emitted already,
114        thus we have to set the cursor to ` __ab_full_refresh_sync_complete: true `,
115        otherwise there is a risk of an infinite loop processing the same slice.
116        """
117        self._cursor = FULL_REFRESH_COMPLETE_STATE
Once the current slice has finished syncing:
  • paginator returns None
  • no more slices to process

we assume that the records are processed and emitted already, thus we have to set the cursor to __ab_full_refresh_sync_complete: true, otherwise there is a risk of an infinite loop processing the same slice.