airbyte_cdk.sources.streams.concurrent.cursor

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4
  5import functools
  6import logging
  7import threading
  8from abc import ABC, abstractmethod
  9from typing import (
 10    Any,
 11    Callable,
 12    Iterable,
 13    List,
 14    Mapping,
 15    MutableMapping,
 16    Optional,
 17    Tuple,
 18    Union,
 19)
 20
 21from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
 22from airbyte_cdk.sources.message import MessageRepository
 23from airbyte_cdk.sources.streams import NO_CURSOR_STATE_KEY
 24from airbyte_cdk.sources.streams.concurrent.clamping import ClampingStrategy, NoClamping
 25from airbyte_cdk.sources.streams.concurrent.cursor_types import CursorValueType, GapType
 26from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
 27from airbyte_cdk.sources.streams.concurrent.partitions.stream_slicer import StreamSlicer
 28from airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter import (
 29    AbstractStreamStateConverter,
 30)
 31from airbyte_cdk.sources.types import Record, StreamSlice
 32
 33LOGGER = logging.getLogger("airbyte")
 34
 35
 36def _extract_value(mapping: Mapping[str, Any], path: List[str]) -> Any:
 37    return functools.reduce(lambda a, b: a[b], path, mapping)
 38
 39
 40class CursorField:
 41    def __init__(self, cursor_field_key: str) -> None:
 42        self.cursor_field_key = cursor_field_key
 43
 44    def extract_value(self, record: Record) -> CursorValueType:
 45        cursor_value = record.data.get(self.cursor_field_key)
 46        if cursor_value is None:
 47            raise ValueError(f"Could not find cursor field {self.cursor_field_key} in record")
 48        return cursor_value  # type: ignore  # we assume that the value the path points at is a comparable
 49
 50
 51class Cursor(StreamSlicer, ABC):
 52    @property
 53    @abstractmethod
 54    def state(self) -> MutableMapping[str, Any]: ...
 55
 56    @abstractmethod
 57    def observe(self, record: Record) -> None:
 58        """
 59        Indicate to the cursor that the record has been emitted
 60        """
 61        raise NotImplementedError()
 62
 63    @abstractmethod
 64    def close_partition(self, partition: Partition) -> None:
 65        """
 66        Indicate to the cursor that the partition has been successfully processed
 67        """
 68        raise NotImplementedError()
 69
 70    @abstractmethod
 71    def ensure_at_least_one_state_emitted(self) -> None:
 72        """
 73        State messages are emitted when a partition is closed. However, the platform expects at least one state to be emitted per sync per
 74        stream. Hence, if no partitions are generated, this method needs to be called.
 75        """
 76        raise NotImplementedError()
 77
 78    @abstractmethod
 79    def should_be_synced(self, record: Record) -> bool:
 80        pass
 81
 82    def stream_slices(self) -> Iterable[StreamSlice]:
 83        """
 84        Default placeholder implementation of generate_slices.
 85        Subclasses can override this method to provide actual behavior.
 86        """
 87        yield StreamSlice(partition={}, cursor_slice={})
 88
 89
 90class FinalStateCursor(Cursor):
 91    """Cursor that is used to guarantee at least one state message is emitted for a concurrent stream."""
 92
 93    def __init__(
 94        self,
 95        stream_name: str,
 96        stream_namespace: Optional[str],
 97        message_repository: MessageRepository,
 98    ) -> None:
 99        self._stream_name = stream_name
