# airbyte_cdk.sources.streams.concurrent.cursor

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4
  5import datetime
  6import functools
  7import logging
  8import threading
  9from abc import ABC, abstractmethod
 10from typing import (
 11    Any,
 12    Callable,
 13    Iterable,
 14    List,
 15    Mapping,
 16    MutableMapping,
 17    Optional,
 18    Tuple,
 19    Union,
 20)
 21
 22from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
 23from airbyte_cdk.sources.message import MessageRepository, NoopMessageRepository
 24from airbyte_cdk.sources.streams import NO_CURSOR_STATE_KEY
 25from airbyte_cdk.sources.streams.concurrent.clamping import ClampingStrategy, NoClamping
 26from airbyte_cdk.sources.streams.concurrent.cursor_types import CursorValueType, GapType
 27from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
 28from airbyte_cdk.sources.streams.concurrent.partitions.stream_slicer import StreamSlicer
 29from airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter import (
 30    AbstractStreamStateConverter,
 31)
 32from airbyte_cdk.sources.types import Record, StreamSlice
 33
 34LOGGER = logging.getLogger("airbyte")
 35
 36
 37def _extract_value(mapping: Mapping[str, Any], path: List[str]) -> Any:
 38    return functools.reduce(lambda a, b: a[b], path, mapping)
 39
 40
 41class CursorField:
 42    def __init__(
 43        self, cursor_field_key: str, supports_catalog_defined_cursor_field: bool = False
 44    ) -> None:
 45        self.cursor_field_key = cursor_field_key
 46        self.supports_catalog_defined_cursor_field = supports_catalog_defined_cursor_field
 47
 48    def extract_value(self, record: Record) -> Any:
 49        cursor_value = record.data.get(self.cursor_field_key)
 50        if cursor_value is None:
 51            raise ValueError(f"Could not find cursor field {self.cursor_field_key} in record")
 52        return cursor_value  # type: ignore  # we assume that the value the path points at is a comparable
 53
 54
 55class Cursor(StreamSlicer, ABC):
 56    @property
 57    @abstractmethod
 58    def state(self) -> MutableMapping[str, Any]: ...
 59
 60    @abstractmethod
 61    def observe(self, record: Record) -> None:
 62        """
 63        Indicate to the cursor that the record has been emitted
 64        """
 65        raise NotImplementedError()
 66
 67    @abstractmethod
 68    def close_partition(self, partition: Partition) -> None:
 69        """
 70        Indicate to the cursor that the partition has been successfully processed
 71        """
 72        raise NotImplementedError()
 73
 74    @abstractmethod
 75    def ensure_at_least_one_state_emitted(self) -> None:
 76        """
 77        State messages are emitted when a partition is closed. However, the platform expects at least one state to be emitted per sync per
 78        stream. Hence, if no partitions are generated, this method needs to be called.
 79        """
 80        raise NotImplementedError()
 81
 82    @abstractmethod
 83    def should_be_synced(self, record: Record) -> bool:
 84        pass
 85
 86    def stream_slices(self) -> Iterable[StreamSlice]:
 87        """
 88        Default placeholder implementation of generate_slices.
 89        Subclasses can override this method to provide actual behavior.
 90        """
 91        yield StreamSlice(partition={}, cursor_slice={})
 92
 93    def get_cursor_datetime_from_state(
 94        self, stream_state: Mapping[str, Any]
 95    ) -> datetime.datetime | None:
 96        """Extract and parse the cursor datetime from the given stream state.
 97
 98        This method is used by StateDelegatingStream to validate cursor age against
 99        an API's data retention period. Subclasses should implement this method to
100        extract the cursor value from their specific state structure and parse it
101        into a datetime object.
102
103        Returns None if the cursor cannot be extracted or parsed, which will cause
104        StateDelegatingStream to fall back to full refresh (safe default).
105
106        Raises NotImplementedError by default - subclasses must implement this method
107        if they want to support cursor age validation with api_retention_period.
108        """
109        raise NotImplementedError(
110            f"{self.__class__.__name__} does not implement get_cursor_datetime_from_state. "
111            f"Cursor age validation with api_retention_period is not supported for this cursor type."
112        )
113
114
class FinalStateCursor(Cursor):
    """Cursor that is used to guarantee at least one state message is emitted for a concurrent stream."""

    def __init__(
        self,
        stream_name: str,
        stream_namespace: Optional[str],
        message_repository: MessageRepository,
    ) -> None:
        self._stream_name = stream_name
        self._stream_namespace = stream_namespace
        self._message_repository = message_repository
        # A ConnectorStateManager normally operates source-wide, but here it is only
        # needed to build the sentinel state message. This is temporary: with resumable
        # full refresh, every stream moves to a concurrent cursor with incremental state.
        self._connector_state_manager = ConnectorStateManager()
        self._has_closed_at_least_one_slice = False

    @property
    def state(self) -> MutableMapping[str, Any]:
        # Sentinel state marking a completed full-refresh sync.
        return {NO_CURSOR_STATE_KEY: True}

    def observe(self, record: Record) -> None:
        # Full refresh has no cursor value to track.
        pass

    def close_partition(self, partition: Partition) -> None:
        # Nothing to persist per partition for a full-refresh cursor.
        pass

    def ensure_at_least_one_state_emitted(self) -> None:
        """Emit the sentinel state message.

        Used primarily for full refresh syncs, which have no real cursor value
        to report at the end of the sync.
        """
        manager = self._connector_state_manager
        manager.update_state_for_stream(self._stream_name, self._stream_namespace, self.state)
        self._message_repository.emit_message(
            manager.create_state_message(self._stream_name, self._stream_namespace)
        )

    def should_be_synced(self, record: Record) -> bool:
        # Full refresh: every record is synced.
        return True

    def get_cursor_datetime_from_state(
        self, stream_state: Mapping[str, Any]
    ) -> datetime.datetime | None:
        """Return now() for a completed-full-refresh state, otherwise None.

        A state of {NO_CURSOR_STATE_KEY: True} marks a finished full refresh:
        returning now() signals the cursor is "current" and inside any
        retention window, so incremental sync may be used. Any other layout
        returns None so the incremental cursor can interpret the state instead.
        """
        completed_full_refresh = bool(stream_state.get(NO_CURSOR_STATE_KEY))
        if completed_full_refresh:
            return datetime.datetime.now(datetime.timezone.utc)
        return None
