airbyte_cdk.sources.streams.concurrent.cursor

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4
  5import functools
  6import logging
  7from abc import ABC, abstractmethod
  8from typing import (
  9    Any,
 10    Callable,
 11    Iterable,
 12    List,
 13    Mapping,
 14    MutableMapping,
 15    Optional,
 16    Tuple,
 17    Union,
 18)
 19
 20from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
 21from airbyte_cdk.sources.message import MessageRepository
 22from airbyte_cdk.sources.streams import NO_CURSOR_STATE_KEY
 23from airbyte_cdk.sources.streams.concurrent.clamping import ClampingStrategy, NoClamping
 24from airbyte_cdk.sources.streams.concurrent.cursor_types import CursorValueType, GapType
 25from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
 26from airbyte_cdk.sources.streams.concurrent.partitions.stream_slicer import StreamSlicer
 27from airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter import (
 28    AbstractStreamStateConverter,
 29)
 30from airbyte_cdk.sources.types import Record, StreamSlice
 31
 32LOGGER = logging.getLogger("airbyte")
 33
 34
 35def _extract_value(mapping: Mapping[str, Any], path: List[str]) -> Any:
 36    return functools.reduce(lambda a, b: a[b], path, mapping)
 37
 38
 39class CursorField:
 40    def __init__(self, cursor_field_key: str) -> None:
 41        self.cursor_field_key = cursor_field_key
 42
 43    def extract_value(self, record: Record) -> CursorValueType:
 44        cursor_value = record.data.get(self.cursor_field_key)
 45        if cursor_value is None:
 46            raise ValueError(f"Could not find cursor field {self.cursor_field_key} in record")
 47        return cursor_value  # type: ignore  # we assume that the value the path points at is a comparable
 48
 49
 50class Cursor(StreamSlicer, ABC):
 51    @property
 52    @abstractmethod
 53    def state(self) -> MutableMapping[str, Any]: ...
 54
 55    @abstractmethod
 56    def observe(self, record: Record) -> None:
 57        """
 58        Indicate to the cursor that the record has been emitted
 59        """
 60        raise NotImplementedError()
 61
 62    @abstractmethod
 63    def close_partition(self, partition: Partition) -> None:
 64        """
 65        Indicate to the cursor that the partition has been successfully processed
 66        """
 67        raise NotImplementedError()
 68
 69    @abstractmethod
 70    def ensure_at_least_one_state_emitted(self) -> None:
 71        """
 72        State messages are emitted when a partition is closed. However, the platform expects at least one state to be emitted per sync per
 73        stream. Hence, if no partitions are generated, this method needs to be called.
 74        """
 75        raise NotImplementedError()
 76
 77    @abstractmethod
 78    def should_be_synced(self, record: Record) -> bool:
 79        pass
 80
 81    def stream_slices(self) -> Iterable[StreamSlice]:
 82        """
 83        Default placeholder implementation of stream_slices.
 84        Subclasses can override this method to provide actual behavior.
 85        """
 86        yield StreamSlice(partition={}, cursor_slice={})
 87
 88
 89class FinalStateCursor(Cursor):
 90    """Cursor that is used to guarantee at least one state message is emitted for a concurrent stream."""
 91
 92    def __init__(
 93        self,
 94        stream_name: str,
 95        stream_namespace: Optional[str],
 96        message_repository: MessageRepository,
 97    ) -> None:
 98        self._stream_name = stream_name
 99        self._stream_namespace = stream_namespace
100        self._message_repository = message_repository
101        # Normally the connector state manager operates at the source-level. However, we only need it to write the sentinel
102        # state message rather than manage overall source state. This is also only temporary as we move to the resumable
103        # full refresh world where every stream uses a FileBasedConcurrentCursor with incremental state.
104        self._connector_state_manager = ConnectorStateManager()
105        self._has_closed_at_least_one_slice = False
106
107    @property
108    def state(self) -> MutableMapping[str, Any]:
109        return {NO_CURSOR_STATE_KEY: True}
110
111    def observe(self, record: Record) -> None:
112        pass
113
114    def close_partition(self, partition: Partition) -> None:
115        pass
116
117    def ensure_at_least_one_state_emitted(self) -> None:
118        """
119        Used primarily for full refresh syncs that do not have a valid cursor value to emit at the end of a sync
120        """
121
122        self._connector_state_manager.update_state_for_stream(
123            self._stream_name, self._stream_namespace, self.state
124        )
125        state_message = self._connector_state_manager.create_state_message(
126            self._stream_name, self._stream_namespace
127        )
128        self._message_repository.emit_message(state_message)
129
130    def should_be_synced(self, record: Record) -> bool:
131        return True
132
133
134class ConcurrentCursor(Cursor):
135    _START_BOUNDARY = 0
136    _END_BOUNDARY = 1
137
138    def __init__(
139        self,
140        stream_name: str,
141        stream_namespace: Optional[str],
142        stream_state: Any,
143        message_repository: MessageRepository,
144        connector_state_manager: ConnectorStateManager,
145        connector_state_converter: AbstractStreamStateConverter,
146        cursor_field: CursorField,
147        slice_boundary_fields: Optional[Tuple[str, str]],
148        start: Optional[CursorValueType],
149        end_provider: Callable[[], CursorValueType],
150        lookback_window: Optional[GapType] = None,
151        slice_range: Optional[GapType] = None,
152        cursor_granularity: Optional[GapType] = None,
153        clamping_strategy: ClampingStrategy = NoClamping(),
154    ) -> None:
155        self._stream_name = stream_name
156        self._stream_namespace = stream_namespace
157        self._message_repository = message_repository
158        self._connector_state_converter = connector_state_converter
159        self._connector_state_manager = connector_state_manager
160        self._cursor_field = cursor_field
161        # To see some examples where the slice boundaries might not be defined, check https://github.com/airbytehq/airbyte/blob/1ce84d6396e446e1ac2377362446e3fb94509461/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py#L363-L379
162        self._slice_boundary_fields = slice_boundary_fields
163        self._start = start
164        self._end_provider = end_provider
165        self.start, self._concurrent_state = self._get_concurrent_state(stream_state)
166        self._lookback_window = lookback_window
167        self._slice_range = slice_range
168        self._most_recent_cursor_value_per_partition: MutableMapping[
169            Union[StreamSlice, Mapping[str, Any], None], Any
170        ] = {}
171        self._has_closed_at_least_one_slice = False
172        self._cursor_granularity = cursor_granularity
173        # Flag to track if the logger has been triggered (per stream)
174        self._should_be_synced_logger_triggered = False
175        self._clamping_strategy = clamping_strategy
176
177    @property
178    def state(self) -> MutableMapping[str, Any]:
179        return self._connector_state_converter.convert_to_state_message(
180            self.cursor_field, self._concurrent_state
181        )
182
183    @property
184    def cursor_field(self) -> CursorField:
185        return self._cursor_field
186
187    @property
188    def _slice_boundary_fields_wrapper(self) -> Tuple[str, str]:
189        return (
190            self._slice_boundary_fields
191            if self._slice_boundary_fields
192            else (
193                self._connector_state_converter.START_KEY,
194                self._connector_state_converter.END_KEY,
195            )
196        )
197
198    def _get_concurrent_state(
199        self, state: MutableMapping[str, Any]
200    ) -> Tuple[CursorValueType, MutableMapping[str, Any]]:
201        if self._connector_state_converter.is_state_message_compatible(state):
202            partitioned_state = self._connector_state_converter.deserialize(state)
203            slices_from_partitioned_state = partitioned_state.get("slices", [])
204
205            value_from_partitioned_state = None
206            if slices_from_partitioned_state:
207                # We assume here that the slices have already been merged
208                first_slice = slices_from_partitioned_state[0]
209                value_from_partitioned_state = (
210                    first_slice[self._connector_state_converter.MOST_RECENT_RECORD_KEY]
211                    if self._connector_state_converter.MOST_RECENT_RECORD_KEY in first_slice
212                    else first_slice[self._connector_state_converter.END_KEY]
213                )
214            return (
215                value_from_partitioned_state
216                or self._start
217                or self._connector_state_converter.zero_value,
218                partitioned_state,
219            )
220        return self._connector_state_converter.convert_from_sequential_state(
221            self._cursor_field, state, self._start
222        )
223
224    def observe(self, record: Record) -> None:
225        most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
226            record.associated_slice
227        )
228        try:
229            cursor_value = self._extract_cursor_value(record)
230
231            if most_recent_cursor_value is None or most_recent_cursor_value < cursor_value:
232                self._most_recent_cursor_value_per_partition[record.associated_slice] = cursor_value
233        except ValueError:
234            self._log_for_record_without_cursor_value()
235
236    def _extract_cursor_value(self, record: Record) -> Any:
237        return self._connector_state_converter.parse_value(self._cursor_field.extract_value(record))
238
239    def close_partition(self, partition: Partition) -> None:
240        slice_count_before = len(self._concurrent_state.get("slices", []))
241        self._add_slice_to_state(partition)
242        if slice_count_before < len(
243            self._concurrent_state["slices"]
244        ):  # only emit if at least one slice has been processed
245            self._merge_partitions()
246            self._emit_state_message()
247        self._has_closed_at_least_one_slice = True
248
249    def _add_slice_to_state(self, partition: Partition) -> None:
250        most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
251            partition.to_slice()
252        )
253
254        if self._slice_boundary_fields:
255            if "slices" not in self._concurrent_state:
256                raise RuntimeError(
257                    f"The state for stream {self._stream_name} should have at least one slice to delineate the sync start time, but no slices are present. This is unexpected. Please contact Support."
258                )
259            self._concurrent_state["slices"].append(
260                {
261                    self._connector_state_converter.START_KEY: self._extract_from_slice(
262                        partition, self._slice_boundary_fields[self._START_BOUNDARY]
263                    ),
264                    self._connector_state_converter.END_KEY: self._extract_from_slice(
265                        partition, self._slice_boundary_fields[self._END_BOUNDARY]
266                    ),
267                    self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value,
268                }
269            )
270        elif most_recent_cursor_value:
271            if self._has_closed_at_least_one_slice:
272                # If we track the state value using the record's cursor field, we can only do that if there is one partition. This is
273                # because we save the state every time we close a partition. We assume that if there are multiple slices, they need to
274                # provide boundaries. There are cases where partitions might not have boundaries:
275                # * The cursor should be per-partition
276                # * The stream state is actually the parent stream state
277                # There might be other cases not listed above. Those are not supported today, hence the stream should not use this
278                # cursor for state management. For the specific user that was affected by this issue, we need to:
279                # * Fix state tracking (which is currently broken)
280                # * Make the new version available
281                # * (Probably) ask the user to reset the stream to avoid data loss
282                raise ValueError(
283                    "Given that slice_boundary_fields is not defined and that per-partition state is not supported, only one slice is "
284                    "expected. Please contact the Airbyte team."
285                )
286
287            self._concurrent_state["slices"].append(
288                {
289                    self._connector_state_converter.START_KEY: self.start,
290                    self._connector_state_converter.END_KEY: most_recent_cursor_value,
291                    self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value,
292                }
293            )
294
295    def _emit_state_message(self) -> None:
296        self._connector_state_manager.update_state_for_stream(
297            self._stream_name,
298            self._stream_namespace,
299            self.state,
300        )
301        state_message = self._connector_state_manager.create_state_message(
302            self._stream_name, self._stream_namespace
303        )
304        self._message_repository.emit_message(state_message)
305
306    def _merge_partitions(self) -> None:
307        self._concurrent_state["slices"] = self._connector_state_converter.merge_intervals(
308            self._concurrent_state["slices"]
309        )
310
311    def _extract_from_slice(self, partition: Partition, key: str) -> CursorValueType:
312        try:
313            _slice = partition.to_slice()
314            if not _slice:
315                raise KeyError(f"Could not find key `{key}` in empty slice")
316            return self._connector_state_converter.parse_value(_slice[key])  # type: ignore  # we expect the devs to specify a key that would return a CursorValueType
317        except KeyError as exception:
318            raise KeyError(
319                f"Partition is expected to have key `{key}` but could not be found"
320            ) from exception
321
322    def ensure_at_least_one_state_emitted(self) -> None:
323        """
324        The platform expects at least one state message on successful syncs. Hence, whatever happens, we expect this method to be
325        called.
326        """
327        self._emit_state_message()
328
329    def stream_slices(self) -> Iterable[StreamSlice]:
330        """
331        Generates slices based on a few parameters:
332        * lookback_window: Buffer to remove from END_KEY of the highest slice
333        * slice_range: Max difference between two slices. If the difference between two slices is greater, multiple slices will be created
334        * start: `_split_per_slice_range` will clip any value to `self._start`, which means that:
335          * if upper is less than self._start, no slices will be generated
336          * if lower is less than self._start, self._start will be used as the lower boundary (lookback_window will not be considered in that case)
337
338        Note that the slices will overlap at their boundaries. We therefore expect at least one of the lower or upper boundaries to be
339        inclusive in the API that is queried.
340        """
341        self._merge_partitions()
342
343        if self._start is not None and self._is_start_before_first_slice():
344            yield from self._split_per_slice_range(
345                self._start,
346                self._concurrent_state["slices"][0][self._connector_state_converter.START_KEY],
347                False,
348            )
349
350        if len(self._concurrent_state["slices"]) == 1:
351            yield from self._split_per_slice_range(
352                self._calculate_lower_boundary_of_last_slice(
353                    self._concurrent_state["slices"][0][self._connector_state_converter.END_KEY]
354                ),
355                self._end_provider(),
356                True,
357            )
358        elif len(self._concurrent_state["slices"]) > 1:
359            for i in range(len(self._concurrent_state["slices"]) - 1):
360                if self._cursor_granularity:
361                    yield from self._split_per_slice_range(
362                        self._concurrent_state["slices"][i][self._connector_state_converter.END_KEY]
363                        + self._cursor_granularity,
364                        self._concurrent_state["slices"][i + 1][
365                            self._connector_state_converter.START_KEY
366                        ],
367                        False,
368                    )
369                else:
370                    yield from self._split_per_slice_range(
371                        self._concurrent_state["slices"][i][
372                            self._connector_state_converter.END_KEY
373                        ],
374                        self._concurrent_state["slices"][i + 1][
375                            self._connector_state_converter.START_KEY
376                        ],
377                        False,
378                    )
379            yield from self._split_per_slice_range(
380                self._calculate_lower_boundary_of_last_slice(
381                    self._concurrent_state["slices"][-1][self._connector_state_converter.END_KEY]
382                ),
383                self._end_provider(),
384                True,
385            )
386        else:
387            raise ValueError("Expected at least one slice")
388
389    def _is_start_before_first_slice(self) -> bool:
390        return (
391            self._start is not None
392            and self._start
393            < self._concurrent_state["slices"][0][self._connector_state_converter.START_KEY]
394        )
395
396    def _calculate_lower_boundary_of_last_slice(
397        self, lower_boundary: CursorValueType
398    ) -> CursorValueType:
399        if self._lookback_window:
400            return lower_boundary - self._lookback_window
401        return lower_boundary
402
403    def _split_per_slice_range(
404        self, lower: CursorValueType, upper: CursorValueType, upper_is_end: bool
405    ) -> Iterable[StreamSlice]:
406        if lower >= upper:
407            return
408
409        if self._start and upper < self._start:
410            return
411
412        lower = max(lower, self._start) if self._start else lower
413        if not self._slice_range or self._evaluate_upper_safely(lower, self._slice_range) >= upper:
414            clamped_lower = self._clamping_strategy.clamp(lower)
415            clamped_upper = self._clamping_strategy.clamp(upper)
416            start_value, end_value = (
417                (clamped_lower, clamped_upper - self._cursor_granularity)
418                if self._cursor_granularity and not upper_is_end
419                else (clamped_lower, clamped_upper)
420            )
421            yield StreamSlice(
422                partition={},
423                cursor_slice={
424                    self._slice_boundary_fields_wrapper[
425                        self._START_BOUNDARY
426                    ]: self._connector_state_converter.output_format(start_value),
427                    self._slice_boundary_fields_wrapper[
428                        self._END_BOUNDARY
429                    ]: self._connector_state_converter.output_format(end_value),
430                },
431            )
432        else:
433            stop_processing = False
434            current_lower_boundary = lower
435            while not stop_processing:
436                current_upper_boundary = min(
437                    self._evaluate_upper_safely(current_lower_boundary, self._slice_range), upper
438                )
439                has_reached_upper_boundary = current_upper_boundary >= upper
440
441                clamped_upper = (
442                    self._clamping_strategy.clamp(current_upper_boundary)
443                    if current_upper_boundary != upper
444                    else current_upper_boundary
445                )
446                clamped_lower = self._clamping_strategy.clamp(current_lower_boundary)
447                if clamped_lower >= clamped_upper:
448                    # clamping collapsed both values which means that it is time to stop processing
449                    # FIXME: should this be replaced by a proper end_provider?
450                    break
451                start_value, end_value = (
452                    (clamped_lower, clamped_upper - self._cursor_granularity)
453                    if self._cursor_granularity
454                    and (not upper_is_end or not has_reached_upper_boundary)
455                    else (clamped_lower, clamped_upper)
456                )
457                yield StreamSlice(
458                    partition={},
459                    cursor_slice={
460                        self._slice_boundary_fields_wrapper[
461                            self._START_BOUNDARY
462                        ]: self._connector_state_converter.output_format(start_value),
463                        self._slice_boundary_fields_wrapper[
464                            self._END_BOUNDARY
465                        ]: self._connector_state_converter.output_format(end_value),
466                    },
467                )
468                current_lower_boundary = clamped_upper
469                if current_upper_boundary >= upper:
470                    stop_processing = True
471
472    def _evaluate_upper_safely(self, lower: CursorValueType, step: GapType) -> CursorValueType:
473        """
474        Given that the default step is datetime.timedelta.max, evaluating the next start_date can raise an OverflowError.
475        This method assumes that users would never enter a step that triggers an overflow; if they did, the code would have
476        broken anyway.
477        """
478        try:
479            return lower + step
480        except OverflowError:
481            return self._end_provider()
482
483    def should_be_synced(self, record: Record) -> bool:
484        """
485        Determines if a record should be synced based on its cursor value.
486        :param record: The record to evaluate
487
488        :return: True if the record's cursor value falls within the sync boundaries
489        """
490        try:
491            record_cursor_value: CursorValueType = self._extract_cursor_value(record)
492        except ValueError:
493            self._log_for_record_without_cursor_value()
494            return True
495        return self.start <= record_cursor_value <= self._end_provider()
496
497    def _log_for_record_without_cursor_value(self) -> None:
498        if not self._should_be_synced_logger_triggered:
499            LOGGER.warning(
500                f"Could not find cursor field `{self.cursor_field.cursor_field_key}` in record for stream {self._stream_name}. The incremental sync will assume it needs to be synced"
501            )
502            self._should_be_synced_logger_triggered = True
LOGGER = <Logger airbyte (INFO)>
class CursorField:
CursorField(cursor_field_key: str)
cursor_field_key
def extract_value(self, record: airbyte_cdk.Record) -> airbyte_cdk.sources.streams.concurrent.cursor_types.CursorValueType:
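
For illustration, a minimal sketch of extract_value in action; the SimpleNamespace object is a stand-in for a Record, since only its data mapping is read here:

from types import SimpleNamespace

from airbyte_cdk.sources.streams.concurrent.cursor import CursorField

cursor_field = CursorField("updated_at")  # hypothetical cursor field name

# Record-like stand-in exposing a `data` mapping
record = SimpleNamespace(data={"id": 1, "updated_at": "2023-01-01T00:00:00Z"})
print(cursor_field.extract_value(record))  # 2023-01-01T00:00:00Z

# A record missing the cursor field raises ValueError
try:
    cursor_field.extract_value(SimpleNamespace(data={"id": 2}))
except ValueError as error:
    print(error)  # Could not find cursor field updated_at in record
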
class Cursor(StreamSlicer, ABC):

Slices the stream into chunks that can be fetched independently. Slices enable state checkpointing and data retrieval parallelization.

state: MutableMapping[str, Any]
@abstractmethod
def observe(self, record: airbyte_cdk.Record) -> None:

Indicate to the cursor that the record has been emitted

@abstractmethod
def close_partition(self, partition: airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition) -> None:

Indicate to the cursor that the partition has been successfully processed

@abstractmethod
def ensure_at_least_one_state_emitted(self) -> None:

State messages are emitted when a partition is closed. However, the platform expects at least one state to be emitted per sync per stream. Hence, if no partitions are generated, this method needs to be called.

@abstractmethod
def should_be_synced(self, record: airbyte_cdk.Record) -> bool:
def stream_slices(self) -> Iterable[airbyte_cdk.StreamSlice]:

Default placeholder implementation of stream_slices. Subclasses can override this method to provide actual behavior.

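To make the contract concrete, a minimal sketch of a subclass; the class name, the max_id state layout, and the record fields are illustrative, not part of the CDK:

from typing import Any, MutableMapping

from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.types import Record


class MaxIdCursor(Cursor):
    """Hypothetical cursor tracking the highest `id` value observed."""

    def __init__(self) -> None:
        self._max_id = 0

    @property
    def state(self) -> MutableMapping[str, Any]:
        return {"max_id": self._max_id}

    def observe(self, record: Record) -> None:
        self._max_id = max(self._max_id, record.data.get("id", 0))

    def close_partition(self, partition: Partition) -> None:
        pass  # nothing to checkpoint per partition in this toy example

    def ensure_at_least_one_state_emitted(self) -> None:
        pass  # a real cursor would emit self.state through a MessageRepository

    def should_be_synced(self, record: Record) -> bool:
        return record.data.get("id", 0) > self._max_id

    # stream_slices() is inherited from Cursor: a single empty slice by default
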
class FinalStateCursor(Cursor):

Cursor that is used to guarantee at least one state message is emitted for a concurrent stream.

FinalStateCursor(stream_name: str, stream_namespace: Optional[str], message_repository: airbyte_cdk.MessageRepository)
state: MutableMapping[str, Any]
def observe(self, record: airbyte_cdk.Record) -> None:

Indicate to the cursor that the record has been emitted

def close_partition( self, partition: airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition) -> None:

Indicate to the cursor that the partition has been successfully processed

def ensure_at_least_one_state_emitted(self) -> None:

Used primarily for full refresh syncs that do not have a valid cursor value to emit at the end of a sync

def should_be_synced(self, record: airbyte_cdk.Record) -> bool:
Inherited members: Cursor.stream_slices
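
A short usage sketch, assuming the InMemoryMessageRepository from airbyte_cdk.sources.message; the stream name is made up:

from airbyte_cdk.sources.message import InMemoryMessageRepository
from airbyte_cdk.sources.streams.concurrent.cursor import FinalStateCursor

message_repository = InMemoryMessageRepository()
cursor = FinalStateCursor(
    stream_name="users",  # hypothetical stream
    stream_namespace=None,
    message_repository=message_repository,
)

# Even if no partition was ever closed, one sentinel state message is queued.
cursor.ensure_at_least_one_state_emitted()
for message in message_repository.consume_queue():
    print(message.type)  # Type.STATE
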
class ConcurrentCursor(Cursor):

Slices the stream into chunks that can be fetched independently. Slices enable state checkpointing and data retrieval parallelization.

ConcurrentCursor(stream_name: str, stream_namespace: Optional[str], stream_state: Any, message_repository: airbyte_cdk.MessageRepository, connector_state_manager: airbyte_cdk.ConnectorStateManager, connector_state_converter: airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter.AbstractStreamStateConverter, cursor_field: CursorField, slice_boundary_fields: Optional[Tuple[str, str]], start: Optional[airbyte_cdk.sources.streams.concurrent.cursor_types.CursorValueType], end_provider: Callable[[], airbyte_cdk.sources.streams.concurrent.cursor_types.CursorValueType], lookback_window: Optional[airbyte_cdk.sources.streams.concurrent.cursor_types.GapType] = None, slice_range: Optional[airbyte_cdk.sources.streams.concurrent.cursor_types.GapType] = None, cursor_granularity: Optional[airbyte_cdk.sources.streams.concurrent.cursor_types.GapType] = None, clamping_strategy: airbyte_cdk.sources.streams.concurrent.clamping.ClampingStrategy = NoClamping())
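
For orientation, a construction sketch. Assumptions flagged: EpochValueConcurrentStreamStateConverter is one of the CDK's datetime state converters, the stream and field names are invented, and an empty stream_state is taken to behave like a first sync:

from datetime import datetime, timedelta, timezone

from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.message import InMemoryMessageRepository
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, CursorField
from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import (
    EpochValueConcurrentStreamStateConverter,
)

cursor = ConcurrentCursor(
    stream_name="orders",  # hypothetical stream
    stream_namespace=None,
    stream_state={},  # no prior state, i.e. a first sync
    message_repository=InMemoryMessageRepository(),
    connector_state_manager=ConnectorStateManager(),
    connector_state_converter=EpochValueConcurrentStreamStateConverter(),
    cursor_field=CursorField("updated_at"),
    slice_boundary_fields=("start", "end"),  # keys used in each emitted cursor_slice
    start=datetime(2023, 1, 1, tzinfo=timezone.utc),
    end_provider=lambda: datetime.now(timezone.utc),
    slice_range=timedelta(days=30),  # cap each slice at roughly 30 days
)

for stream_slice in cursor.stream_slices():
    print(stream_slice.cursor_slice)  # e.g. {"start": 1672531200, "end": 1675123200}
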
state: MutableMapping[str, Any]
cursor_field: CursorField
def observe(self, record: airbyte_cdk.Record) -> None:

Indicate to the cursor that the record has been emitted

def close_partition(self, partition: airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition) -> None:

Indicate to the cursor that the partition has been successfully processed

def ensure_at_least_one_state_emitted(self) -> None:

The platform expects at least one state message on successful syncs. Hence, whatever happens, we expect this method to be called.

def stream_slices(self) -> Iterable[airbyte_cdk.StreamSlice]:

Generates slices based on a few parameters:

  • lookback_window: Buffer to remove from END_KEY of the highest slice
  • slice_range: Max difference between two slices. If the difference between two slices is greater, multiple slices will be created
  • start: `_split_per_slice_range` will clip any value to `self._start`, which means that:
    • if upper is less than self._start, no slices will be generated
    • if lower is less than self._start, self._start will be used as the lower boundary (lookback_window will not be considered in that case)

Note that the slices will overlap at their boundaries. We therefore expect at least one of the lower or upper boundaries to be inclusive in the API that is queried.
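
To make the splitting arithmetic concrete, a standalone sketch (plain Python, not CDK code) of how an interval breaks down under a slice_range, with a cursor_granularity subtracted from intermediate upper bounds so adjacent slices do not overlap:

from datetime import datetime, timedelta, timezone

def split(lower: datetime, upper: datetime, step: timedelta, granularity: timedelta):
    """Yield (start, end) chunks covering [lower, upper], mimicking _split_per_slice_range."""
    current = lower
    while current < upper:
        chunk_end = min(current + step, upper)
        is_last = chunk_end >= upper
        # Intermediate upper bounds are pulled back by `granularity`; the final one is kept.
        yield current, chunk_end if is_last else chunk_end - granularity
        current = chunk_end

start = datetime(2023, 1, 1, tzinfo=timezone.utc)
end = datetime(2023, 1, 10, tzinfo=timezone.utc)
for chunk_start, chunk_end in split(start, end, timedelta(days=4), timedelta(seconds=1)):
    print(chunk_start, "->", chunk_end)
# 2023-01-01 00:00:00+00:00 -> 2023-01-04 23:59:59+00:00
# 2023-01-05 00:00:00+00:00 -> 2023-01-08 23:59:59+00:00
# 2023-01-09 00:00:00+00:00 -> 2023-01-10 00:00:00+00:00
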

def should_be_synced(self, record: airbyte_cdk.Record) -> bool:

Determines if a record should be synced based on its cursor value.

Parameters
  • record: The record to evaluate

Returns
  True if the record's cursor value falls within the sync boundaries
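
A small worked example of the boundary check (dates are made up); a record whose parsed cursor value falls outside [start, end_provider()] is filtered out, and a record missing the cursor field entirely is synced anyway after a one-time warning:

from datetime import datetime, timezone

start = datetime(2023, 1, 1, tzinfo=timezone.utc)
end = datetime.now(timezone.utc)

# The same chained comparison should_be_synced applies to each parsed cursor value
for value in (
    datetime(2022, 12, 31, tzinfo=timezone.utc),  # before start: filtered out
    datetime(2023, 6, 15, tzinfo=timezone.utc),   # inside the window: synced
):
    print(value.date(), start <= value <= end)  # False, then True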