100        self._stream_namespace = stream_namespace
101        self._message_repository = message_repository
102        # Normally the connector state manager operates at the source-level. However, we only need it to write the sentinel
103        # state message rather than manage overall source state. This is also only temporary as we move to the resumable
104        # full refresh world where every stream uses a FileBasedConcurrentCursor with incremental state.
105        self._connector_state_manager = ConnectorStateManager()
106        self._has_closed_at_least_one_slice = False
107
108    @property
109    def state(self) -> MutableMapping[str, Any]:
110        return {NO_CURSOR_STATE_KEY: True}
111
112    def observe(self, record: Record) -> None:
113        pass
114
115    def close_partition(self, partition: Partition) -> None:
116        pass
117
118    def ensure_at_least_one_state_emitted(self) -> None:
119        """
120        Used primarily for full refresh syncs that do not have a valid cursor value to emit at the end of a sync
121        """
122
123        self._connector_state_manager.update_state_for_stream(
124            self._stream_name, self._stream_namespace, self.state
125        )
126        state_message = self._connector_state_manager.create_state_message(
127            self._stream_name, self._stream_namespace
128        )
129        self._message_repository.emit_message(state_message)
130
131    def should_be_synced(self, record: Record) -> bool:
132        return True
133
134
class ConcurrentCursor(Cursor):
    """
    Cursor that tracks which cursor-value intervals of a concurrently-synced stream have been fully processed.

    Processed intervals are kept in `self._concurrent_state["slices"]`. Each is a dict keyed by the state converter's
    START_KEY / END_KEY / MOST_RECENT_RECORD_KEY.

    * `stream_slices` generates slices for the ranges that are not covered yet: before the first interval (when
      `start` is earlier), between intervals, and after the last interval up to `end_provider()`.
    * `close_partition` records each finished partition, merges the intervals and emits a state message.
    """

    _START_BOUNDARY = 0
    _END_BOUNDARY = 1

    def __init__(
        self,
        stream_name: str,
        stream_namespace: Optional[str],
        stream_state: Any,
        message_repository: MessageRepository,
        connector_state_manager: ConnectorStateManager,
        connector_state_converter: AbstractStreamStateConverter,
        cursor_field: CursorField,
        slice_boundary_fields: Optional[Tuple[str, str]],
        start: Optional[CursorValueType],
        end_provider: Callable[[], CursorValueType],
        lookback_window: Optional[GapType] = None,
        slice_range: Optional[GapType] = None,
        cursor_granularity: Optional[GapType] = None,
        clamping_strategy: ClampingStrategy = NoClamping(),
    ) -> None:
        """
        :param stream_name: name of the stream, used when updating and emitting state
        :param stream_namespace: namespace of the stream, used when updating and emitting state
        :param stream_state: incoming state, either in the converter's concurrent format or in the sequential format
        :param message_repository: receives the emitted state messages
        :param connector_state_manager: state manager updated before each state message is created
        :param connector_state_converter: converts between stored state and the internal `slices` representation
        :param cursor_field: record field holding the cursor value
        :param slice_boundary_fields: keys of the (start, end) boundaries inside each partition slice; None when partitions carry no boundaries
        :param start: configured lower bound of the sync; None for no lower bound
        :param end_provider: called to obtain the upper bound of the sync
        :param lookback_window: subtracted from the end of the last processed interval when generating new slices
        :param slice_range: maximum width of a generated slice
        :param cursor_granularity: smallest cursor step; added to the end of a processed interval when filling a gap, and subtracted
            from the upper bound of generated slices that do not end at `end_provider()`
        :param clamping_strategy: applied to generated slice boundaries
        """
        self._stream_name = stream_name
        self._stream_namespace = stream_namespace
        self._message_repository = message_repository
        self._connector_state_converter = connector_state_converter
        self._connector_state_manager = connector_state_manager
        self._cursor_field = cursor_field
        # To see some example where the slice boundaries might not be defined, check https://github.com/airbytehq/airbyte/blob/1ce84d6396e446e1ac2377362446e3fb94509461/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py#L363-L379
        self._slice_boundary_fields = slice_boundary_fields
        self._start = start
        self._end_provider = end_provider
        self.start, self._concurrent_state = self._get_concurrent_state(stream_state)
        self._lookback_window = lookback_window
        self._slice_range = slice_range
        self._most_recent_cursor_value_per_partition: MutableMapping[
            Union[StreamSlice, Mapping[str, Any], None], Any
        ] = {}
        self._has_closed_at_least_one_slice = False
        self._cursor_granularity = cursor_granularity
        # Flag to track if the logger has been triggered (per stream)
        self._should_be_synced_logger_triggered = False
        self._clamping_strategy = clamping_strategy

        # A lock is required when closing a partition because updating the cursor's concurrent_state is
        # not thread safe. When multiple partitions are being closed by the cursor at the same time, it is
        # possible for one partition to update concurrent_state after a second partition has already read
        # the previous state. This can lead to the second partition overwriting the previous one's state.
        self._lock = threading.Lock()

    @property
    def state(self) -> MutableMapping[str, Any]:
        """The concurrent state converted back to the state-message format by the state converter."""
        return self._connector_state_converter.convert_to_state_message(
            self.cursor_field, self._concurrent_state
        )

    @property
    def cursor_field(self) -> CursorField:
        return self._cursor_field

    @property
    def _slice_boundary_fields_wrapper(self) -> Tuple[str, str]:
        """Keys used for the boundaries of generated slices; falls back to the converter's START_KEY/END_KEY."""
        return (
            self._slice_boundary_fields
            if self._slice_boundary_fields
            else (
                self._connector_state_converter.START_KEY,
                self._connector_state_converter.END_KEY,
            )
        )

    def _get_concurrent_state(
        self, state: MutableMapping[str, Any]
    ) -> Tuple[CursorValueType, MutableMapping[str, Any]]:
        """
        Return `(start, concurrent_state)` derived from the incoming stream state.

        For state already in the concurrent format, `start` is the first slice's most-recent-record value (or its end
        when that key is absent), falling back to the configured start and then to the converter's zero value.
        Any other state is handed to the converter's sequential-state conversion.
        """
        if self._connector_state_converter.is_state_message_compatible(state):
            partitioned_state = self._connector_state_converter.deserialize(state)
            slices_from_partitioned_state = partitioned_state.get("slices", [])

            value_from_partitioned_state = None
            if slices_from_partitioned_state:
                # We assume here that the slices have been already merged
                first_slice = slices_from_partitioned_state[0]
                value_from_partitioned_state = (
                    first_slice[self._connector_state_converter.MOST_RECENT_RECORD_KEY]
                    if self._connector_state_converter.MOST_RECENT_RECORD_KEY in first_slice
                    else first_slice[self._connector_state_converter.END_KEY]
                )
            return (
                value_from_partitioned_state
                or self._start
                or self._connector_state_converter.zero_value,
                partitioned_state,
            )
        return self._connector_state_converter.convert_from_sequential_state(
            self._cursor_field, state, self._start
        )

    def observe(self, record: Record) -> None:
        """Remember the highest cursor value seen for the record's slice; records without one are only logged."""
        # Because observe writes to the most_recent_cursor_value_per_partition mapping,
        # it is not thread-safe. However, this shouldn't lead to concurrency issues because
        # observe() is only invoked by PartitionReader.process_partition(). Since the map is
        # broken down according to partition, concurrent threads processing only read/write
        # from different keys which avoids any conflicts.
        #
        # If we were to add thread safety, we should implement a lock per-partition
        # which is instantiated during stream_slices()
        most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
            record.associated_slice
        )
        try:
            cursor_value = self._extract_cursor_value(record)

            if most_recent_cursor_value is None or most_recent_cursor_value < cursor_value:
                self._most_recent_cursor_value_per_partition[record.associated_slice] = cursor_value
        except ValueError:
            self._log_for_record_without_cursor_value()

    def _extract_cursor_value(self, record: Record) -> Any:
        """Read the cursor field from `record` and parse it with the state converter (ValueError if absent)."""
        return self._connector_state_converter.parse_value(self._cursor_field.extract_value(record))

    def close_partition(self, partition: Partition) -> None:
        """
        Record `partition` as processed; if that added a slice to the state, merge the slices and emit a state message.
        """
        with self._lock:
            slice_count_before = len(self._concurrent_state.get("slices", []))
            self._add_slice_to_state(partition)
            # only emit if at least one slice has been processed
            if slice_count_before < len(self._concurrent_state.get("slices", [])):
                self._merge_partitions()
                self._emit_state_message()
            # Set while still holding the lock: `_add_slice_to_state` reads this flag under the same lock, and
            # updating it outside would let two concurrent closes both observe False.
            self._has_closed_at_least_one_slice = True

    def _add_slice_to_state(self, partition: Partition) -> None:
        """
        Append the interval covered by `partition` to `self._concurrent_state["slices"]`.

        With slice boundary fields, the partition's own boundaries are stored. Without them, a single slice from
        `self.start` to the partition's most recent cursor value is stored, and nothing is stored if the partition
        saw no cursor value.

        :raises RuntimeError: boundaries are defined but the state has no `slices` entry
        :raises ValueError: boundaries are not defined and a slice was already closed (only one is supported)
        """
        most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
            partition.to_slice()
        )

        if self._slice_boundary_fields:
            if "slices" not in self._concurrent_state:
                raise RuntimeError(
                    f"The state for stream {self._stream_name} should have at least one slice to delineate the sync start time, but no slices are present. This is unexpected. Please contact Support."
                )
            self._concurrent_state["slices"].append(
                {
                    self._connector_state_converter.START_KEY: self._extract_from_slice(
                        partition, self._slice_boundary_fields[self._START_BOUNDARY]
                    ),
                    self._connector_state_converter.END_KEY: self._extract_from_slice(
                        partition, self._slice_boundary_fields[self._END_BOUNDARY]
                    ),
                    self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value,
                }
            )
        elif most_recent_cursor_value is not None:
            # `is not None` rather than truthiness: a falsy cursor value (e.g. 0) is still a value to checkpoint.
            if self._has_closed_at_least_one_slice:
                # If we track state value using records cursor field, we can only do that if there is one partition. This is because we save
                # the state every time we close a partition. We assume that if there are multiple slices, they need to be providing
                # boundaries. There are cases where partitions could not have boundaries:
                # * The cursor should be per-partition
                # * The stream state is actually the parent stream state
                # There might be other cases not listed above. Those are not supported today hence the stream should not use this cursor for
                # state management. For the specific user that was affected with this issue, we need to:
                # * Fix state tracking (which is currently broken)
                # * Make the new version available
                # * (Probably) ask the user to reset the stream to avoid data loss
                raise ValueError(
                    "Given that slice_boundary_fields is not defined and that per-partition state is not supported, only one slice is "
                    "expected. Please contact the Airbyte team."
                )

            self._concurrent_state["slices"].append(
                {
                    self._connector_state_converter.START_KEY: self.start,
                    self._connector_state_converter.END_KEY: most_recent_cursor_value,
                    self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value,
                }
            )

    def _emit_state_message(self) -> None:
        """Push the current state into the state manager and emit the resulting state message."""
        self._connector_state_manager.update_state_for_stream(
            self._stream_name,
            self._stream_namespace,
            self.state,
        )
        state_message = self._connector_state_manager.create_state_message(
            self._stream_name, self._stream_namespace
        )
        self._message_repository.emit_message(state_message)

    def _merge_partitions(self) -> None:
        """Replace the state's slices with the converter's merged intervals."""
        self._concurrent_state["slices"] = self._connector_state_converter.merge_intervals(
            self._concurrent_state["slices"]
        )

    def _extract_from_slice(self, partition: Partition, key: str) -> CursorValueType:
        """
        Parse the boundary value stored under `key` in the partition's slice.

        :raises KeyError: the slice is empty or does not contain `key`
        """
        try:
            _slice = partition.to_slice()
            if not _slice:
                raise KeyError(f"Could not find key `{key}` in empty slice")
            return self._connector_state_converter.parse_value(_slice[key])  # type: ignore  # we expect the devs to specify a key that would return a CursorValueType
        except KeyError as exception:
            raise KeyError(
                f"Partition is expected to have key `{key}` but could not be found"
            ) from exception

    def ensure_at_least_one_state_emitted(self) -> None:
        """
        Emit the current state.

        The platform expects at least one state message on successful syncs. Hence, whatever happens, we expect this
        method to be called.
        """
        self._emit_state_message()

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Generate slices for every range not already covered by the processed intervals in the state.

        Slices are generated based on a few parameters:
        * lookback_window: Buffer to remove from END_KEY of the highest slice
        * slice_range: Max difference between two slices. If the difference between two slices is greater, multiple slices will be created
        * start: `_split_per_slice_range` will clip any value to `self._start`, which means that:
          * if upper is less than self._start, no slices will be generated
          * if lower is less than self._start, self._start will be used as the lower boundary (lookback_window will not be considered in that case)

        Note that the slices will overlap at their boundaries. We therefore expect to have at least the lower or the upper boundary to be
        inclusive in the API that is queried.

        :raises ValueError: the state contains no slices
        """
        start_key = self._connector_state_converter.START_KEY
        end_key = self._connector_state_converter.END_KEY

        with self._lock:
            self._merge_partitions()
            # Snapshot the intervals. This is a generator, so partitions it has already yielded may be closed (and the
            # live list appended to and merged) before it resumes. Indexing into the live list could then skip gaps
            # or run past its end.
            processed = list(self._concurrent_state["slices"])

        if not processed:
            raise ValueError("Expected at least one slice")

        if self._start is not None and self._start < processed[0][start_key]:
            yield from self._split_per_slice_range(self._start, processed[0][start_key], False)

        # Fill the gap between each pair of consecutive processed intervals.
        for previous_interval, next_interval in zip(processed, processed[1:]):
            gap_lower = previous_interval[end_key]
            if self._cursor_granularity:
                gap_lower = gap_lower + self._cursor_granularity
            yield from self._split_per_slice_range(gap_lower, next_interval[start_key], False)

        # Everything after the last processed interval, up to the current end.
        yield from self._split_per_slice_range(
            self._calculate_lower_boundary_of_last_slice(processed[-1][end_key]),
            self._end_provider(),
            True,
        )

    def _is_start_before_first_slice(self) -> bool:
        """True when a start is configured and it precedes the first slice of the current state."""
        return (
            self._start is not None
            and self._start
            < self._concurrent_state["slices"][0][self._connector_state_converter.START_KEY]
        )

    def _calculate_lower_boundary_of_last_slice(
        self, lower_boundary: CursorValueType
    ) -> CursorValueType:
        """Apply the lookback window, if any, to `lower_boundary`."""
        if self._lookback_window:
            return lower_boundary - self._lookback_window
        return lower_boundary

    def _split_per_slice_range(
        self, lower: CursorValueType, upper: CursorValueType, upper_is_end: bool
    ) -> Iterable[StreamSlice]:
        """
        Yield slices covering `[lower, upper]`, each at most `slice_range` wide and clipped to `self._start`.

        :param upper_is_end: True when `upper` is the sync's end; the final slice then keeps `upper` as its end instead
            of subtracting the cursor granularity.
        """
        if lower >= upper:
            return

        # `is not None` rather than truthiness so that a falsy start (e.g. 0) still clips the range as documented.
        if self._start is not None and upper < self._start:
            return

        lower = max(lower, self._start) if self._start is not None else lower
        if not self._slice_range or self._evaluate_upper_safely(lower, self._slice_range) >= upper:
            clamped_lower = self._clamping_strategy.clamp(lower)
            clamped_upper = self._clamping_strategy.clamp(upper)
            start_value, end_value = (
                (clamped_lower, clamped_upper - self._cursor_granularity)
                if self._cursor_granularity and not upper_is_end
                else (clamped_lower, clamped_upper)
            )
            yield StreamSlice(
                partition={},
                cursor_slice={
                    self._slice_boundary_fields_wrapper[
                        self._START_BOUNDARY
                    ]: self._connector_state_converter.output_format(start_value),
                    self._slice_boundary_fields_wrapper[
                        self._END_BOUNDARY
                    ]: self._connector_state_converter.output_format(end_value),
                },
            )
        else:
            stop_processing = False
            current_lower_boundary = lower
            while not stop_processing:
                current_upper_boundary = min(
                    self._evaluate_upper_safely(current_lower_boundary, self._slice_range), upper
                )
                has_reached_upper_boundary = current_upper_boundary >= upper

                clamped_upper = (
                    self._clamping_strategy.clamp(current_upper_boundary)
                    if current_upper_boundary != upper
                    else current_upper_boundary
                )
                clamped_lower = self._clamping_strategy.clamp(current_lower_boundary)
                if clamped_lower >= clamped_upper:
                    # clamping collapsed both values which means that it is time to stop processing
                    # FIXME should this be replace by proper end_provider
                    break
                start_value, end_value = (
                    (clamped_lower, clamped_upper - self._cursor_granularity)
                    if self._cursor_granularity
                    and (not upper_is_end or not has_reached_upper_boundary)
                    else (clamped_lower, clamped_upper)
                )
                yield StreamSlice(
                    partition={},
                    cursor_slice={
                        self._slice_boundary_fields_wrapper[
                            self._START_BOUNDARY
                        ]: self._connector_state_converter.output_format(start_value),
                        self._slice_boundary_fields_wrapper[
                            self._END_BOUNDARY
                        ]: self._connector_state_converter.output_format(end_value),
                    },
                )
                current_lower_boundary = clamped_upper
                if current_upper_boundary >= upper:
                    stop_processing = True

    def _evaluate_upper_safely(self, lower: CursorValueType, step: GapType) -> CursorValueType:
        """
        Return `lower + step`, or `self._end_provider()` if that addition overflows.

        Given that we set the default step at datetime.timedelta.max, we will generate an OverflowError when evaluating the next start_date.
        This method assumes that users would never enter a step that would generate an overflow. Given that would be the case, the code
        would have broken anyway.
        """
        try:
            return lower + step
        except OverflowError:
            return self._end_provider()

    def should_be_synced(self, record: Record) -> bool:
        """
        Determines if a record should be synced based on its cursor value.
        :param record: The record to evaluate

        :return: True if the record's cursor value falls within [self.start, end_provider()] (inclusive), or if the
            record has no cursor value
        """
        try:
            record_cursor_value: CursorValueType = self._extract_cursor_value(record)
        except ValueError:
            self._log_for_record_without_cursor_value()
            return True
        return self.start <= record_cursor_value <= self._end_provider()

    def _log_for_record_without_cursor_value(self) -> None:
        """Warn once per cursor instance that a record lacked the cursor field."""
        if not self._should_be_synced_logger_triggered:
            LOGGER.warning(
                f"Could not find cursor field `{self.cursor_field.cursor_field_key}` in record for stream {self._stream_name}. The incremental sync will assume it needs to be synced"
            )
            self._should_be_synced_logger_triggered = True
LOGGER = <Logger airbyte (INFO)>
class CursorField:
41class CursorField:
42    def __init__(self, cursor_field_key: str) -> None:
43        self.cursor_field_key = cursor_field_key
44
45    def extract_value(self, record: Record) -> CursorValueType:
46        cursor_value = record.data.get(self.cursor_field_key)
47        if cursor_value is None:
48            raise ValueError(f"Could not find cursor field {self.cursor_field_key} in record")
49        return cursor_value  # type: ignore  # we assume that the value the path points at is a comparable
CursorField(cursor_field_key: str)
42    def __init__(self, cursor_field_key: str) -> None:
43        self.cursor_field_key = cursor_field_key
cursor_field_key
45    def extract_value(self, record: Record) -> CursorValueType:
46        cursor_value = record.data.get(self.cursor_field_key)
47        if cursor_value is None:
48            raise ValueError(f"Could not find cursor field {self.cursor_field_key} in record")
49        return cursor_value  # type: ignore  # we assume that the value the path points at is a comparable
52class Cursor(StreamSlicer, ABC):
53    @property
54    @abstractmethod
55    def state(self) -> MutableMapping[str, Any]: ...
56
57    @abstractmethod
58    def observe(self, record: Record) -> None:
59        """
60        Indicate to the cursor that the record has been emitted
61        """
62        raise NotImplementedError()
63
64    @abstractmethod
65    def close_partition(self, partition: Partition) -> None:
66        """
67        Indicate to the cursor that the partition has been successfully processed
68        """
69        raise NotImplementedError()
70
71    @abstractmethod
72    def ensure_at_least_one_state_emitted(self) -> None:
73        """
74        State messages are emitted when a partition is closed. However, the platform expects at least one state to be emitted per sync per
75        stream. Hence, if no partitions are generated, this method needs to be called.
76        """
77        raise NotImplementedError()
78
79    @abstractmethod
80    def should_be_synced(self, record: Record) -> bool:
81        pass
82
83    def stream_slices(self) -> Iterable[StreamSlice]:
84        """
85        Default placeholder implementation of generate_slices.
86        Subclasses can override this method to provide actual behavior.
87        """
88        yield StreamSlice(partition={}, cursor_slice={})

Slices the stream into chunks that can be fetched independently. Slices enable state checkpointing and data retrieval parallelization.

state: MutableMapping[str, Any]
53    @property
54    @abstractmethod
55    def state(self) -> MutableMapping[str, Any]: ...
@abstractmethod
def observe(self, record: airbyte_cdk.Record) -> None:
57    @abstractmethod
58    def observe(self, record: Record) -> None:
59        """
60        Indicate to the cursor that the record has been emitted
61        """
62        raise NotImplementedError()

Indicate to the cursor that the record has been emitted

@abstractmethod
def close_partition( self, partition: airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition) -> None:
64    @abstractmethod
65    def close_partition(self, partition: Partition) -> None:
66        """
67        Indicate to the cursor that the partition has been successfully processed
68        """
69        raise NotImplementedError()

Indicate to the cursor that the partition has been successfully processed

@abstractmethod
def ensure_at_least_one_state_emitted(self) -> None:
71    @abstractmethod
72    def ensure_at_least_one_state_emitted(self) -> None:
73        """
74        State messages are emitted when a partition is closed. However, the platform expects at least one state to be emitted per sync per
75        stream. Hence, if no partitions are generated, this method needs to be called.
76        """
77        raise NotImplementedError()

State messages are emitted when a partition is closed. However, the platform expects at least one state to be emitted per sync per stream. Hence, if no partitions are generated, this method needs to be called.

@abstractmethod
def should_be_synced(self, record: airbyte_cdk.Record) -> bool:
79    @abstractmethod
80    def should_be_synced(self, record: Record) -> bool:
81        pass
def stream_slices(self) -> Iterable[airbyte_cdk.StreamSlice]:
83    def stream_slices(self) -> Iterable[StreamSlice]:
84        """
85        Default placeholder implementation of generate_slices.
86        Subclasses can override this method to provide actual behavior.
87        """
88        yield StreamSlice(partition={}, cursor_slice={})

Default placeholder implementation of stream_slices: yields a single slice with an empty partition and an empty cursor slice. Subclasses can override this method to provide actual behavior.

class FinalStateCursor(Cursor):
 91class FinalStateCursor(Cursor):
 92    """Cursor that is used to guarantee at least one state message is emitted for a concurrent stream."""
 93
 94    def __init__(
 95        self,
 96        stream_name: str,
 97        stream_namespace: Optional[str],
 98        message_repository: MessageRepository,
 99    ) -> None:
100        self._stream_name = stream_name
101        self._stream_namespace = stream_namespace
102        self._message_repository = message_repository
103        # Normally the connector state manager operates at the source-level. However, we only need it to write the sentinel
104        # state message rather than manage overall source state. This is also only temporary as we move to the resumable
105        # full refresh world where every stream uses a FileBasedConcurrentCursor with incremental state.
106        self._connector_state_manager = ConnectorStateManager()
107        self._has_closed_at_least_one_slice = False
108
109    @property
110    def state(self) -> MutableMapping[str, Any]:
111        return {NO_CURSOR_STATE_KEY: True}
112
113    def observe(self, record: Record) -> None:
114        pass
115
116    def close_partition(self, partition: Partition) -> None:
117        pass
118
119    def ensure_at_least_one_state_emitted(self) -> None:
120        """
121        Used primarily for full refresh syncs that do not have a valid cursor value to emit at the end of a sync
122        """
123
124        self._connector_state_manager.update_state_for_stream(
125            self._stream_name, self._stream_namespace, self.state
126        )
127        state_message = self._connector_state_manager.create_state_message(
128            self._stream_name, self._stream_namespace
129        )
130        self._message_repository.emit_message(state_message)
131
132    def should_be_synced(self, record: Record) -> bool:
133        return True

Cursor that is used to guarantee at least one state message is emitted for a concurrent stream.

FinalStateCursor( stream_name: str, stream_namespace: Optional[str], message_repository: airbyte_cdk.MessageRepository)
 94    def __init__(
 95        self,
 96        stream_name: str,
 97        stream_namespace: Optional[str],
 98        message_repository: MessageRepository,
 99    ) -> None:
100        self._stream_name = stream_name
101        self._stream_namespace = stream_namespace
102        self._message_repository = message_repository
103        # Normally the connector state manager operates at the source-level. However, we only need it to write the sentinel
104        # state message rather than manage overall source state. This is also only temporary as we move to the resumable
105        # full refresh world where every stream uses a FileBasedConcurrentCursor with incremental state.
106        self._connector_state_manager = ConnectorStateManager()
107        self._has_closed_at_least_one_slice = False
state: MutableMapping[str, Any]
109    @property
110    def state(self) -> MutableMapping[str, Any]:
111        return {NO_CURSOR_STATE_KEY: True}
def observe(self, record: airbyte_cdk.Record) -> None:
113    def observe(self, record: Record) -> None:
114        pass

Indicate to the cursor that the record has been emitted

def close_partition( self, partition: airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition) -> None:
116    def close_partition(self, partition: Partition) -> None:
117        pass

Indicate to the cursor that the partition has been successfully processed

def ensure_at_least_one_state_emitted(self) -> None:
119    def ensure_at_least_one_state_emitted(self) -> None:
120        """
121        Used primarily for full refresh syncs that do not have a valid cursor value to emit at the end of a sync
122        """
123
124        self._connector_state_manager.update_state_for_stream(
125            self._stream_name, self._stream_namespace, self.state
126        )
127        state_message = self._connector_state_manager.create_state_message(
128            self._stream_name, self._stream_namespace
129        )
130        self._message_repository.emit_message(state_message)

Used primarily for full refresh syncs that do not have a valid cursor value to emit at the end of a sync

def should_be_synced(self, record: airbyte_cdk.Record) -> bool:
132    def should_be_synced(self, record: Record) -> bool:
133        return True
Inherited Members
Cursor
stream_slices
class ConcurrentCursor(Cursor):
136class ConcurrentCursor(Cursor):
137    _START_BOUNDARY = 0
138    _END_BOUNDARY = 1
139
140    def __init__(
141        self,
142        stream_name: str,
143        stream_namespace: Optional[str],
144        stream_state: Any,
145        message_repository: MessageRepository,
146        connector_state_manager: ConnectorStateManager,
147        connector_state_converter: AbstractStreamStateConverter,
148        cursor_field: CursorField,
149        slice_boundary_fields: Optional[Tuple[str, str]],
150        start: Optional[CursorValueType],
151        end_provider: Callable[[], CursorValueType],
152        lookback_window: Optional[GapType] = None,
153        slice_range: Optional[GapType] = None,
154        cursor_granularity: Optional[GapType] = None,
155        clamping_strategy: ClampingStrategy = NoClamping(),
156    ) -> None:
157        self._stream_name = stream_name
158        self._stream_namespace = stream_namespace
159        self._message_repository = message_repository
160        self._connector_state_converter = connector_state_converter
161        self._connector_state_manager = connector_state_manager
162        self._cursor_field = cursor_field
163        # To see some example where the slice boundaries might not be defined, check https://github.com/airbytehq/airbyte/blob/1ce84d6396e446e1ac2377362446e3fb94509461/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py#L363-L379
164        self._slice_boundary_fields = slice_boundary_fields
165        self._start = start
166        self._end_provider = end_provider
167        self.start, self._concurrent_state = self._get_concurrent_state(stream_state)
168        self._lookback_window = lookback_window
169        self._slice_range = slice_range
170        self._most_recent_cursor_value_per_partition: MutableMapping[
171            Union[StreamSlice, Mapping[str, Any], None], Any
172        ] = {}
173        self._has_closed_at_least_one_slice = False
174        self._cursor_granularity = cursor_granularity
175        # Flag to track if the logger has been triggered (per stream)
176        self._should_be_synced_logger_triggered = False
177        self._clamping_strategy = clamping_strategy
178
179        # A lock is required when closing a partition because updating the cursor's concurrent_state is
180        # not thread safe. When multiple partitions are being closed by the cursor at the same time, it is
181        # possible for one partition to update concurrent_state after a second partition has already read
182        # the previous state. This can lead to the second partition overwriting the previous one's state.
183        self._lock = threading.Lock()
184
185    @property
186    def state(self) -> MutableMapping[str, Any]:
187        return self._connector_state_converter.convert_to_state_message(
188            self.cursor_field, self._concurrent_state
189        )
190
191    @property
192    def cursor_field(self) -> CursorField:
193        return self._cursor_field
194
195    @property
196    def _slice_boundary_fields_wrapper(self) -> Tuple[str, str]:
197        return (
198            self._slice_boundary_fields
199            if self._slice_boundary_fields
200            else (
201                self._connector_state_converter.START_KEY,
202                self._connector_state_converter.END_KEY,
203            )
204        )
205
206    def _get_concurrent_state(
207        self, state: MutableMapping[str, Any]
208    ) -> Tuple[CursorValueType, MutableMapping[str, Any]]:
209        if self._connector_state_converter.is_state_message_compatible(state):
210            partitioned_state = self._connector_state_converter.deserialize(state)
211            slices_from_partitioned_state = partitioned_state.get("slices", [])
212
213            value_from_partitioned_state = None
214            if slices_from_partitioned_state:
215                # We assume here that the slices have been already merged
216                first_slice = slices_from_partitioned_state[0]
217                value_from_partitioned_state = (
218                    first_slice[self._connector_state_converter.MOST_RECENT_RECORD_KEY]
219                    if self._connector_state_converter.MOST_RECENT_RECORD_KEY in first_slice
220                    else first_slice[self._connector_state_converter.END_KEY]
221                )
222            return (
223                value_from_partitioned_state
224                or self._start
225                or self._connector_state_converter.zero_value,
226                partitioned_state,
227            )
228        return self._connector_state_converter.convert_from_sequential_state(
229            self._cursor_field, state, self._start
230        )
231
232    def observe(self, record: Record) -> None:
233        # Because observe writes to the most_recent_cursor_value_per_partition mapping,
234        # it is not thread-safe. However, this shouldn't lead to concurrency issues because
235        # observe() is only invoked by PartitionReader.process_partition(). Since the map is
236        # broken down according to partition, concurrent threads processing only read/write
237        # from different keys which avoids any conflicts.
238        #
239        # If we were to add thread safety, we should implement a lock per-partition
240        # which is instantiated during stream_slices()
241        most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
242            record.associated_slice
243        )
244        try:
245            cursor_value = self._extract_cursor_value(record)
246
247            if most_recent_cursor_value is None or most_recent_cursor_value < cursor_value:
248                self._most_recent_cursor_value_per_partition[record.associated_slice] = cursor_value
249        except ValueError:
250            self._log_for_record_without_cursor_value()
251
252    def _extract_cursor_value(self, record: Record) -> Any:
253        return self._connector_state_converter.parse_value(self._cursor_field.extract_value(record))
254
255    def close_partition(self, partition: Partition) -> None:
256        with self._lock:
257            slice_count_before = len(self._concurrent_state.get("slices", []))
258            self._add_slice_to_state(partition)
259            if slice_count_before < len(
260                self._concurrent_state["slices"]
261            ):  # only emit if at least one slice has been processed
262                self._merge_partitions()
263                self._emit_state_message()
264        self._has_closed_at_least_one_slice = True
265
266    def _add_slice_to_state(self, partition: Partition) -> None:
267        most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
268            partition.to_slice()
269        )
270
271        if self._slice_boundary_fields:
272            if "slices" not in self._concurrent_state:
273                raise RuntimeError(
274                    f"The state for stream {self._stream_name} should have at least one slice to delineate the sync start time, but no slices are present. This is unexpected. Please contact Support."
275                )
276            self._concurrent_state["slices"].append(
277                {
278                    self._connector_state_converter.START_KEY: self._extract_from_slice(
279                        partition, self._slice_boundary_fields[self._START_BOUNDARY]
280                    ),
281                    self._connector_state_converter.END_KEY: self._extract_from_slice(
282                        partition, self._slice_boundary_fields[self._END_BOUNDARY]
283                    ),
284                    self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value,
285                }
286            )
287        elif most_recent_cursor_value:
288            if self._has_closed_at_least_one_slice:
289                # If we track state value using records cursor field, we can only do that if there is one partition. This is because we save
290                # the state every time we close a partition. We assume that if there are multiple slices, they need to be providing
291                # boundaries. There are cases where partitions could not have boundaries:
292                # * The cursor should be per-partition
293                # * The stream state is actually the parent stream state
294                # There might be other cases not listed above. Those are not supported today hence the stream should not use this cursor for
295                # state management. For the specific user that was affected with this issue, we need to:
296                # * Fix state tracking (which is currently broken)
297                # * Make the new version available
298                # * (Probably) ask the user to reset the stream to avoid data loss
299                raise ValueError(
300                    "Given that slice_boundary_fields is not defined and that per-partition state is not supported, only one slice is "
301                    "expected. Please contact the Airbyte team."
302                )
303
304            self._concurrent_state["slices"].append(
305                {
306                    self._connector_state_converter.START_KEY: self.start,
307                    self._connector_state_converter.END_KEY: most_recent_cursor_value,
308                    self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value,
309                }
310            )
311
312    def _emit_state_message(self) -> None:
313        self._connector_state_manager.update_state_for_stream(
314            self._stream_name,
315            self._stream_namespace,
316            self.state,
317        )
318        state_message = self._connector_state_manager.create_state_message(
319            self._stream_name, self._stream_namespace
320        )
321        self._message_repository.emit_message(state_message)
322
323    def _merge_partitions(self) -> None:
324        self._concurrent_state["slices"] = self._connector_state_converter.merge_intervals(
325            self._concurrent_state["slices"]
326        )
327
328    def _extract_from_slice(self, partition: Partition, key: str) -> CursorValueType:
329        try:
330            _slice = partition.to_slice()
331            if not _slice:
332                raise KeyError(f"Could not find key `{key}` in empty slice")
333            return self._connector_state_converter.parse_value(_slice[key])  # type: ignore  # we expect the devs to specify a key that would return a CursorValueType
334        except KeyError as exception:
335            raise KeyError(
336                f"Partition is expected to have key `{key}` but could not be found"
337            ) from exception
338
339    def ensure_at_least_one_state_emitted(self) -> None:
340        """
341        The platform expect to have at least one state message on successful syncs. Hence, whatever happens, we expect this method to be
342        called.
343        """
344        self._emit_state_message()
345
346    def stream_slices(self) -> Iterable[StreamSlice]:
347        """
348        Generating slices based on a few parameters:
349        * lookback_window: Buffer to remove from END_KEY of the highest slice
350        * slice_range: Max difference between two slices. If the difference between two slices is greater, multiple slices will be created
351        * start: `_split_per_slice_range` will clip any value to `self._start which means that:
352          * if upper is less than self._start, no slices will be generated
353          * if lower is less than self._start, self._start will be used as the lower boundary (lookback_window will not be considered in that case)
354
355        Note that the slices will overlap at their boundaries. We therefore expect to have at least the lower or the upper boundary to be
356        inclusive in the API that is queried.
357        """
358        self._merge_partitions()
359
360        if self._start is not None and self._is_start_before_first_slice():
361            yield from self._split_per_slice_range(
362                self._start,
363                self._concurrent_state["slices"][0][self._connector_state_converter.START_KEY],
364                False,
365            )
366
367        if len(self._concurrent_state["slices"]) == 1:
368            yield from self._split_per_slice_range(
369                self._calculate_lower_boundary_of_last_slice(
370                    self._concurrent_state["slices"][0][self._connector_state_converter.END_KEY]
371                ),
372                self._end_provider(),
373                True,
374            )
375        elif len(self._concurrent_state["slices"]) > 1:
376            for i in range(len(self._concurrent_state["slices"]) - 1):
377                if self._cursor_granularity:
378                    yield from self._split_per_slice_range(
379                        self._concurrent_state["slices"][i][self._connector_state_converter.END_KEY]
380                        + self._cursor_granularity,
381                        self._concurrent_state["slices"][i + 1][
382                            self._connector_state_converter.START_KEY
383                        ],
384                        False,
385                    )
386                else:
387                    yield from self._split_per_slice_range(
388                        self._concurrent_state["slices"][i][
389                            self._connector_state_converter.END_KEY
390                        ],
391                        self._concurrent_state["slices"][i + 1][
392                            self._connector_state_converter.START_KEY
393                        ],
394                        False,
395                    )
396            yield from self._split_per_slice_range(
397                self._calculate_lower_boundary_of_last_slice(
398                    self._concurrent_state["slices"][-1][self._connector_state_converter.END_KEY]
399                ),
400                self._end_provider(),
401                True,
402            )
403        else:
404            raise ValueError("Expected at least one slice")
405
406    def _is_start_before_first_slice(self) -> bool:
407        return (
408            self._start is not None
409            and self._start
410            < self._concurrent_state["slices"][0][self._connector_state_converter.START_KEY]
411        )
412
413    def _calculate_lower_boundary_of_last_slice(
414        self, lower_boundary: CursorValueType
415    ) -> CursorValueType:
416        if self._lookback_window:
417            return lower_boundary - self._lookback_window
418        return lower_boundary
419
420    def _split_per_slice_range(
421        self, lower: CursorValueType, upper: CursorValueType, upper_is_end: bool
422    ) -> Iterable[StreamSlice]:
423        if lower >= upper:
424            return
425
426        if self._start and upper < self._start:
427            return
428
429        lower = max(lower, self._start) if self._start else lower
430        if not self._slice_range or self._evaluate_upper_safely(lower, self._slice_range) >= upper:
431            clamped_lower = self._clamping_strategy.clamp(lower)
432            clamped_upper = self._clamping_strategy.clamp(upper)
433            start_value, end_value = (
434                (clamped_lower, clamped_upper - self._cursor_granularity)
435                if self._cursor_granularity and not upper_is_end
436                else (clamped_lower, clamped_upper)
437            )
438            yield StreamSlice(
439                partition={},
440                cursor_slice={
441                    self._slice_boundary_fields_wrapper[
442                        self._START_BOUNDARY
443                    ]: self._connector_state_converter.output_format(start_value),
444                    self._slice_boundary_fields_wrapper[
445                        self._END_BOUNDARY
446                    ]: self._connector_state_converter.output_format(end_value),
447                },
448            )
449        else:
450            stop_processing = False
451            current_lower_boundary = lower
452            while not stop_processing:
453                current_upper_boundary = min(
454                    self._evaluate_upper_safely(current_lower_boundary, self._slice_range), upper
455                )
456                has_reached_upper_boundary = current_upper_boundary >= upper
457
458                clamped_upper = (
459                    self._clamping_strategy.clamp(current_upper_boundary)
460                    if current_upper_boundary != upper
461                    else current_upper_boundary
462                )
463                clamped_lower = self._clamping_strategy.clamp(current_lower_boundary)
464                if clamped_lower >= clamped_upper:
465                    # clamping collapsed both values which means that it is time to stop processing
466                    # FIXME should this be replace by proper end_provider
467                    break
468                start_value, end_value = (
469                    (clamped_lower, clamped_upper - self._cursor_granularity)
470                    if self._cursor_granularity
471                    and (not upper_is_end or not has_reached_upper_boundary)
472                    else (clamped_lower, clamped_upper)
473                )
474                yield StreamSlice(
475                    partition={},
476                    cursor_slice={
477                        self._slice_boundary_fields_wrapper[
478                            self._START_BOUNDARY
479                        ]: self._connector_state_converter.output_format(start_value),
480                        self._slice_boundary_fields_wrapper[
481                            self._END_BOUNDARY
482                        ]: self._connector_state_converter.output_format(end_value),
483                    },
484                )
485                current_lower_boundary = clamped_upper
486                if current_upper_boundary >= upper:
487                    stop_processing = True
488
489    def _evaluate_upper_safely(self, lower: CursorValueType, step: GapType) -> CursorValueType:
490        """
491        Given that we set the default step at datetime.timedelta.max, we will generate an OverflowError when evaluating the next start_date
492        This method assumes that users would never enter a step that would generate an overflow. Given that would be the case, the code
493        would have broken anyway.
494        """
495        try:
496            return lower + step
497        except OverflowError:
498            return self._end_provider()
499
500    def should_be_synced(self, record: Record) -> bool:
501        """
502        Determines if a record should be synced based on its cursor value.
503        :param record: The record to evaluate
504
505        :return: True if the record's cursor value falls within the sync boundaries
506        """
507        try:
508            record_cursor_value: CursorValueType = self._extract_cursor_value(record)
509        except ValueError:
510            self._log_for_record_without_cursor_value()
511            return True
512        return self.start <= record_cursor_value <= self._end_provider()
513
514    def _log_for_record_without_cursor_value(self) -> None:
515        if not self._should_be_synced_logger_triggered:
516            LOGGER.warning(
517                f"Could not find cursor field `{self.cursor_field.cursor_field_key}` in record for stream {self._stream_name}. The incremental sync will assume it needs to be synced"
518            )
519            self._should_be_synced_logger_triggered = True

Slices the stream into chunks that can be fetched independently. Slices enable state checkpointing and data retrieval parallelization.

ConcurrentCursor( stream_name: str, stream_namespace: Optional[str], stream_state: Any, message_repository: airbyte_cdk.MessageRepository, connector_state_manager: airbyte_cdk.ConnectorStateManager, connector_state_converter: airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter.AbstractStreamStateConverter, cursor_field: CursorField, slice_boundary_fields: Optional[Tuple[str, str]], start: Optional[airbyte_cdk.sources.streams.concurrent.cursor_types.CursorValueType], end_provider: Callable[[], airbyte_cdk.sources.streams.concurrent.cursor_types.CursorValueType], lookback_window: Optional[airbyte_cdk.sources.streams.concurrent.cursor_types.GapType] = None, slice_range: Optional[airbyte_cdk.sources.streams.concurrent.cursor_types.GapType] = None, cursor_granularity: Optional[airbyte_cdk.sources.streams.concurrent.cursor_types.GapType] = None, clamping_strategy: airbyte_cdk.sources.streams.concurrent.clamping.ClampingStrategy = <airbyte_cdk.sources.streams.concurrent.clamping.NoClamping object>)
140    def __init__(
141        self,
142        stream_name: str,
143        stream_namespace: Optional[str],
144        stream_state: Any,
145        message_repository: MessageRepository,
146        connector_state_manager: ConnectorStateManager,
147        connector_state_converter: AbstractStreamStateConverter,
148        cursor_field: CursorField,
149        slice_boundary_fields: Optional[Tuple[str, str]],
150        start: Optional[CursorValueType],
151        end_provider: Callable[[], CursorValueType],
152        lookback_window: Optional[GapType] = None,
153        slice_range: Optional[GapType] = None,
154        cursor_granularity: Optional[GapType] = None,
155        clamping_strategy: ClampingStrategy = NoClamping(),
156    ) -> None:
157        self._stream_name = stream_name
158        self._stream_namespace = stream_namespace
159        self._message_repository = message_repository
160        self._connector_state_converter = connector_state_converter
161        self._connector_state_manager = connector_state_manager
162        self._cursor_field = cursor_field
163        # To see some example where the slice boundaries might not be defined, check https://github.com/airbytehq/airbyte/blob/1ce84d6396e446e1ac2377362446e3fb94509461/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py#L363-L379
164        self._slice_boundary_fields = slice_boundary_fields
165        self._start = start
166        self._end_provider = end_provider
167        self.start, self._concurrent_state = self._get_concurrent_state(stream_state)
168        self._lookback_window = lookback_window
169        self._slice_range = slice_range
170        self._most_recent_cursor_value_per_partition: MutableMapping[
171            Union[StreamSlice, Mapping[str, Any], None], Any
172        ] = {}
173        self._has_closed_at_least_one_slice = False
174        self._cursor_granularity = cursor_granularity
175        # Flag to track if the logger has been triggered (per stream)
176        self._should_be_synced_logger_triggered = False
177        self._clamping_strategy = clamping_strategy
178
179        # A lock is required when closing a partition because updating the cursor's concurrent_state is
180        # not thread safe. When multiple partitions are being closed by the cursor at the same time, it is
181        # possible for one partition to update concurrent_state after a second partition has already read
182        # the previous state. This can lead to the second partition overwriting the previous one's state.
183        self._lock = threading.Lock()
state: MutableMapping[str, Any]
185    @property
186    def state(self) -> MutableMapping[str, Any]:
187        return self._connector_state_converter.convert_to_state_message(
188            self.cursor_field, self._concurrent_state
189        )
cursor_field: CursorField
191    @property
192    def cursor_field(self) -> CursorField:
193        return self._cursor_field
def observe(self, record: airbyte_cdk.Record) -> None:
232    def observe(self, record: Record) -> None:
233        # Because observe writes to the most_recent_cursor_value_per_partition mapping,
234        # it is not thread-safe. However, this shouldn't lead to concurrency issues because
235        # observe() is only invoked by PartitionReader.process_partition(). Since the map is
236        # broken down according to partition, concurrent threads processing only read/write
237        # from different keys which avoids any conflicts.
238        #
239        # If we were to add thread safety, we should implement a lock per-partition
240        # which is instantiated during stream_slices()
241        most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
242            record.associated_slice
243        )
244        try:
245            cursor_value = self._extract_cursor_value(record)
246
247            if most_recent_cursor_value is None or most_recent_cursor_value < cursor_value:
248                self._most_recent_cursor_value_per_partition[record.associated_slice] = cursor_value
249        except ValueError:
250            self._log_for_record_without_cursor_value()

Indicate to the cursor that the record has been emitted

def close_partition( self, partition: airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition) -> None:
255    def close_partition(self, partition: Partition) -> None:
256        with self._lock:
257            slice_count_before = len(self._concurrent_state.get("slices", []))
258            self._add_slice_to_state(partition)
259            if slice_count_before < len(
260                self._concurrent_state["slices"]
261            ):  # only emit if at least one slice has been processed
262                self._merge_partitions()
263                self._emit_state_message()
264        self._has_closed_at_least_one_slice = True

Indicate to the cursor that the partition has been successfully processed

def ensure_at_least_one_state_emitted(self) -> None:
339    def ensure_at_least_one_state_emitted(self) -> None:
340        """
341        The platform expect to have at least one state message on successful syncs. Hence, whatever happens, we expect this method to be
342        called.
343        """
344        self._emit_state_message()

The platform expects at least one state message on successful syncs. Hence, whatever happens, this method is expected to be called.

def stream_slices(self) -> Iterable[airbyte_cdk.StreamSlice]:
346    def stream_slices(self) -> Iterable[StreamSlice]:
347        """
348        Generating slices based on a few parameters:
349        * lookback_window: Buffer to remove from END_KEY of the highest slice
350        * slice_range: Max difference between two slices. If the difference between two slices is greater, multiple slices will be created
351        * start: `_split_per_slice_range` will clip any value to `self._start which means that:
352          * if upper is less than self._start, no slices will be generated
353          * if lower is less than self._start, self._start will be used as the lower boundary (lookback_window will not be considered in that case)
354
355        Note that the slices will overlap at their boundaries. We therefore expect to have at least the lower or the upper boundary to be
356        inclusive in the API that is queried.
357        """
358        self._merge_partitions()
359
360        if self._start is not None and self._is_start_before_first_slice():
361            yield from self._split_per_slice_range(
362                self._start,
363                self._concurrent_state["slices"][0][self._connector_state_converter.START_KEY],
364                False,
365            )
366
367        if len(self._concurrent_state["slices"]) == 1:
368            yield from self._split_per_slice_range(
369                self._calculate_lower_boundary_of_last_slice(
370                    self._concurrent_state["slices"][0][self._connector_state_converter.END_KEY]
371                ),
372                self._end_provider(),
373                True,
374            )
375        elif len(self._concurrent_state["slices"]) > 1:
376            for i in range(len(self._concurrent_state["slices"]) - 1):
377                if self._cursor_granularity:
378                    yield from self._split_per_slice_range(
379                        self._concurrent_state["slices"][i][self._connector_state_converter.END_KEY]
380                        + self._cursor_granularity,
381                        self._concurrent_state["slices"][i + 1][
382                            self._connector_state_converter.START_KEY
383                        ],
384                        False,
385                    )
386                else:
387                    yield from self._split_per_slice_range(
388                        self._concurrent_state["slices"][i][
389                            self._connector_state_converter.END_KEY
390                        ],
391                        self._concurrent_state["slices"][i + 1][
392                            self._connector_state_converter.START_KEY
393                        ],
394                        False,
395                    )
396            yield from self._split_per_slice_range(
397                self._calculate_lower_boundary_of_last_slice(
398                    self._concurrent_state["slices"][-1][self._connector_state_converter.END_KEY]
399                ),
400                self._end_provider(),
401                True,
402            )
403        else:
404            raise ValueError("Expected at least one slice")

Generating slices based on a few parameters:

  • lookback_window: Buffer subtracted from the END_KEY of the last slice in the state when computing the lower boundary of the final slice
  • slice_range: Maximum span between the lower and upper boundary of a single slice. If an interval is wider than this, it is split into multiple slices
  • start: _split_per_slice_range will clip any value to self._start, which means that:
    • if upper is less than self._start, no slices will be generated
    • if lower is less than self._start, self._start will be used as the lower boundary (lookback_window will not be considered in that case)

Note that the slices will overlap at their boundaries. We therefore expect the queried API to treat at least one of the two boundaries (lower or upper) as inclusive.

def should_be_synced(self, record: airbyte_cdk.Record) -> bool:
500    def should_be_synced(self, record: Record) -> bool:
501        """
502        Determines if a record should be synced based on its cursor value.
503        :param record: The record to evaluate
504
505        :return: True if the record's cursor value falls within the sync boundaries
506        """
507        try:
508            record_cursor_value: CursorValueType = self._extract_cursor_value(record)
509        except ValueError:
510            self._log_for_record_without_cursor_value()
511            return True
512        return self.start <= record_cursor_value <= self._end_provider()

Determines if a record should be synced based on its cursor value.

Parameters
  • record: The record to evaluate
Returns

True if the record's cursor value falls within the sync boundaries