174
175
class ConcurrentCursor(Cursor):
    """Incremental cursor for concurrently-processed streams.

    Tracks the most recent cursor value observed per partition, folds closed
    partitions into an interval-based ("slices") concurrent state, merges
    overlapping intervals, and emits a state message each time a partition is
    closed. Slice generation (``stream_slices``) fills the gaps between the
    already-synced intervals, honoring start date, lookback window, slice
    range, cursor granularity and clamping.
    """

    # Indexes into the (start, end) slice-boundary fields tuple.
    _START_BOUNDARY = 0
    _END_BOUNDARY = 1

    def copy_without_state(self) -> "ConcurrentCursor":
        """Return a cursor with identical configuration but empty state.

        The copy gets a NoopMessageRepository and a fresh ConnectorStateManager
        so it will not emit state messages on behalf of the original stream.
        """
        return self.__class__(
            stream_name=self._stream_name,
            stream_namespace=self._stream_namespace,
            stream_state={},
            message_repository=NoopMessageRepository(),
            connector_state_manager=ConnectorStateManager(),
            connector_state_converter=self._connector_state_converter,
            cursor_field=self._cursor_field,
            slice_boundary_fields=self._slice_boundary_fields,
            start=self._start,
            end_provider=self._end_provider,
            lookback_window=self._lookback_window,
            slice_range=self._slice_range,
            cursor_granularity=self._cursor_granularity,
            clamping_strategy=self._clamping_strategy,
        )

    def __init__(
        self,
        stream_name: str,
        stream_namespace: Optional[str],
        stream_state: Any,
        message_repository: MessageRepository,
        connector_state_manager: ConnectorStateManager,
        connector_state_converter: AbstractStreamStateConverter,
        cursor_field: CursorField,
        slice_boundary_fields: Optional[Tuple[str, str]],
        start: Optional[CursorValueType],
        end_provider: Callable[[], CursorValueType],
        lookback_window: Optional[GapType] = None,
        slice_range: Optional[GapType] = None,
        cursor_granularity: Optional[GapType] = None,
        clamping_strategy: ClampingStrategy = NoClamping(),
    ) -> None:
        self._stream_name = stream_name
        self._stream_namespace = stream_namespace
        self._message_repository = message_repository
        self._connector_state_converter = connector_state_converter
        self._connector_state_manager = connector_state_manager
        self._cursor_field = cursor_field
        # To see some example where the slice boundaries might not be defined, check https://github.com/airbytehq/airbyte/blob/1ce84d6396e446e1ac2377362446e3fb94509461/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py#L363-L379
        self._slice_boundary_fields = slice_boundary_fields
        self._start = start
        self._end_provider = end_provider
        # `self.start` is the effective sync start resolved from state (public attribute);
        # `self._concurrent_state` is the interval-based state this cursor mutates.
        self.start, self._concurrent_state = self._get_concurrent_state(stream_state)
        self._lookback_window = lookback_window
        self._slice_range = slice_range
        # Highest cursor value observed for each in-flight partition, keyed by its slice.
        self._most_recent_cursor_value_per_partition: MutableMapping[
            Union[StreamSlice, Mapping[str, Any], None], Any
        ] = {}
        self._has_closed_at_least_one_slice = False
        self._cursor_granularity = cursor_granularity
        # Flag to track if the logger has been triggered (per stream)
        self._should_be_synced_logger_triggered = False
        self._clamping_strategy = clamping_strategy
        # Set to False as soon as a record arrives out of ascending cursor order;
        # consulted by reduce_slice_range to warn about potential data loss.
        self._is_ascending_order = True

        # A lock is required when closing a partition because updating the cursor's concurrent_state is
        # not thread safe. When multiple partitions are being closed by the cursor at the same time, it is
        # possible for one partition to update concurrent_state after a second partition has already read
        # the previous state. This can lead to the second partition overwriting the previous one's state.
        self._lock = threading.Lock()

    @property
    def state(self) -> MutableMapping[str, Any]:
        """The concurrent state rendered in the outgoing state-message format."""
        return self._connector_state_converter.convert_to_state_message(
            self.cursor_field, self._concurrent_state
        )

    @property
    def cursor_field(self) -> CursorField:
        """The field descriptor identifying the cursor value in records."""
        return self._cursor_field

    @property
    def _slice_boundary_fields_wrapper(self) -> Tuple[str, str]:
        # Fall back to the converter's canonical START/END keys when the stream
        # did not declare its own slice boundary field names.
        return (
            self._slice_boundary_fields
            if self._slice_boundary_fields
            else (
                self._connector_state_converter.START_KEY,
                self._connector_state_converter.END_KEY,
            )
        )

    def _get_concurrent_state(
        self, state: MutableMapping[str, Any]
    ) -> Tuple[CursorValueType, MutableMapping[str, Any]]:
        """Resolve (sync start value, concurrent state) from incoming state.

        Already-concurrent state is deserialized directly; sequential/legacy
        state is converted. The start value prefers, in order: the first
        slice's most-recent-record (or end) value, the configured start, then
        the converter's zero value.
        """
        if self._connector_state_converter.is_state_message_compatible(state):
            partitioned_state = self._connector_state_converter.deserialize(state)
            slices_from_partitioned_state = partitioned_state.get("slices", [])

            value_from_partitioned_state = None
            if slices_from_partitioned_state:
                # We assume here that the slices have been already merged
                first_slice = slices_from_partitioned_state[0]
                value_from_partitioned_state = (
                    first_slice[self._connector_state_converter.MOST_RECENT_RECORD_KEY]
                    if self._connector_state_converter.MOST_RECENT_RECORD_KEY in first_slice
                    else first_slice[self._connector_state_converter.END_KEY]
                )
            return (
                value_from_partitioned_state
                or self._start
                or self._connector_state_converter.zero_value,
                partitioned_state,
            )
        return self._connector_state_converter.convert_from_sequential_state(
            self._cursor_field, state, self._start
        )

    def observe(self, record: Record) -> None:
        """Record the highest cursor value seen for the record's partition.

        Also flips `_is_ascending_order` to False when a record's cursor value
        is lower than the partition's current maximum. Records without a
        cursor value are logged (once) and otherwise ignored.
        """
        # Because observe writes to the most_recent_cursor_value_per_partition mapping,
        # it is not thread-safe. However, this shouldn't lead to concurrency issues because
        # observe() is only invoked by PartitionReader.process_partition(). Since the map is
        # broken down according to partition, concurrent threads processing only read/write
        # from different keys which avoids any conflicts.
        #
        # If we were to add thread safety, we should implement a lock per-partition
        # which is instantiated during stream_slices()
        most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
            record.associated_slice
        )
        try:
            cursor_value = self._extract_cursor_value(record)

            if most_recent_cursor_value is None or most_recent_cursor_value < cursor_value:
                self._most_recent_cursor_value_per_partition[record.associated_slice] = cursor_value
            elif most_recent_cursor_value > cursor_value:
                self._is_ascending_order = False
        except ValueError:
            self._log_for_record_without_cursor_value()

    def _extract_cursor_value(self, record: Record) -> Any:
        """Extract the record's cursor value and parse it via the state converter."""
        return self._connector_state_converter.parse_value(self._cursor_field.extract_value(record))

    def close_partition(self, partition: Partition) -> None:
        """Fold a finished partition into state and emit a state message.

        Holds `_lock` while mutating `_concurrent_state` — see the lock comment
        in __init__. A state message is emitted only if the partition actually
        added a slice.
        """
        with self._lock:
            slice_count_before = len(self._concurrent_state.get("slices", []))
            self._add_slice_to_state(partition)
            if slice_count_before < len(
                self._concurrent_state["slices"]
            ):  # only emit if at least one slice has been processed
                self._merge_partitions()
                self._emit_state_message()
        self._has_closed_at_least_one_slice = True

    def _add_slice_to_state(self, partition: Partition) -> None:
        """Append the partition's interval to the concurrent state's slices.

        With boundary fields, the interval comes from the partition's slice;
        without them, it spans from the sync start to the most recent observed
        cursor value, which is only valid for a single slice per sync.
        """
        most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
            partition.to_slice()
        )

        if self._slice_boundary_fields:
            if "slices" not in self._concurrent_state:
                raise RuntimeError(
                    f"The state for stream {self._stream_name} should have at least one slice to delineate the sync start time, but no slices are present. This is unexpected. Please contact Support."
                )
            self._concurrent_state["slices"].append(
                {
                    self._connector_state_converter.START_KEY: self._extract_from_slice(
                        partition, self._slice_boundary_fields[self._START_BOUNDARY]
                    ),
                    self._connector_state_converter.END_KEY: self._extract_from_slice(
                        partition, self._slice_boundary_fields[self._END_BOUNDARY]
                    ),
                    self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value,
                }
            )
        elif most_recent_cursor_value:
            if self._has_closed_at_least_one_slice:
                # If we track state value using records cursor field, we can only do that if there is one partition. This is because we save
                # the state every time we close a partition. We assume that if there are multiple slices, they need to be providing
                # boundaries. There are cases where partitions could not have boundaries:
                # * The cursor should be per-partition
                # * The stream state is actually the parent stream state
                # There might be other cases not listed above. Those are not supported today hence the stream should not use this cursor for
                # state management. For the specific user that was affected with this issue, we need to:
                # * Fix state tracking (which is currently broken)
                # * Make the new version available
                # * (Probably) ask the user to reset the stream to avoid data loss
                raise ValueError(
                    "Given that slice_boundary_fields is not defined and that per-partition state is not supported, only one slice is "
                    "expected. Please contact the Airbyte team."
                )

            self._concurrent_state["slices"].append(
                {
                    self._connector_state_converter.START_KEY: self.start,
                    self._connector_state_converter.END_KEY: most_recent_cursor_value,
                    self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value,
                }
            )

    def _emit_state_message(self) -> None:
        """Push the current state into the manager and emit a state message."""
        self._connector_state_manager.update_state_for_stream(
            self._stream_name,
            self._stream_namespace,
            self.state,
        )
        state_message = self._connector_state_manager.create_state_message(
            self._stream_name, self._stream_namespace
        )
        self._message_repository.emit_message(state_message)

    def _merge_partitions(self) -> None:
        """Collapse overlapping/adjacent slice intervals via the state converter."""
        self._concurrent_state["slices"] = self._connector_state_converter.merge_intervals(
            self._concurrent_state["slices"]
        )

    def _extract_from_slice(self, partition: Partition, key: str) -> CursorValueType:
        """Return the parsed boundary value stored under *key* in the partition's slice.

        Raises KeyError when the slice is empty or the key is missing.
        """
        try:
            _slice = partition.to_slice()
            if not _slice:
                raise KeyError(f"Could not find key `{key}` in empty slice")
            return self._connector_state_converter.parse_value(_slice[key])  # type: ignore  # we expect the devs to specify a key that would return a CursorValueType
        except KeyError as exception:
            raise KeyError(
                f"Partition is expected to have key `{key}` but could not be found"
            ) from exception

    def ensure_at_least_one_state_emitted(self) -> None:
        """
        The platform expect to have at least one state message on successful syncs. Hence, whatever happens, we expect this method to be
        called.
        """
        self._emit_state_message()

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Generating slices based on a few parameters:
        * lookback_window: Buffer to remove from END_KEY of the highest slice
        * slice_range: Max difference between two slices. If the difference between two slices is greater, multiple slices will be created
        * start: `_split_per_slice_range` will clip any value to `self._start which means that:
          * if upper is less than self._start, no slices will be generated
          * if lower is less than self._start, self._start will be used as the lower boundary (lookback_window will not be considered in that case)

        Note that the slices will overlap at their boundaries. We therefore expect to have at least the lower or the upper boundary to be
        inclusive in the API that is queried.
        """
        self._merge_partitions()

        # Gap between the configured start and the first already-synced interval.
        if self._start is not None and self._is_start_before_first_slice():
            yield from self._split_per_slice_range(
                self._start,
                self._concurrent_state["slices"][0][self._connector_state_converter.START_KEY],
                False,
            )

        if len(self._concurrent_state["slices"]) == 1:
            # Only the trailing gap, from (end - lookback) up to the end provider.
            yield from self._split_per_slice_range(
                self._calculate_lower_boundary_of_last_slice(
                    self._concurrent_state["slices"][0][self._connector_state_converter.END_KEY]
                ),
                self._end_provider(),
                True,
            )
        elif len(self._concurrent_state["slices"]) > 1:
            # Fill each gap between consecutive synced intervals...
            for i in range(len(self._concurrent_state["slices"]) - 1):
                if self._cursor_granularity:
                    # Offset by granularity so the new slice does not re-cover the last synced unit.
                    yield from self._split_per_slice_range(
                        self._concurrent_state["slices"][i][self._connector_state_converter.END_KEY]
                        + self._cursor_granularity,
                        self._concurrent_state["slices"][i + 1][
                            self._connector_state_converter.START_KEY
                        ],
                        False,
                    )
                else:
                    yield from self._split_per_slice_range(
                        self._concurrent_state["slices"][i][
                            self._connector_state_converter.END_KEY
                        ],
                        self._concurrent_state["slices"][i + 1][
                            self._connector_state_converter.START_KEY
                        ],
                        False,
                    )
            # ...then the trailing gap after the last interval.
            yield from self._split_per_slice_range(
                self._calculate_lower_boundary_of_last_slice(
                    self._concurrent_state["slices"][-1][self._connector_state_converter.END_KEY]
                ),
                self._end_provider(),
                True,
            )
        else:
            raise ValueError("Expected at least one slice")

    def _is_start_before_first_slice(self) -> bool:
        """True when the configured start precedes the first synced interval's start."""
        return (
            self._start is not None
            and self._start
            < self._concurrent_state["slices"][0][self._connector_state_converter.START_KEY]
        )

    def _calculate_lower_boundary_of_last_slice(
        self, lower_boundary: CursorValueType
    ) -> CursorValueType:
        """Apply the lookback window (if any) to the last slice's lower boundary."""
        if self._lookback_window:
            return lower_boundary - self._lookback_window
        return lower_boundary

    def _split_per_slice_range(
        self, lower: CursorValueType, upper: CursorValueType, upper_is_end: bool
    ) -> Iterable[StreamSlice]:
        """Yield slices covering [lower, upper], chunked by the slice range.

        Values are clipped to the configured start, clamped via the clamping
        strategy, and — when a cursor granularity is set — shortened by one
        granularity at non-final upper boundaries to avoid overlap.
        `upper_is_end` marks *upper* as the sync's final boundary.
        """
        if lower >= upper:
            return

        if self._start and upper < self._start:
            return

        lower = max(lower, self._start) if self._start else lower
        if not self._slice_range or self._evaluate_upper_safely(lower, self._slice_range) >= upper:
            # Whole range fits in one slice.
            clamped_lower = self._clamping_strategy.clamp(lower)
            clamped_upper = self._clamping_strategy.clamp(upper)
            start_value, end_value = (
                (clamped_lower, clamped_upper - self._cursor_granularity)
                if self._cursor_granularity and not upper_is_end
                else (clamped_lower, clamped_upper)
            )
            yield StreamSlice(
                partition={},
                cursor_slice={
                    self._slice_boundary_fields_wrapper[
                        self._START_BOUNDARY
                    ]: self._connector_state_converter.output_format(start_value),
                    self._slice_boundary_fields_wrapper[
                        self._END_BOUNDARY
                    ]: self._connector_state_converter.output_format(end_value),
                },
            )
        else:
            # Walk the range in slice_range-sized steps.
            stop_processing = False
            current_lower_boundary = lower
            while not stop_processing:
                current_upper_boundary = min(
                    self._evaluate_upper_safely(current_lower_boundary, self._slice_range), upper
                )
                has_reached_upper_boundary = current_upper_boundary >= upper

                clamped_upper = (
                    self._clamping_strategy.clamp(current_upper_boundary)
                    if current_upper_boundary != upper
                    else current_upper_boundary
                )
                clamped_lower = self._clamping_strategy.clamp(current_lower_boundary)
                if clamped_lower >= clamped_upper:
                    # clamping collapsed both values which means that it is time to stop processing
                    # FIXME should this be replace by proper end_provider
                    break
                start_value, end_value = (
                    (clamped_lower, clamped_upper - self._cursor_granularity)
                    if self._cursor_granularity
                    and (not upper_is_end or not has_reached_upper_boundary)
                    else (clamped_lower, clamped_upper)
                )
                yield StreamSlice(
                    partition={},
                    cursor_slice={
                        self._slice_boundary_fields_wrapper[
                            self._START_BOUNDARY
                        ]: self._connector_state_converter.output_format(start_value),
                        self._slice_boundary_fields_wrapper[
                            self._END_BOUNDARY
                        ]: self._connector_state_converter.output_format(end_value),
                    },
                )
                current_lower_boundary = clamped_upper
                if current_upper_boundary >= upper:
                    stop_processing = True

    def _evaluate_upper_safely(self, lower: CursorValueType, step: GapType) -> CursorValueType:
        """
        Given that we set the default step at datetime.timedelta.max, we will generate an OverflowError when evaluating the next start_date
        This method assumes that users would never enter a step that would generate an overflow. Given that would be the case, the code
        would have broken anyway.
        """
        try:
            return lower + step
        except OverflowError:
            return self._end_provider()

    def should_be_synced(self, record: Record) -> bool:
        """
        Determines if a record should be synced based on its cursor value.
        :param record: The record to evaluate

        :return: True if the record's cursor value falls within the sync boundaries
        """
        try:
            record_cursor_value: CursorValueType = self._extract_cursor_value(record)
        except ValueError:
            # Records without a cursor value are synced by default (logged once).
            self._log_for_record_without_cursor_value()
            return True
        return self.start <= record_cursor_value <= self._end_provider()

    def _log_for_record_without_cursor_value(self) -> None:
        """Warn (once per stream) that a record lacked a cursor value."""
        if not self._should_be_synced_logger_triggered:
            LOGGER.warning(
                f"Could not find cursor field `{self.cursor_field.cursor_field_key}` in record for stream {self._stream_name}. The incremental sync will assume it needs to be synced"
            )
            self._should_be_synced_logger_triggered = True

    def reduce_slice_range(self, stream_slice: StreamSlice) -> StreamSlice:
        """Narrow a slice's start to the most recent observed cursor value.

        Only valid when records arrive in ascending cursor order; otherwise a
        warning is logged since narrowing could skip records. Returns the
        slice unchanged when no cursor value has been observed for it.
        """
        # In theory, we might be more flexible here meaning that it doesn't need to be in ascending order but it just
        # needs to be ordered. For now though, we will only support ascending order.
        if not self._is_ascending_order:
            LOGGER.warning(
                "Attempting to reduce slice while records are not returned in incremental order might lead to missing records"
            )

        if stream_slice in self._most_recent_cursor_value_per_partition:
            return StreamSlice(
                partition=stream_slice.partition,
                cursor_slice={
                    self._slice_boundary_fields_wrapper[
                        self._START_BOUNDARY
                    ]: self._connector_state_converter.output_format(
                        self._most_recent_cursor_value_per_partition[stream_slice]
                    ),
                    self._slice_boundary_fields_wrapper[
                        self._END_BOUNDARY
                    ]: stream_slice.cursor_slice[
                        self._slice_boundary_fields_wrapper[self._END_BOUNDARY]
                    ],
                },
                extra_fields=stream_slice.extra_fields,
            )
        else:
            return stream_slice

    def get_cursor_datetime_from_state(
        self, stream_state: Mapping[str, Any]
    ) -> datetime.datetime | None:
        """Extract and parse the cursor datetime from the given stream state.

        For concurrent cursors, the state can be in two formats:
        1. Sequential/legacy format: {cursor_field: cursor_value}
        2. Concurrent format: {state_type: "date-range", slices: [...]}

        Returns the cursor datetime if present and parseable, otherwise returns None.
        """
        # Check if state is in concurrent format (need to convert to dict for type compatibility)
        mutable_state: MutableMapping[str, Any] = dict(stream_state)
        if self._connector_state_converter.is_state_message_compatible(mutable_state):
            slices = stream_state.get("slices", [])
            if not slices:
                return None
            # Get the most recent cursor value from the first slice (after merging)
            first_slice = slices[0]
            cursor_value = first_slice.get(
                self._connector_state_converter.MOST_RECENT_RECORD_KEY
            ) or first_slice.get(self._connector_state_converter.END_KEY)
            if not cursor_value:
                return None
            try:
                parsed_value = self._connector_state_converter.parse_value(cursor_value)
                if isinstance(parsed_value, datetime.datetime):
                    return parsed_value
                return None
            except (ValueError, TypeError):
                return None

        # Sequential/legacy format: {cursor_field: cursor_value}
        cursor_value = stream_state.get(self._cursor_field.cursor_field_key)
        if not cursor_value:
            return None
        try:
            parsed_value = self._connector_state_converter.parse_value(cursor_value)
            if isinstance(parsed_value, datetime.datetime):
                return parsed_value
            return None
        except (ValueError, TypeError):
            return None
LOGGER = <Logger airbyte (INFO)>
class CursorField:
42class CursorField:
43    def __init__(
44        self, cursor_field_key: str, supports_catalog_defined_cursor_field: bool = False
45    ) -> None:
46        self.cursor_field_key = cursor_field_key
47        self.supports_catalog_defined_cursor_field = supports_catalog_defined_cursor_field
48
49    def extract_value(self, record: Record) -> Any:
50        cursor_value = record.data.get(self.cursor_field_key)
51        if cursor_value is None:
52            raise ValueError(f"Could not find cursor field {self.cursor_field_key} in record")
53        return cursor_value  # type: ignore  # we assume that the value the path points at is a comparable
CursorField(cursor_field_key: str, supports_catalog_defined_cursor_field: bool = False)
43    def __init__(
44        self, cursor_field_key: str, supports_catalog_defined_cursor_field: bool = False
45    ) -> None:
46        self.cursor_field_key = cursor_field_key
47        self.supports_catalog_defined_cursor_field = supports_catalog_defined_cursor_field
cursor_field_key
supports_catalog_defined_cursor_field
def extract_value(self, record: airbyte_cdk.Record) -> Any:
49    def extract_value(self, record: Record) -> Any:
50        cursor_value = record.data.get(self.cursor_field_key)
51        if cursor_value is None:
52            raise ValueError(f"Could not find cursor field {self.cursor_field_key} in record")
53        return cursor_value  # type: ignore  # we assume that the value the path points at is a comparable
 56class Cursor(StreamSlicer, ABC):
 57    @property
 58    @abstractmethod
 59    def state(self) -> MutableMapping[str, Any]: ...
 60
 61    @abstractmethod
 62    def observe(self, record: Record) -> None:
 63        """
 64        Indicate to the cursor that the record has been emitted
 65        """
 66        raise NotImplementedError()
 67
 68    @abstractmethod
 69    def close_partition(self, partition: Partition) -> None:
 70        """
 71        Indicate to the cursor that the partition has been successfully processed
 72        """
 73        raise NotImplementedError()
 74
 75    @abstractmethod
 76    def ensure_at_least_one_state_emitted(self) -> None:
 77        """
 78        State messages are emitted when a partition is closed. However, the platform expects at least one state to be emitted per sync per
 79        stream. Hence, if no partitions are generated, this method needs to be called.
 80        """
 81        raise NotImplementedError()
 82
 83    @abstractmethod
 84    def should_be_synced(self, record: Record) -> bool:
 85        pass
 86
 87    def stream_slices(self) -> Iterable[StreamSlice]:
 88        """
 89        Default placeholder implementation of generate_slices.
 90        Subclasses can override this method to provide actual behavior.
 91        """
 92        yield StreamSlice(partition={}, cursor_slice={})
 93
 94    def get_cursor_datetime_from_state(
 95        self, stream_state: Mapping[str, Any]
 96    ) -> datetime.datetime | None:
 97        """Extract and parse the cursor datetime from the given stream state.
 98
 99        This method is used by StateDelegatingStream to validate cursor age against
100        an API's data retention period. Subclasses should implement this method to
101        extract the cursor value from their specific state structure and parse it
102        into a datetime object.
103
104        Returns None if the cursor cannot be extracted or parsed, which will cause
105        StateDelegatingStream to fall back to full refresh (safe default).
106
107        Raises NotImplementedError by default - subclasses must implement this method
108        if they want to support cursor age validation with api_retention_period.
109        """
110        raise NotImplementedError(
111            f"{self.__class__.__name__} does not implement get_cursor_datetime_from_state. "
112            f"Cursor age validation with api_retention_period is not supported for this cursor type."
113        )

Slices the stream into chunks that can be fetched independently. Slices enable state checkpointing and data retrieval parallelization.

state: MutableMapping[str, Any]
57    @property
58    @abstractmethod
59    def state(self) -> MutableMapping[str, Any]: ...
@abstractmethod
def observe(self, record: airbyte_cdk.Record) -> None:
61    @abstractmethod
62    def observe(self, record: Record) -> None:
63        """
64        Indicate to the cursor that the record has been emitted
65        """
66        raise NotImplementedError()

Indicate to the cursor that the record has been emitted

@abstractmethod
def close_partition( self, partition: airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition) -> None:
68    @abstractmethod
69    def close_partition(self, partition: Partition) -> None:
70        """
71        Indicate to the cursor that the partition has been successfully processed
72        """
73        raise NotImplementedError()

Indicate to the cursor that the partition has been successfully processed

@abstractmethod
def ensure_at_least_one_state_emitted(self) -> None:
75    @abstractmethod
76    def ensure_at_least_one_state_emitted(self) -> None:
77        """
78        State messages are emitted when a partition is closed. However, the platform expects at least one state to be emitted per sync per
79        stream. Hence, if no partitions are generated, this method needs to be called.
80        """
81        raise NotImplementedError()

State messages are emitted when a partition is closed. However, the platform expects at least one state to be emitted per sync per stream. Hence, if no partitions are generated, this method needs to be called.

@abstractmethod
def should_be_synced(self, record: airbyte_cdk.Record) -> bool:
83    @abstractmethod
84    def should_be_synced(self, record: Record) -> bool:
85        pass
def stream_slices(self) -> Iterable[airbyte_cdk.StreamSlice]:
87    def stream_slices(self) -> Iterable[StreamSlice]:
88        """
89        Default placeholder implementation of generate_slices.
90        Subclasses can override this method to provide actual behavior.
91        """
92        yield StreamSlice(partition={}, cursor_slice={})

Default placeholder implementation of stream_slices. Subclasses can override this method to provide actual behavior.

def get_cursor_datetime_from_state(self, stream_state: Mapping[str, Any]) -> datetime.datetime | None:
 94    def get_cursor_datetime_from_state(
 95        self, stream_state: Mapping[str, Any]
 96    ) -> datetime.datetime | None:
 97        """Extract and parse the cursor datetime from the given stream state.
 98
 99        This method is used by StateDelegatingStream to validate cursor age against
100        an API's data retention period. Subclasses should implement this method to
101        extract the cursor value from their specific state structure and parse it
102        into a datetime object.
103
104        Returns None if the cursor cannot be extracted or parsed, which will cause
105        StateDelegatingStream to fall back to full refresh (safe default).
106
107        Raises NotImplementedError by default - subclasses must implement this method
108        if they want to support cursor age validation with api_retention_period.
109        """
110        raise NotImplementedError(
111            f"{self.__class__.__name__} does not implement get_cursor_datetime_from_state. "
112            f"Cursor age validation with api_retention_period is not supported for this cursor type."
113        )

Extract and parse the cursor datetime from the given stream state.

This method is used by StateDelegatingStream to validate cursor age against an API's data retention period. Subclasses should implement this method to extract the cursor value from their specific state structure and parse it into a datetime object.

Returns None if the cursor cannot be extracted or parsed, which will cause StateDelegatingStream to fall back to full refresh (safe default).

Raises NotImplementedError by default - subclasses must implement this method if they want to support cursor age validation with api_retention_period.

class FinalStateCursor(Cursor):
116class FinalStateCursor(Cursor):
117    """Cursor that is used to guarantee at least one state message is emitted for a concurrent stream."""
118
119    def __init__(
120        self,
121        stream_name: str,
122        stream_namespace: Optional[str],
123        message_repository: MessageRepository,
124    ) -> None:
125        self._stream_name = stream_name
126        self._stream_namespace = stream_namespace
127        self._message_repository = message_repository
128        # Normally the connector state manager operates at the source-level. However, we only need it to write the sentinel
129        # state message rather than manage overall source state. This is also only temporary as we move to the resumable
130        # full refresh world where every stream uses a FileBasedConcurrentCursor with incremental state.
131        self._connector_state_manager = ConnectorStateManager()
132        self._has_closed_at_least_one_slice = False
133
134    @property
135    def state(self) -> MutableMapping[str, Any]:
136        return {NO_CURSOR_STATE_KEY: True}
137
138    def observe(self, record: Record) -> None:
139        pass
140
141    def close_partition(self, partition: Partition) -> None:
142        pass
143
144    def ensure_at_least_one_state_emitted(self) -> None:
145        """
146        Used primarily for full refresh syncs that do not have a valid cursor value to emit at the end of a sync
147        """
148
149        self._connector_state_manager.update_state_for_stream(
150            self._stream_name, self._stream_namespace, self.state
151        )
152        state_message = self._connector_state_manager.create_state_message(
153            self._stream_name, self._stream_namespace
154        )
155        self._message_repository.emit_message(state_message)
156
157    def should_be_synced(self, record: Record) -> bool:
158        return True
159
160    def get_cursor_datetime_from_state(
161        self, stream_state: Mapping[str, Any]
162    ) -> datetime.datetime | None:
163        """Return now() if state indicates a completed full refresh, else None.
164
165        When the state has NO_CURSOR_STATE_KEY: True, it means the previous sync was a
166        completed full refresh. Returning now() indicates the cursor is "current" and
167        within any retention period, so we should use incremental sync.
168
169        For any other state format, return None to indicate this cursor cannot parse it,
170        allowing the incremental cursor to handle the state instead.
171        """
172        if stream_state.get(NO_CURSOR_STATE_KEY):
173            return datetime.datetime.now(datetime.timezone.utc)
174        return None

Cursor that is used to guarantee at least one state message is emitted for a concurrent stream.

FinalStateCursor(stream_name: str, stream_namespace: Optional[str], message_repository: airbyte_cdk.MessageRepository)
119    def __init__(
120        self,
121        stream_name: str,
122        stream_namespace: Optional[str],
123        message_repository: MessageRepository,
124    ) -> None:
125        self._stream_name = stream_name
126        self._stream_namespace = stream_namespace
127        self._message_repository = message_repository
128        # Normally the connector state manager operates at the source-level. However, we only need it to write the sentinel
129        # state message rather than manage overall source state. This is also only temporary as we move to the resumable
130        # full refresh world where every stream uses a FileBasedConcurrentCursor with incremental state.
131        self._connector_state_manager = ConnectorStateManager()
132        self._has_closed_at_least_one_slice = False
state: MutableMapping[str, Any]
134    @property
135    def state(self) -> MutableMapping[str, Any]:
136        return {NO_CURSOR_STATE_KEY: True}
def observe(self, record: airbyte_cdk.Record) -> None:
138    def observe(self, record: Record) -> None:
139        pass

Indicate to the cursor that the record has been emitted

def close_partition( self, partition: airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition) -> None:
141    def close_partition(self, partition: Partition) -> None:
142        pass

Indicate to the cursor that the partition has been successfully processed

def ensure_at_least_one_state_emitted(self) -> None:
144    def ensure_at_least_one_state_emitted(self) -> None:
145        """
146        Used primarily for full refresh syncs that do not have a valid cursor value to emit at the end of a sync
147        """
148
149        self._connector_state_manager.update_state_for_stream(
150            self._stream_name, self._stream_namespace, self.state
151        )
152        state_message = self._connector_state_manager.create_state_message(
153            self._stream_name, self._stream_namespace
154        )
155        self._message_repository.emit_message(state_message)

Used primarily for full refresh syncs that do not have a valid cursor value to emit at the end of a sync

def should_be_synced(self, record: airbyte_cdk.Record) -> bool:
157    def should_be_synced(self, record: Record) -> bool:
158        return True
def get_cursor_datetime_from_state(self, stream_state: Mapping[str, Any]) -> datetime.datetime | None:
160    def get_cursor_datetime_from_state(
161        self, stream_state: Mapping[str, Any]
162    ) -> datetime.datetime | None:
163        """Return now() if state indicates a completed full refresh, else None.
164
165        When the state has NO_CURSOR_STATE_KEY: True, it means the previous sync was a
166        completed full refresh. Returning now() indicates the cursor is "current" and
167        within any retention period, so we should use incremental sync.
168
169        For any other state format, return None to indicate this cursor cannot parse it,
170        allowing the incremental cursor to handle the state instead.
171        """
172        if stream_state.get(NO_CURSOR_STATE_KEY):
173            return datetime.datetime.now(datetime.timezone.utc)
174        return None

Return now() if state indicates a completed full refresh, else None.

When the state has NO_CURSOR_STATE_KEY: True, it means the previous sync was a completed full refresh. Returning now() indicates the cursor is "current" and within any retention period, so we should use incremental sync.

For any other state format, return None to indicate this cursor cannot parse it, allowing the incremental cursor to handle the state instead.

Inherited Members
Cursor
stream_slices
class ConcurrentCursor(Cursor):
177class ConcurrentCursor(Cursor):
178    _START_BOUNDARY = 0
179    _END_BOUNDARY = 1
180
181    def copy_without_state(self) -> "ConcurrentCursor":
182        return self.__class__(
183            stream_name=self._stream_name,
184            stream_namespace=self._stream_namespace,
185            stream_state={},
186            message_repository=NoopMessageRepository(),
187            connector_state_manager=ConnectorStateManager(),
188            connector_state_converter=self._connector_state_converter,
189            cursor_field=self._cursor_field,
190            slice_boundary_fields=self._slice_boundary_fields,
191            start=self._start,
192            end_provider=self._end_provider,
193            lookback_window=self._lookback_window,
194            slice_range=self._slice_range,
195            cursor_granularity=self._cursor_granularity,
196            clamping_strategy=self._clamping_strategy,
197        )
198
199    def __init__(
200        self,
201        stream_name: str,
202        stream_namespace: Optional[str],
203        stream_state: Any,
204        message_repository: MessageRepository,
205        connector_state_manager: ConnectorStateManager,
206        connector_state_converter: AbstractStreamStateConverter,
207        cursor_field: CursorField,
208        slice_boundary_fields: Optional[Tuple[str, str]],
209        start: Optional[CursorValueType],
210        end_provider: Callable[[], CursorValueType],
211        lookback_window: Optional[GapType] = None,
212        slice_range: Optional[GapType] = None,
213        cursor_granularity: Optional[GapType] = None,
214        clamping_strategy: ClampingStrategy = NoClamping(),
215    ) -> None:
216        self._stream_name = stream_name
217        self._stream_namespace = stream_namespace
218        self._message_repository = message_repository
219        self._connector_state_converter = connector_state_converter
220        self._connector_state_manager = connector_state_manager
221        self._cursor_field = cursor_field
222        # To see some example where the slice boundaries might not be defined, check https://github.com/airbytehq/airbyte/blob/1ce84d6396e446e1ac2377362446e3fb94509461/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py#L363-L379
223        self._slice_boundary_fields = slice_boundary_fields
224        self._start = start
225        self._end_provider = end_provider
226        self.start, self._concurrent_state = self._get_concurrent_state(stream_state)
227        self._lookback_window = lookback_window
228        self._slice_range = slice_range
229        self._most_recent_cursor_value_per_partition: MutableMapping[
230            Union[StreamSlice, Mapping[str, Any], None], Any
231        ] = {}
232        self._has_closed_at_least_one_slice = False
233        self._cursor_granularity = cursor_granularity
234        # Flag to track if the logger has been triggered (per stream)
235        self._should_be_synced_logger_triggered = False
236        self._clamping_strategy = clamping_strategy
237        self._is_ascending_order = True
238
239        # A lock is required when closing a partition because updating the cursor's concurrent_state is
240        # not thread safe. When multiple partitions are being closed by the cursor at the same time, it is
241        # possible for one partition to update concurrent_state after a second partition has already read
242        # the previous state. This can lead to the second partition overwriting the previous one's state.
243        self._lock = threading.Lock()
244
245    @property
246    def state(self) -> MutableMapping[str, Any]:
247        return self._connector_state_converter.convert_to_state_message(
248            self.cursor_field, self._concurrent_state
249        )
250
251    @property
252    def cursor_field(self) -> CursorField:
253        return self._cursor_field
254
255    @property
256    def _slice_boundary_fields_wrapper(self) -> Tuple[str, str]:
257        return (
258            self._slice_boundary_fields
259            if self._slice_boundary_fields
260            else (
261                self._connector_state_converter.START_KEY,
262                self._connector_state_converter.END_KEY,
263            )
264        )
265
266    def _get_concurrent_state(
267        self, state: MutableMapping[str, Any]
268    ) -> Tuple[CursorValueType, MutableMapping[str, Any]]:
269        if self._connector_state_converter.is_state_message_compatible(state):
270            partitioned_state = self._connector_state_converter.deserialize(state)
271            slices_from_partitioned_state = partitioned_state.get("slices", [])
272
273            value_from_partitioned_state = None
274            if slices_from_partitioned_state:
275                # We assume here that the slices have been already merged
276                first_slice = slices_from_partitioned_state[0]
277                value_from_partitioned_state = (
278                    first_slice[self._connector_state_converter.MOST_RECENT_RECORD_KEY]
279                    if self._connector_state_converter.MOST_RECENT_RECORD_KEY in first_slice
280                    else first_slice[self._connector_state_converter.END_KEY]
281                )
282            return (
283                value_from_partitioned_state
284                or self._start
285                or self._connector_state_converter.zero_value,
286                partitioned_state,
287            )
288        return self._connector_state_converter.convert_from_sequential_state(
289            self._cursor_field, state, self._start
290        )
291
292    def observe(self, record: Record) -> None:
293        # Because observe writes to the most_recent_cursor_value_per_partition mapping,
294        # it is not thread-safe. However, this shouldn't lead to concurrency issues because
295        # observe() is only invoked by PartitionReader.process_partition(). Since the map is
296        # broken down according to partition, concurrent threads processing only read/write
297        # from different keys which avoids any conflicts.
298        #
299        # If we were to add thread safety, we should implement a lock per-partition
300        # which is instantiated during stream_slices()
301        most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
302            record.associated_slice
303        )
304        try:
305            cursor_value = self._extract_cursor_value(record)
306
307            if most_recent_cursor_value is None or most_recent_cursor_value < cursor_value:
308                self._most_recent_cursor_value_per_partition[record.associated_slice] = cursor_value
309            elif most_recent_cursor_value > cursor_value:
310                self._is_ascending_order = False
311        except ValueError:
312            self._log_for_record_without_cursor_value()
313
314    def _extract_cursor_value(self, record: Record) -> Any:
315        return self._connector_state_converter.parse_value(self._cursor_field.extract_value(record))
316
317    def close_partition(self, partition: Partition) -> None:
318        with self._lock:
319            slice_count_before = len(self._concurrent_state.get("slices", []))
320            self._add_slice_to_state(partition)
321            if slice_count_before < len(
322                self._concurrent_state["slices"]
323            ):  # only emit if at least one slice has been processed
324                self._merge_partitions()
325                self._emit_state_message()
326        self._has_closed_at_least_one_slice = True
327
328    def _add_slice_to_state(self, partition: Partition) -> None:
329        most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
330            partition.to_slice()
331        )
332
333        if self._slice_boundary_fields:
334            if "slices" not in self._concurrent_state:
335                raise RuntimeError(
336                    f"The state for stream {self._stream_name} should have at least one slice to delineate the sync start time, but no slices are present. This is unexpected. Please contact Support."
337                )
338            self._concurrent_state["slices"].append(
339                {
340                    self._connector_state_converter.START_KEY: self._extract_from_slice(
341                        partition, self._slice_boundary_fields[self._START_BOUNDARY]
342                    ),
343                    self._connector_state_converter.END_KEY: self._extract_from_slice(
344                        partition, self._slice_boundary_fields[self._END_BOUNDARY]
345                    ),
346                    self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value,
347                }
348            )
349        elif most_recent_cursor_value:
350            if self._has_closed_at_least_one_slice:
351                # If we track state value using records cursor field, we can only do that if there is one partition. This is because we save
352                # the state every time we close a partition. We assume that if there are multiple slices, they need to be providing
353                # boundaries. There are cases where partitions could not have boundaries:
354                # * The cursor should be per-partition
355                # * The stream state is actually the parent stream state
356                # There might be other cases not listed above. Those are not supported today hence the stream should not use this cursor for
357                # state management. For the specific user that was affected with this issue, we need to:
358                # * Fix state tracking (which is currently broken)
359                # * Make the new version available
360                # * (Probably) ask the user to reset the stream to avoid data loss
361                raise ValueError(
362                    "Given that slice_boundary_fields is not defined and that per-partition state is not supported, only one slice is "
363                    "expected. Please contact the Airbyte team."
364                )
365
366            self._concurrent_state["slices"].append(
367                {
368                    self._connector_state_converter.START_KEY: self.start,
369                    self._connector_state_converter.END_KEY: most_recent_cursor_value,
370                    self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value,
371                }
372            )
373
374    def _emit_state_message(self) -> None:
375        self._connector_state_manager.update_state_for_stream(
376            self._stream_name,
377            self._stream_namespace,
378            self.state,
379        )
380        state_message = self._connector_state_manager.create_state_message(
381            self._stream_name, self._stream_namespace
382        )
383        self._message_repository.emit_message(state_message)
384
385    def _merge_partitions(self) -> None:
386        self._concurrent_state["slices"] = self._connector_state_converter.merge_intervals(
387            self._concurrent_state["slices"]
388        )
389
390    def _extract_from_slice(self, partition: Partition, key: str) -> CursorValueType:
391        try:
392            _slice = partition.to_slice()
393            if not _slice:
394                raise KeyError(f"Could not find key `{key}` in empty slice")
395            return self._connector_state_converter.parse_value(_slice[key])  # type: ignore  # we expect the devs to specify a key that would return a CursorValueType
396        except KeyError as exception:
397            raise KeyError(
398                f"Partition is expected to have key `{key}` but could not be found"
399            ) from exception
400
401    def ensure_at_least_one_state_emitted(self) -> None:
402        """
403        The platform expect to have at least one state message on successful syncs. Hence, whatever happens, we expect this method to be
404        called.
405        """
406        self._emit_state_message()
407
408    def stream_slices(self) -> Iterable[StreamSlice]:
409        """
410        Generating slices based on a few parameters:
411        * lookback_window: Buffer to remove from END_KEY of the highest slice
412        * slice_range: Max difference between two slices. If the difference between two slices is greater, multiple slices will be created
413        * start: `_split_per_slice_range` will clip any value to `self._start which means that:
414          * if upper is less than self._start, no slices will be generated
415          * if lower is less than self._start, self._start will be used as the lower boundary (lookback_window will not be considered in that case)
416
417        Note that the slices will overlap at their boundaries. We therefore expect to have at least the lower or the upper boundary to be
418        inclusive in the API that is queried.
419        """
420        self._merge_partitions()
421
422        if self._start is not None and self._is_start_before_first_slice():
423            yield from self._split_per_slice_range(
424                self._start,
425                self._concurrent_state["slices"][0][self._connector_state_converter.START_KEY],
426                False,
427            )
428
429        if len(self._concurrent_state["slices"]) == 1:
430            yield from self._split_per_slice_range(
431                self._calculate_lower_boundary_of_last_slice(
432                    self._concurrent_state["slices"][0][self._connector_state_converter.END_KEY]
433                ),
434                self._end_provider(),
435                True,
436            )
437        elif len(self._concurrent_state["slices"]) > 1:
438            for i in range(len(self._concurrent_state["slices"]) - 1):
439                if self._cursor_granularity:
440                    yield from self._split_per_slice_range(
441                        self._concurrent_state["slices"][i][self._connector_state_converter.END_KEY]
442                        + self._cursor_granularity,
443                        self._concurrent_state["slices"][i + 1][
444                            self._connector_state_converter.START_KEY
445                        ],
446                        False,
447                    )
448                else:
449                    yield from self._split_per_slice_range(
450                        self._concurrent_state["slices"][i][
451                            self._connector_state_converter.END_KEY
452                        ],
453                        self._concurrent_state["slices"][i + 1][
454                            self._connector_state_converter.START_KEY
455                        ],
456                        False,
457                    )
458            yield from self._split_per_slice_range(
459                self._calculate_lower_boundary_of_last_slice(
460                    self._concurrent_state["slices"][-1][self._connector_state_converter.END_KEY]
461                ),
462                self._end_provider(),
463                True,
464            )
465        else:
466            raise ValueError("Expected at least one slice")
467
468    def _is_start_before_first_slice(self) -> bool:
469        return (
470            self._start is not None
471            and self._start
472            < self._concurrent_state["slices"][0][self._connector_state_converter.START_KEY]
473        )
474
475    def _calculate_lower_boundary_of_last_slice(
476        self, lower_boundary: CursorValueType
477    ) -> CursorValueType:
478        if self._lookback_window:
479            return lower_boundary - self._lookback_window
480        return lower_boundary
481
    def _split_per_slice_range(
        self, lower: CursorValueType, upper: CursorValueType, upper_is_end: bool
    ) -> Iterable[StreamSlice]:
        """Yield stream slices covering the range [lower, upper].

        The range is clipped to ``self._start``, split into chunks of at most
        ``self._slice_range``, clamped with the configured clamping strategy and — when a
        cursor granularity is set — each non-final chunk's end is pulled back by one
        granularity step so that consecutive slices do not overlap.

        :param lower: lower boundary of the range to split
        :param upper: upper boundary of the range to split
        :param upper_is_end: True when ``upper`` is the end of the overall sync window
        """
        # Nothing to emit for an empty or inverted range.
        if lower >= upper:
            return

        # The whole range precedes the configured start: emit nothing.
        if self._start and upper < self._start:
            return

        # Clip the lower boundary to the configured start.
        lower = max(lower, self._start) if self._start else lower
        if not self._slice_range or self._evaluate_upper_safely(lower, self._slice_range) >= upper:
            # The whole range fits within a single slice.
            clamped_lower = self._clamping_strategy.clamp(lower)
            clamped_upper = self._clamping_strategy.clamp(upper)
            # Pull the end back by one granularity step unless this is the end of the
            # overall sync window.
            start_value, end_value = (
                (clamped_lower, clamped_upper - self._cursor_granularity)
                if self._cursor_granularity and not upper_is_end
                else (clamped_lower, clamped_upper)
            )
            yield StreamSlice(
                partition={},
                cursor_slice={
                    self._slice_boundary_fields_wrapper[
                        self._START_BOUNDARY
                    ]: self._connector_state_converter.output_format(start_value),
                    self._slice_boundary_fields_wrapper[
                        self._END_BOUNDARY
                    ]: self._connector_state_converter.output_format(end_value),
                },
            )
        else:
            # Walk the range in steps of `_slice_range`, emitting one slice per step.
            stop_processing = False
            current_lower_boundary = lower
            while not stop_processing:
                current_upper_boundary = min(
                    self._evaluate_upper_safely(current_lower_boundary, self._slice_range), upper
                )
                has_reached_upper_boundary = current_upper_boundary >= upper

                # Do not clamp the final boundary: `upper` is already an exact limit.
                clamped_upper = (
                    self._clamping_strategy.clamp(current_upper_boundary)
                    if current_upper_boundary != upper
                    else current_upper_boundary
                )
                clamped_lower = self._clamping_strategy.clamp(current_lower_boundary)
                if clamped_lower >= clamped_upper:
                    # clamping collapsed both values which means that it is time to stop processing
                    # FIXME should this be replaced by proper end_provider
                    break
                start_value, end_value = (
                    (clamped_lower, clamped_upper - self._cursor_granularity)
                    if self._cursor_granularity
                    and (not upper_is_end or not has_reached_upper_boundary)
                    else (clamped_lower, clamped_upper)
                )
                yield StreamSlice(
                    partition={},
                    cursor_slice={
                        self._slice_boundary_fields_wrapper[
                            self._START_BOUNDARY
                        ]: self._connector_state_converter.output_format(start_value),
                        self._slice_boundary_fields_wrapper[
                            self._END_BOUNDARY
                        ]: self._connector_state_converter.output_format(end_value),
                    },
                )
                current_lower_boundary = clamped_upper
                if current_upper_boundary >= upper:
                    stop_processing = True
550
551    def _evaluate_upper_safely(self, lower: CursorValueType, step: GapType) -> CursorValueType:
552        """
553        Given that we set the default step at datetime.timedelta.max, we will generate an OverflowError when evaluating the next start_date
554        This method assumes that users would never enter a step that would generate an overflow. Given that would be the case, the code
555        would have broken anyway.
556        """
557        try:
558            return lower + step
559        except OverflowError:
560            return self._end_provider()
561
562    def should_be_synced(self, record: Record) -> bool:
563        """
564        Determines if a record should be synced based on its cursor value.
565        :param record: The record to evaluate
566
567        :return: True if the record's cursor value falls within the sync boundaries
568        """
569        try:
570            record_cursor_value: CursorValueType = self._extract_cursor_value(record)
571        except ValueError:
572            self._log_for_record_without_cursor_value()
573            return True
574        return self.start <= record_cursor_value <= self._end_provider()
575
576    def _log_for_record_without_cursor_value(self) -> None:
577        if not self._should_be_synced_logger_triggered:
578            LOGGER.warning(
579                f"Could not find cursor field `{self.cursor_field.cursor_field_key}` in record for stream {self._stream_name}. The incremental sync will assume it needs to be synced"
580            )
581            self._should_be_synced_logger_triggered = True
582
583    def reduce_slice_range(self, stream_slice: StreamSlice) -> StreamSlice:
584        # In theory, we might be more flexible here meaning that it doesn't need to be in ascending order but it just
585        # needs to be ordered. For now though, we will only support ascending order.
586        if not self._is_ascending_order:
587            LOGGER.warning(
588                "Attempting to reduce slice while records are not returned in incremental order might lead to missing records"
589            )
590
591        if stream_slice in self._most_recent_cursor_value_per_partition:
592            return StreamSlice(
593                partition=stream_slice.partition,
594                cursor_slice={
595                    self._slice_boundary_fields_wrapper[
596                        self._START_BOUNDARY
597                    ]: self._connector_state_converter.output_format(
598                        self._most_recent_cursor_value_per_partition[stream_slice]
599                    ),
600                    self._slice_boundary_fields_wrapper[
601                        self._END_BOUNDARY
602                    ]: stream_slice.cursor_slice[
603                        self._slice_boundary_fields_wrapper[self._END_BOUNDARY]
604                    ],
605                },
606                extra_fields=stream_slice.extra_fields,
607            )
608        else:
609            return stream_slice
610
611    def get_cursor_datetime_from_state(
612        self, stream_state: Mapping[str, Any]
613    ) -> datetime.datetime | None:
614        """Extract and parse the cursor datetime from the given stream state.
615
616        For concurrent cursors, the state can be in two formats:
617        1. Sequential/legacy format: {cursor_field: cursor_value}
618        2. Concurrent format: {state_type: "date-range", slices: [...]}
619
620        Returns the cursor datetime if present and parseable, otherwise returns None.
621        """
622        # Check if state is in concurrent format (need to convert to dict for type compatibility)
623        mutable_state: MutableMapping[str, Any] = dict(stream_state)
624        if self._connector_state_converter.is_state_message_compatible(mutable_state):
625            slices = stream_state.get("slices", [])
626            if not slices:
627                return None
628            # Get the most recent cursor value from the first slice (after merging)
629            first_slice = slices[0]
630            cursor_value = first_slice.get(
631                self._connector_state_converter.MOST_RECENT_RECORD_KEY
632            ) or first_slice.get(self._connector_state_converter.END_KEY)
633            if not cursor_value:
634                return None
635            try:
636                parsed_value = self._connector_state_converter.parse_value(cursor_value)
637                if isinstance(parsed_value, datetime.datetime):
638                    return parsed_value
639                return None
640            except (ValueError, TypeError):
641                return None
642
643        # Sequential/legacy format: {cursor_field: cursor_value}
644        cursor_value = stream_state.get(self._cursor_field.cursor_field_key)
645        if not cursor_value:
646            return None
647        try:
648            parsed_value = self._connector_state_converter.parse_value(cursor_value)
649            if isinstance(parsed_value, datetime.datetime):
650                return parsed_value
651            return None
652        except (ValueError, TypeError):
653            return None

Slices the stream into chunks that can be fetched independently. Slices enable state checkpointing and data retrieval parallelization.

ConcurrentCursor( stream_name: str, stream_namespace: Optional[str], stream_state: Any, message_repository: airbyte_cdk.MessageRepository, connector_state_manager: airbyte_cdk.ConnectorStateManager, connector_state_converter: airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter.AbstractStreamStateConverter, cursor_field: CursorField, slice_boundary_fields: Optional[Tuple[str, str]], start: Optional[airbyte_cdk.sources.streams.concurrent.cursor_types.CursorValueType], end_provider: Callable[[], airbyte_cdk.sources.streams.concurrent.cursor_types.CursorValueType], lookback_window: Optional[airbyte_cdk.sources.streams.concurrent.cursor_types.GapType] = None, slice_range: Optional[airbyte_cdk.sources.streams.concurrent.cursor_types.GapType] = None, cursor_granularity: Optional[airbyte_cdk.sources.streams.concurrent.cursor_types.GapType] = None, clamping_strategy: airbyte_cdk.sources.streams.concurrent.clamping.ClampingStrategy = <airbyte_cdk.sources.streams.concurrent.clamping.NoClamping object>)
    def __init__(
        self,
        stream_name: str,
        stream_namespace: Optional[str],
        stream_state: Any,
        message_repository: MessageRepository,
        connector_state_manager: ConnectorStateManager,
        connector_state_converter: AbstractStreamStateConverter,
        cursor_field: CursorField,
        slice_boundary_fields: Optional[Tuple[str, str]],
        start: Optional[CursorValueType],
        end_provider: Callable[[], CursorValueType],
        lookback_window: Optional[GapType] = None,
        slice_range: Optional[GapType] = None,
        cursor_granularity: Optional[GapType] = None,
        clamping_strategy: ClampingStrategy = NoClamping(),
    ) -> None:
        """Build a concurrent cursor for one stream.

        :param stream_name: name of the stream this cursor tracks state for
        :param stream_namespace: optional namespace of the stream
        :param stream_state: incoming state, handed to `_get_concurrent_state` below
        :param message_repository: repository through which state messages are emitted
        :param connector_state_manager: manager used when persisting connector state
        :param connector_state_converter: converts between state formats and parses cursor values
        :param slice_boundary_fields: optional (start, end) keys for slice boundaries; may be
            None for streams without boundaries (see comment below)
        :param start: optional lower bound of the sync window
        :param end_provider: callable returning the upper bound of the sync window
        :param lookback_window: buffer subtracted from the last slice's lower boundary
        :param slice_range: maximum width of a generated slice
        :param cursor_granularity: smallest cursor increment; used to keep adjacent slices
            from overlapping in `_split_per_slice_range`
        :param clamping_strategy: strategy applied to slice boundaries (defaults to no-op)
        """
        self._stream_name = stream_name
        self._stream_namespace = stream_namespace
        self._message_repository = message_repository
        self._connector_state_converter = connector_state_converter
        self._connector_state_manager = connector_state_manager
        self._cursor_field = cursor_field
        # To see some example where the slice boundaries might not be defined, check https://github.com/airbytehq/airbyte/blob/1ce84d6396e446e1ac2377362446e3fb94509461/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py#L363-L379
        self._slice_boundary_fields = slice_boundary_fields
        self._start = start
        self._end_provider = end_provider
        # NOTE(review): `_get_concurrent_state` is defined outside this excerpt; it appears to
        # derive the effective sync start and internal state from the incoming state and may
        # rely on the attributes assigned above — confirm before reordering.
        self.start, self._concurrent_state = self._get_concurrent_state(stream_state)
        self._lookback_window = lookback_window
        self._slice_range = slice_range
        # Highest cursor value observed so far, tracked per partition slice (see observe()).
        self._most_recent_cursor_value_per_partition: MutableMapping[
            Union[StreamSlice, Mapping[str, Any], None], Any
        ] = {}
        self._has_closed_at_least_one_slice = False
        self._cursor_granularity = cursor_granularity
        # Flag to track if the logger has been triggered (per stream)
        self._should_be_synced_logger_triggered = False
        self._clamping_strategy = clamping_strategy
        # Assumed ascending until observe() sees an out-of-order record.
        self._is_ascending_order = True

        # A lock is required when closing a partition because updating the cursor's concurrent_state is
        # not thread safe. When multiple partitions are being closed by the cursor at the same time, it is
        # possible for one partition to update concurrent_state after a second partition has already read
        # the previous state. This can lead to the second partition overwriting the previous one's state.
        self._lock = threading.Lock()
def copy_without_state(self) -> ConcurrentCursor:
    def copy_without_state(self) -> "ConcurrentCursor":
        """Return a cursor with identical configuration but empty stream state.

        The copy starts from `stream_state={}` and receives a `NoopMessageRepository` and a
        fresh `ConnectorStateManager`, so it shares no state-publishing infrastructure with
        this cursor.
        """
        return self.__class__(
            stream_name=self._stream_name,
            stream_namespace=self._stream_namespace,
            stream_state={},
            message_repository=NoopMessageRepository(),
            connector_state_manager=ConnectorStateManager(),
            connector_state_converter=self._connector_state_converter,
            cursor_field=self._cursor_field,
            slice_boundary_fields=self._slice_boundary_fields,
            start=self._start,
            end_provider=self._end_provider,
            lookback_window=self._lookback_window,
            slice_range=self._slice_range,
            cursor_granularity=self._cursor_granularity,
            clamping_strategy=self._clamping_strategy,
        )
state: MutableMapping[str, Any]
245    @property
246    def state(self) -> MutableMapping[str, Any]:
247        return self._connector_state_converter.convert_to_state_message(
248            self.cursor_field, self._concurrent_state
249        )
cursor_field: CursorField
251    @property
252    def cursor_field(self) -> CursorField:
253        return self._cursor_field
def observe(self, record: airbyte_cdk.Record) -> None:
292    def observe(self, record: Record) -> None:
293        # Because observe writes to the most_recent_cursor_value_per_partition mapping,
294        # it is not thread-safe. However, this shouldn't lead to concurrency issues because
295        # observe() is only invoked by PartitionReader.process_partition(). Since the map is
296        # broken down according to partition, concurrent threads processing only read/write
297        # from different keys which avoids any conflicts.
298        #
299        # If we were to add thread safety, we should implement a lock per-partition
300        # which is instantiated during stream_slices()
301        most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
302            record.associated_slice
303        )
304        try:
305            cursor_value = self._extract_cursor_value(record)
306
307            if most_recent_cursor_value is None or most_recent_cursor_value < cursor_value:
308                self._most_recent_cursor_value_per_partition[record.associated_slice] = cursor_value
309            elif most_recent_cursor_value > cursor_value:
310                self._is_ascending_order = False
311        except ValueError:
312            self._log_for_record_without_cursor_value()

Indicate to the cursor that the record has been emitted

def close_partition( self, partition: airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition) -> None:
    def close_partition(self, partition: Partition) -> None:
        """Indicate to the cursor that the partition has been successfully processed.

        Records the partition's slice in the concurrent state and, when a new slice was
        actually added, merges adjacent slices and emits a state message.
        """
        # The lock serializes state updates across partitions: without it, two partitions
        # closing concurrently could interleave reads/writes of _concurrent_state and one
        # would overwrite the other's update (see the comment on self._lock in __init__).
        with self._lock:
            slice_count_before = len(self._concurrent_state.get("slices", []))
            self._add_slice_to_state(partition)
            if slice_count_before < len(
                self._concurrent_state["slices"]
            ):  # only emit if at least one slice has been processed
                self._merge_partitions()
                self._emit_state_message()
        self._has_closed_at_least_one_slice = True

Indicate to the cursor that the partition has been successfully processed

def ensure_at_least_one_state_emitted(self) -> None:
401    def ensure_at_least_one_state_emitted(self) -> None:
402        """
403        The platform expect to have at least one state message on successful syncs. Hence, whatever happens, we expect this method to be
404        called.
405        """
406        self._emit_state_message()

The platform expects to have at least one state message on successful syncs. Hence, whatever happens, we expect this method to be called.

def stream_slices(self) -> Iterable[airbyte_cdk.StreamSlice]:
    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Generating slices based on a few parameters:
        * lookback_window: Buffer to remove from END_KEY of the highest slice
        * slice_range: Max difference between two slices. If the difference between two slices is greater, multiple slices will be created
        * start: `_split_per_slice_range` will clip any value to `self._start` which means that:
          * if upper is less than self._start, no slices will be generated
          * if lower is less than self._start, self._start will be used as the lower boundary (lookback_window will not be considered in that case)

        Note that the slices will overlap at their boundaries. We therefore expect to have at least the lower or the upper boundary to be
        inclusive in the API that is queried.
        """
        # Merge recorded slices first so the gaps between them are well defined.
        self._merge_partitions()

        # Gap between the configured start and the earliest recorded slice.
        if self._start is not None and self._is_start_before_first_slice():
            yield from self._split_per_slice_range(
                self._start,
                self._concurrent_state["slices"][0][self._connector_state_converter.START_KEY],
                False,
            )

        if len(self._concurrent_state["slices"]) == 1:
            # Single recorded slice: sync from its end (minus lookback) to the end provider.
            yield from self._split_per_slice_range(
                self._calculate_lower_boundary_of_last_slice(
                    self._concurrent_state["slices"][0][self._connector_state_converter.END_KEY]
                ),
                self._end_provider(),
                True,
            )
        elif len(self._concurrent_state["slices"]) > 1:
            # Sync each gap between consecutive recorded slices...
            for i in range(len(self._concurrent_state["slices"]) - 1):
                if self._cursor_granularity:
                    # With a granularity, the gap starts one step after the previous
                    # slice's end so already-synced boundary values are not re-queried.
                    yield from self._split_per_slice_range(
                        self._concurrent_state["slices"][i][self._connector_state_converter.END_KEY]
                        + self._cursor_granularity,
                        self._concurrent_state["slices"][i + 1][
                            self._connector_state_converter.START_KEY
                        ],
                        False,
                    )
                else:
                    yield from self._split_per_slice_range(
                        self._concurrent_state["slices"][i][
                            self._connector_state_converter.END_KEY
                        ],
                        self._concurrent_state["slices"][i + 1][
                            self._connector_state_converter.START_KEY
                        ],
                        False,
                    )
            # ...then from the last recorded slice's end (minus lookback) to the end provider.
            yield from self._split_per_slice_range(
                self._calculate_lower_boundary_of_last_slice(
                    self._concurrent_state["slices"][-1][self._connector_state_converter.END_KEY]
                ),
                self._end_provider(),
                True,
            )
        else:
            raise ValueError("Expected at least one slice")

Generating slices based on a few parameters:

  • lookback_window: Buffer to remove from END_KEY of the highest slice
  • slice_range: Max difference between two slices. If the difference between two slices is greater, multiple slices will be created
  • start: _split_per_slice_range will clip any value to `self._start`, which means that:
    • if upper is less than self._start, no slices will be generated
    • if lower is less than self._start, self._start will be used as the lower boundary (lookback_window will not be considered in that case)

Note that the slices will overlap at their boundaries. We therefore expect to have at least the lower or the upper boundary to be inclusive in the API that is queried.

def should_be_synced(self, record: airbyte_cdk.Record) -> bool:
562    def should_be_synced(self, record: Record) -> bool:
563        """
564        Determines if a record should be synced based on its cursor value.
565        :param record: The record to evaluate
566
567        :return: True if the record's cursor value falls within the sync boundaries
568        """
569        try:
570            record_cursor_value: CursorValueType = self._extract_cursor_value(record)
571        except ValueError:
572            self._log_for_record_without_cursor_value()
573            return True
574        return self.start <= record_cursor_value <= self._end_provider()

Determines if a record should be synced based on its cursor value.

Parameters
  • record: The record to evaluate
Returns

True if the record's cursor value falls within the sync boundaries

def reduce_slice_range( self, stream_slice: airbyte_cdk.StreamSlice) -> airbyte_cdk.StreamSlice:
583    def reduce_slice_range(self, stream_slice: StreamSlice) -> StreamSlice:
584        # In theory, we might be more flexible here meaning that it doesn't need to be in ascending order but it just
585        # needs to be ordered. For now though, we will only support ascending order.
586        if not self._is_ascending_order:
587            LOGGER.warning(
588                "Attempting to reduce slice while records are not returned in incremental order might lead to missing records"
589            )
590
591        if stream_slice in self._most_recent_cursor_value_per_partition:
592            return StreamSlice(
593                partition=stream_slice.partition,
594                cursor_slice={
595                    self._slice_boundary_fields_wrapper[
596                        self._START_BOUNDARY
597                    ]: self._connector_state_converter.output_format(
598                        self._most_recent_cursor_value_per_partition[stream_slice]
599                    ),
600                    self._slice_boundary_fields_wrapper[
601                        self._END_BOUNDARY
602                    ]: stream_slice.cursor_slice[
603                        self._slice_boundary_fields_wrapper[self._END_BOUNDARY]
604                    ],
605                },
606                extra_fields=stream_slice.extra_fields,
607            )
608        else:
609            return stream_slice
def get_cursor_datetime_from_state(self, stream_state: Mapping[str, Any]) -> datetime.datetime | None:
611    def get_cursor_datetime_from_state(
612        self, stream_state: Mapping[str, Any]
613    ) -> datetime.datetime | None:
614        """Extract and parse the cursor datetime from the given stream state.
615
616        For concurrent cursors, the state can be in two formats:
617        1. Sequential/legacy format: {cursor_field: cursor_value}
618        2. Concurrent format: {state_type: "date-range", slices: [...]}
619
620        Returns the cursor datetime if present and parseable, otherwise returns None.
621        """
622        # Check if state is in concurrent format (need to convert to dict for type compatibility)
623        mutable_state: MutableMapping[str, Any] = dict(stream_state)
624        if self._connector_state_converter.is_state_message_compatible(mutable_state):
625            slices = stream_state.get("slices", [])
626            if not slices:
627                return None
628            # Get the most recent cursor value from the first slice (after merging)
629            first_slice = slices[0]
630            cursor_value = first_slice.get(
631                self._connector_state_converter.MOST_RECENT_RECORD_KEY
632            ) or first_slice.get(self._connector_state_converter.END_KEY)
633            if not cursor_value:
634                return None
635            try:
636                parsed_value = self._connector_state_converter.parse_value(cursor_value)
637                if isinstance(parsed_value, datetime.datetime):
638                    return parsed_value
639                return None
640            except (ValueError, TypeError):
641                return None
642
643        # Sequential/legacy format: {cursor_field: cursor_value}
644        cursor_value = stream_state.get(self._cursor_field.cursor_field_key)
645        if not cursor_value:
646            return None
647        try:
648            parsed_value = self._connector_state_converter.parse_value(cursor_value)
649            if isinstance(parsed_value, datetime.datetime):
650                return parsed_value
651            return None
652        except (ValueError, TypeError):
653            return None

Extract and parse the cursor datetime from the given stream state.

For concurrent cursors, the state can be in two formats:

  1. Sequential/legacy format: {cursor_field: cursor_value}
  2. Concurrent format: {state_type: "date-range", slices: [...]}

Returns the cursor datetime if present and parseable, otherwise returns None.