airbyte_cdk.sources.streams.concurrent.cursor

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4
  5import functools
  6import logging
  7import threading
  8from abc import ABC, abstractmethod
  9from typing import (
 10    Any,
 11    Callable,
 12    Iterable,
 13    List,
 14    Mapping,
 15    MutableMapping,
 16    Optional,
 17    Tuple,
 18    Union,
 19)
 20
 21from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
 22from airbyte_cdk.sources.message import MessageRepository, NoopMessageRepository
 23from airbyte_cdk.sources.streams import NO_CURSOR_STATE_KEY
 24from airbyte_cdk.sources.streams.concurrent.clamping import ClampingStrategy, NoClamping
 25from airbyte_cdk.sources.streams.concurrent.cursor_types import CursorValueType, GapType
 26from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
 27from airbyte_cdk.sources.streams.concurrent.partitions.stream_slicer import StreamSlicer
 28from airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter import (
 29    AbstractStreamStateConverter,
 30)
 31from airbyte_cdk.sources.types import Record, StreamSlice
 32
 33LOGGER = logging.getLogger("airbyte")
 34
 35
 36def _extract_value(mapping: Mapping[str, Any], path: List[str]) -> Any:
 37    return functools.reduce(lambda a, b: a[b], path, mapping)
 38
 39
 40class CursorField:
 41    def __init__(
 42        self, cursor_field_key: str, supports_catalog_defined_cursor_field: bool = False
 43    ) -> None:
 44        self.cursor_field_key = cursor_field_key
 45        self.supports_catalog_defined_cursor_field = supports_catalog_defined_cursor_field
 46
 47    def extract_value(self, record: Record) -> Any:
 48        cursor_value = record.data.get(self.cursor_field_key)
 49        if cursor_value is None:
 50            raise ValueError(f"Could not find cursor field {self.cursor_field_key} in record")
 51        return cursor_value  # type: ignore  # we assume that the value the path points at is a comparable
 52
 53
 54class Cursor(StreamSlicer, ABC):
 55    @property
 56    @abstractmethod
 57    def state(self) -> MutableMapping[str, Any]: ...
 58
 59    @abstractmethod
 60    def observe(self, record: Record) -> None:
 61        """
 62        Indicate to the cursor that the record has been emitted
 63        """
 64        raise NotImplementedError()
 65
 66    @abstractmethod
 67    def close_partition(self, partition: Partition) -> None:
 68        """
 69        Indicate to the cursor that the partition has been successfully processed
 70        """
 71        raise NotImplementedError()
 72
 73    @abstractmethod
 74    def ensure_at_least_one_state_emitted(self) -> None:
 75        """
 76        State messages are emitted when a partition is closed. However, the platform expects at least one state to be emitted per sync per
 77        stream. Hence, if no partitions are generated, this method needs to be called.
 78        """
 79        raise NotImplementedError()
 80
 81    @abstractmethod
 82    def should_be_synced(self, record: Record) -> bool:
 83        pass
 84
 85    def stream_slices(self) -> Iterable[StreamSlice]:
 86        """
 87        Default placeholder implementation of generate_slices.
 88        Subclasses can override this method to provide actual behavior.
 89        """
 90        yield StreamSlice(partition={}, cursor_slice={})
 91
 92
 93class FinalStateCursor(Cursor):
 94    """Cursor that is used to guarantee at least one state message is emitted for a concurrent stream."""
 95
 96    def __init__(
 97        self,
 98        stream_name: str,
 99        stream_namespace: Optional[str],
100        message_repository: MessageRepository,
101    ) -> None:
102        self._stream_name = stream_name
103        self._stream_namespace = stream_namespace
104        self._message_repository = message_repository
105        # Normally the connector state manager operates at the source-level. However, we only need it to write the sentinel
106        # state message rather than manage overall source state. This is also only temporary as we move to the resumable
107        # full refresh world where every stream uses a FileBasedConcurrentCursor with incremental state.
108        self._connector_state_manager = ConnectorStateManager()
109        self._has_closed_at_least_one_slice = False
110
111    @property
112    def state(self) -> MutableMapping[str, Any]:
113        return {NO_CURSOR_STATE_KEY: True}
114
115    def observe(self, record: Record) -> None:
116        pass
117
118    def close_partition(self, partition: Partition) -> None:
119        pass
120
121    def ensure_at_least_one_state_emitted(self) -> None:
122        """
123        Used primarily for full refresh syncs that do not have a valid cursor value to emit at the end of a sync
124        """
125
126        self._connector_state_manager.update_state_for_stream(
127            self._stream_name, self._stream_namespace, self.state
128        )
129        state_message = self._connector_state_manager.create_state_message(
130            self._stream_name, self._stream_namespace
131        )
132        self._message_repository.emit_message(state_message)
133
134    def should_be_synced(self, record: Record) -> bool:
135        return True
136
137
138class ConcurrentCursor(Cursor):
139    _START_BOUNDARY = 0
140    _END_BOUNDARY = 1
141
142    def copy_without_state(self) -> "ConcurrentCursor":
143        return self.__class__(
144            stream_name=self._stream_name,
145            stream_namespace=self._stream_namespace,
146            stream_state={},
147            message_repository=NoopMessageRepository(),
148            connector_state_manager=ConnectorStateManager(),
149            connector_state_converter=self._connector_state_converter,
150            cursor_field=self._cursor_field,
151            slice_boundary_fields=self._slice_boundary_fields,
152            start=self._start,
153            end_provider=self._end_provider,
154            lookback_window=self._lookback_window,
155            slice_range=self._slice_range,
156            cursor_granularity=self._cursor_granularity,
157            clamping_strategy=self._clamping_strategy,
158        )
159
160    def __init__(
161        self,
162        stream_name: str,
163        stream_namespace: Optional[str],
164        stream_state: Any,
165        message_repository: MessageRepository,
166        connector_state_manager: ConnectorStateManager,
167        connector_state_converter: AbstractStreamStateConverter,
168        cursor_field: CursorField,
169        slice_boundary_fields: Optional[Tuple[str, str]],
170        start: Optional[CursorValueType],
171        end_provider: Callable[[], CursorValueType],
172        lookback_window: Optional[GapType] = None,
173        slice_range: Optional[GapType] = None,
174        cursor_granularity: Optional[GapType] = None,
175        clamping_strategy: ClampingStrategy = NoClamping(),
176    ) -> None:
177        self._stream_name = stream_name
178        self._stream_namespace = stream_namespace
179        self._message_repository = message_repository
180        self._connector_state_converter = connector_state_converter
181        self._connector_state_manager = connector_state_manager
182        self._cursor_field = cursor_field
183        # To see some example where the slice boundaries might not be defined, check https://github.com/airbytehq/airbyte/blob/1ce84d6396e446e1ac2377362446e3fb94509461/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py#L363-L379
184        self._slice_boundary_fields = slice_boundary_fields
185        self._start = start
186        self._end_provider = end_provider
187        self.start, self._concurrent_state = self._get_concurrent_state(stream_state)
188        self._lookback_window = lookback_window
189        self._slice_range = slice_range
190        self._most_recent_cursor_value_per_partition: MutableMapping[
191            Union[StreamSlice, Mapping[str, Any], None], Any
192        ] = {}
193        self._has_closed_at_least_one_slice = False
194        self._cursor_granularity = cursor_granularity
195        # Flag to track if the logger has been triggered (per stream)
196        self._should_be_synced_logger_triggered = False
197        self._clamping_strategy = clamping_strategy
198        self._is_ascending_order = True
199
200        # A lock is required when closing a partition because updating the cursor's concurrent_state is
201        # not thread safe. When multiple partitions are being closed by the cursor at the same time, it is
202        # possible for one partition to update concurrent_state after a second partition has already read
203        # the previous state. This can lead to the second partition overwriting the previous one's state.
204        self._lock = threading.Lock()
205
206    @property
207    def state(self) -> MutableMapping[str, Any]:
208        return self._connector_state_converter.convert_to_state_message(
209            self.cursor_field, self._concurrent_state
210        )
211
212    @property
213    def cursor_field(self) -> CursorField:
214        return self._cursor_field
215
216    @property
217    def _slice_boundary_fields_wrapper(self) -> Tuple[str, str]:
218        return (
219            self._slice_boundary_fields
220            if self._slice_boundary_fields
221            else (
222                self._connector_state_converter.START_KEY,
223                self._connector_state_converter.END_KEY,
224            )
225        )
226
227    def _get_concurrent_state(
228        self, state: MutableMapping[str, Any]
229    ) -> Tuple[CursorValueType, MutableMapping[str, Any]]:
230        if self._connector_state_converter.is_state_message_compatible(state):
231            partitioned_state = self._connector_state_converter.deserialize(state)
232            slices_from_partitioned_state = partitioned_state.get("slices", [])
233
234            value_from_partitioned_state = None
235            if slices_from_partitioned_state:
236                # We assume here that the slices have been already merged
237                first_slice = slices_from_partitioned_state[0]
238                value_from_partitioned_state = (
239                    first_slice[self._connector_state_converter.MOST_RECENT_RECORD_KEY]
240                    if self._connector_state_converter.MOST_RECENT_RECORD_KEY in first_slice
241                    else first_slice[self._connector_state_converter.END_KEY]
242                )
243            return (
244                value_from_partitioned_state
245                or self._start
246                or self._connector_state_converter.zero_value,
247                partitioned_state,
248            )
249        return self._connector_state_converter.convert_from_sequential_state(
250            self._cursor_field, state, self._start
251        )
252
253    def observe(self, record: Record) -> None:
254        # Because observe writes to the most_recent_cursor_value_per_partition mapping,
255        # it is not thread-safe. However, this shouldn't lead to concurrency issues because
256        # observe() is only invoked by PartitionReader.process_partition(). Since the map is
257        # broken down according to partition, concurrent threads processing only read/write
258        # from different keys which avoids any conflicts.
259        #
260        # If we were to add thread safety, we should implement a lock per-partition
261        # which is instantiated during stream_slices()
262        most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
263            record.associated_slice
264        )
265        try:
266            cursor_value = self._extract_cursor_value(record)
267
268            if most_recent_cursor_value is None or most_recent_cursor_value < cursor_value:
269                self._most_recent_cursor_value_per_partition[record.associated_slice] = cursor_value
270            elif most_recent_cursor_value > cursor_value:
271                self._is_ascending_order = False
272        except ValueError:
273            self._log_for_record_without_cursor_value()
274
275    def _extract_cursor_value(self, record: Record) -> Any:
276        return self._connector_state_converter.parse_value(self._cursor_field.extract_value(record))
277
278    def close_partition(self, partition: Partition) -> None:
279        with self._lock:
280            slice_count_before = len(self._concurrent_state.get("slices", []))
281            self._add_slice_to_state(partition)
282            if slice_count_before < len(
283                self._concurrent_state["slices"]
284            ):  # only emit if at least one slice has been processed
285                self._merge_partitions()
286                self._emit_state_message()
287        self._has_closed_at_least_one_slice = True
288
289    def _add_slice_to_state(self, partition: Partition) -> None:
290        most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
291            partition.to_slice()
292        )
293
294        if self._slice_boundary_fields:
295            if "slices" not in self._concurrent_state:
296                raise RuntimeError(
297                    f"The state for stream {self._stream_name} should have at least one slice to delineate the sync start time, but no slices are present. This is unexpected. Please contact Support."
298                )
299            self._concurrent_state["slices"].append(
300                {
301                    self._connector_state_converter.START_KEY: self._extract_from_slice(
302                        partition, self._slice_boundary_fields[self._START_BOUNDARY]
303                    ),
304                    self._connector_state_converter.END_KEY: self._extract_from_slice(
305                        partition, self._slice_boundary_fields[self._END_BOUNDARY]
306                    ),
307                    self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value,
308                }
309            )
310        elif most_recent_cursor_value:
311            if self._has_closed_at_least_one_slice:
312                # If we track state value using records cursor field, we can only do that if there is one partition. This is because we save
313                # the state every time we close a partition. We assume that if there are multiple slices, they need to be providing
314                # boundaries. There are cases where partitions could not have boundaries:
315                # * The cursor should be per-partition
316                # * The stream state is actually the parent stream state
317                # There might be other cases not listed above. Those are not supported today hence the stream should not use this cursor for
318                # state management. For the specific user that was affected with this issue, we need to:
319                # * Fix state tracking (which is currently broken)
320                # * Make the new version available
321                # * (Probably) ask the user to reset the stream to avoid data loss
322                raise ValueError(
323                    "Given that slice_boundary_fields is not defined and that per-partition state is not supported, only one slice is "
324                    "expected. Please contact the Airbyte team."
325                )
326
327            self._concurrent_state["slices"].append(
328                {
329                    self._connector_state_converter.START_KEY: self.start,
330                    self._connector_state_converter.END_KEY: most_recent_cursor_value,
331                    self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value,
332                }
333            )
334
335    def _emit_state_message(self) -> None:
336        self._connector_state_manager.update_state_for_stream(
337            self._stream_name,
338            self._stream_namespace,
339            self.state,
340        )
341        state_message = self._connector_state_manager.create_state_message(
342            self._stream_name, self._stream_namespace
343        )
344        self._message_repository.emit_message(state_message)
345
346    def _merge_partitions(self) -> None:
347        self._concurrent_state["slices"] = self._connector_state_converter.merge_intervals(
348            self._concurrent_state["slices"]
349        )
350
351    def _extract_from_slice(self, partition: Partition, key: str) -> CursorValueType:
352        try:
353            _slice = partition.to_slice()
354            if not _slice:
355                raise KeyError(f"Could not find key `{key}` in empty slice")
356            return self._connector_state_converter.parse_value(_slice[key])  # type: ignore  # we expect the devs to specify a key that would return a CursorValueType
357        except KeyError as exception:
358            raise KeyError(
359                f"Partition is expected to have key `{key}` but could not be found"
360            ) from exception
361
362    def ensure_at_least_one_state_emitted(self) -> None:
363        """
364        The platform expect to have at least one state message on successful syncs. Hence, whatever happens, we expect this method to be
365        called.
366        """
367        self._emit_state_message()
368
    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Generate the slices that still need to be synced, based on the slices already present in the state.

        The state's slices are merged first. Slices are then generated, in this order, for:
        * the range between `self._start` and the start of the first state slice, when `self._start` is set and lies before it
        * every gap between two consecutive state slices (only when the state holds more than one slice)
        * the range between the end of the last state slice and the value returned by the end provider

        Parameters influencing the generation:
        * lookback_window: Buffer to remove from END_KEY of the highest slice
        * slice_range: Max difference between two slices. If the difference between two slices is greater, multiple slices will be created
        * cursor_granularity: when set, it is added to the END_KEY of a state slice before generating the slices of the gap that follows it
        * start: `_split_per_slice_range` will clip any value to `self._start` which means that:
          * if upper is less than self._start, no slices will be generated
          * if lower is less than self._start, self._start will be used as the lower boundary (lookback_window will not be considered in that case)

        Note that the slices will overlap at their boundaries. We therefore expect to have at least the lower or the upper boundary to be
        inclusive in the API that is queried.

        :raises ValueError: when the state holds no slice at all
        """
        self._merge_partitions()

        # Range preceding everything that is already recorded in the state.
        if self._start is not None and self._is_start_before_first_slice():
            yield from self._split_per_slice_range(
                self._start,
                self._concurrent_state["slices"][0][self._connector_state_converter.START_KEY],
                False,
            )

        if len(self._concurrent_state["slices"]) == 1:
            # Single state slice: only the range from its end (minus the lookback window) up to the end provider is left.
            yield from self._split_per_slice_range(
                self._calculate_lower_boundary_of_last_slice(
                    self._concurrent_state["slices"][0][self._connector_state_converter.END_KEY]
                ),
                self._end_provider(),
                True,
            )
        elif len(self._concurrent_state["slices"]) > 1:
            # Fill every gap between consecutive state slices.
            for i in range(len(self._concurrent_state["slices"]) - 1):
                if self._cursor_granularity:
                    # The gap starts one granularity step after the end of the preceding slice.
                    yield from self._split_per_slice_range(
                        self._concurrent_state["slices"][i][self._connector_state_converter.END_KEY]
                        + self._cursor_granularity,
                        self._concurrent_state["slices"][i + 1][
                            self._connector_state_converter.START_KEY
                        ],
                        False,
                    )
                else:
                    yield from self._split_per_slice_range(
                        self._concurrent_state["slices"][i][
                            self._connector_state_converter.END_KEY
                        ],
                        self._concurrent_state["slices"][i + 1][
                            self._connector_state_converter.START_KEY
                        ],
                        False,
                    )
            # Finally, the range from the end of the last state slice (minus the lookback window) up to the end provider.
            yield from self._split_per_slice_range(
                self._calculate_lower_boundary_of_last_slice(
                    self._concurrent_state["slices"][-1][self._connector_state_converter.END_KEY]
                ),
                self._end_provider(),
                True,
            )
        else:
            raise ValueError("Expected at least one slice")
428
429    def _is_start_before_first_slice(self) -> bool:
430        return (
431            self._start is not None
432            and self._start
433            < self._concurrent_state["slices"][0][self._connector_state_converter.START_KEY]
434        )
435
436    def _calculate_lower_boundary_of_last_slice(
437        self, lower_boundary: CursorValueType
438    ) -> CursorValueType:
439        if self._lookback_window:
440            return lower_boundary - self._lookback_window
441        return lower_boundary
442
    def _split_per_slice_range(
        self, lower: CursorValueType, upper: CursorValueType, upper_is_end: bool
    ) -> Iterable[StreamSlice]:
        """
        Yield the slices covering the range from ``lower`` to ``upper``.

        Nothing is yielded when ``lower >= upper`` or when ``upper`` lies before a truthy ``self._start``.
        ``lower`` is raised to ``self._start`` when it lies before it. The range is emitted as a single slice when
        no slice range is configured or when one step of the slice range already reaches ``upper``; otherwise it
        is cut into consecutive slices of at most one slice range each.

        :param lower: lower boundary of the range
        :param upper: upper boundary of the range
        :param upper_is_end: when True, the cursor granularity is not subtracted from the end of the slice that
            reaches ``upper``
        :return: slices with an empty partition whose cursor_slice holds the formatted start and end values under
            the keys given by ``_slice_boundary_fields_wrapper``
        """
        if lower >= upper:
            return

        if self._start and upper < self._start:
            return

        lower = max(lower, self._start) if self._start else lower
        if not self._slice_range or self._evaluate_upper_safely(lower, self._slice_range) >= upper:
            # The whole range fits in one slice.
            clamped_lower = self._clamping_strategy.clamp(lower)
            clamped_upper = self._clamping_strategy.clamp(upper)
            # With a cursor granularity, the end is pulled back by one step unless this range ends the sync window.
            start_value, end_value = (
                (clamped_lower, clamped_upper - self._cursor_granularity)
                if self._cursor_granularity and not upper_is_end
                else (clamped_lower, clamped_upper)
            )
            yield StreamSlice(
                partition={},
                cursor_slice={
                    self._slice_boundary_fields_wrapper[
                        self._START_BOUNDARY
                    ]: self._connector_state_converter.output_format(start_value),
                    self._slice_boundary_fields_wrapper[
                        self._END_BOUNDARY
                    ]: self._connector_state_converter.output_format(end_value),
                },
            )
        else:
            # Walk the range one slice range at a time until `upper` is reached.
            stop_processing = False
            current_lower_boundary = lower
            while not stop_processing:
                current_upper_boundary = min(
                    self._evaluate_upper_safely(current_lower_boundary, self._slice_range), upper
                )
                has_reached_upper_boundary = current_upper_boundary >= upper

                # The final boundary (equal to `upper`) is used as-is; intermediate boundaries are clamped.
                clamped_upper = (
                    self._clamping_strategy.clamp(current_upper_boundary)
                    if current_upper_boundary != upper
                    else current_upper_boundary
                )
                clamped_lower = self._clamping_strategy.clamp(current_lower_boundary)
                if clamped_lower >= clamped_upper:
                    # clamping collapsed both values which means that it is time to stop processing
                    # FIXME should this be replace by proper end_provider
                    break
                # With a cursor granularity, the end is pulled back by one step, except for the last slice of a
                # range that ends the sync window.
                start_value, end_value = (
                    (clamped_lower, clamped_upper - self._cursor_granularity)
                    if self._cursor_granularity
                    and (not upper_is_end or not has_reached_upper_boundary)
                    else (clamped_lower, clamped_upper)
                )
                yield StreamSlice(
                    partition={},
                    cursor_slice={
                        self._slice_boundary_fields_wrapper[
                            self._START_BOUNDARY
                        ]: self._connector_state_converter.output_format(start_value),
                        self._slice_boundary_fields_wrapper[
                            self._END_BOUNDARY
                        ]: self._connector_state_converter.output_format(end_value),
                    },
                )
                # The next slice starts at the (clamped) upper boundary of the one just emitted.
                current_lower_boundary = clamped_upper
                if current_upper_boundary >= upper:
                    stop_processing = True
511
512    def _evaluate_upper_safely(self, lower: CursorValueType, step: GapType) -> CursorValueType:
513        """
514        Given that we set the default step at datetime.timedelta.max, we will generate an OverflowError when evaluating the next start_date
515        This method assumes that users would never enter a step that would generate an overflow. Given that would be the case, the code
516        would have broken anyway.
517        """
518        try:
519            return lower + step
520        except OverflowError:
521            return self._end_provider()
522
523    def should_be_synced(self, record: Record) -> bool:
524        """
525        Determines if a record should be synced based on its cursor value.
526        :param record: The record to evaluate
527
528        :return: True if the record's cursor value falls within the sync boundaries
529        """
530        try:
531            record_cursor_value: CursorValueType = self._extract_cursor_value(record)
532        except ValueError:
533            self._log_for_record_without_cursor_value()
534            return True
535        return self.start <= record_cursor_value <= self._end_provider()
536
537    def _log_for_record_without_cursor_value(self) -> None:
538        if not self._should_be_synced_logger_triggered:
539            LOGGER.warning(
540                f"Could not find cursor field `{self.cursor_field.cursor_field_key}` in record for stream {self._stream_name}. The incremental sync will assume it needs to be synced"
541            )
542            self._should_be_synced_logger_triggered = True
543
544    def reduce_slice_range(self, stream_slice: StreamSlice) -> StreamSlice:
545        # In theory, we might be more flexible here meaning that it doesn't need to be in ascending order but it just
546        # needs to be ordered. For now though, we will only support ascending order.
547        if not self._is_ascending_order:
548            LOGGER.warning(
549                "Attempting to reduce slice while records are not returned in incremental order might lead to missing records"
550            )
551
552        if stream_slice in self._most_recent_cursor_value_per_partition:
553            return StreamSlice(
554                partition=stream_slice.partition,
555                cursor_slice={
556                    self._slice_boundary_fields_wrapper[
557                        self._START_BOUNDARY
558                    ]: self._connector_state_converter.output_format(
559                        self._most_recent_cursor_value_per_partition[stream_slice]
560                    ),
561                    self._slice_boundary_fields_wrapper[
562                        self._END_BOUNDARY
563                    ]: stream_slice.cursor_slice[
564                        self._slice_boundary_fields_wrapper[self._END_BOUNDARY]
565                    ],
566                },
567                extra_fields=stream_slice.extra_fields,
568            )
569        else:
570            return stream_slice
LOGGER = <Logger airbyte (INFO)>
class CursorField:
41class CursorField:
42    def __init__(
43        self, cursor_field_key: str, supports_catalog_defined_cursor_field: bool = False
44    ) -> None:
45        self.cursor_field_key = cursor_field_key
46        self.supports_catalog_defined_cursor_field = supports_catalog_defined_cursor_field
47
48    def extract_value(self, record: Record) -> Any:
49        cursor_value = record.data.get(self.cursor_field_key)
50        if cursor_value is None:
51            raise ValueError(f"Could not find cursor field {self.cursor_field_key} in record")
52        return cursor_value  # type: ignore  # we assume that the value the path points at is a comparable
CursorField( cursor_field_key: str, supports_catalog_defined_cursor_field: bool = False)
42    def __init__(
43        self, cursor_field_key: str, supports_catalog_defined_cursor_field: bool = False
44    ) -> None:
45        self.cursor_field_key = cursor_field_key
46        self.supports_catalog_defined_cursor_field = supports_catalog_defined_cursor_field
cursor_field_key
supports_catalog_defined_cursor_field
def extract_value(self, record: airbyte_cdk.Record) -> Any:
48    def extract_value(self, record: Record) -> Any:
49        cursor_value = record.data.get(self.cursor_field_key)
50        if cursor_value is None:
51            raise ValueError(f"Could not find cursor field {self.cursor_field_key} in record")
52        return cursor_value  # type: ignore  # we assume that the value the path points at is a comparable
55class Cursor(StreamSlicer, ABC):
56    @property
57    @abstractmethod
58    def state(self) -> MutableMapping[str, Any]: ...
59
60    @abstractmethod
61    def observe(self, record: Record) -> None:
62        """
63        Indicate to the cursor that the record has been emitted
64        """
65        raise NotImplementedError()
66
67    @abstractmethod
68    def close_partition(self, partition: Partition) -> None:
69        """
70        Indicate to the cursor that the partition has been successfully processed
71        """
72        raise NotImplementedError()
73
74    @abstractmethod
75    def ensure_at_least_one_state_emitted(self) -> None:
76        """
77        State messages are emitted when a partition is closed. However, the platform expects at least one state to be emitted per sync per
78        stream. Hence, if no partitions are generated, this method needs to be called.
79        """
80        raise NotImplementedError()
81
82    @abstractmethod
83    def should_be_synced(self, record: Record) -> bool:
84        pass
85
86    def stream_slices(self) -> Iterable[StreamSlice]:
87        """
88        Default placeholder implementation of generate_slices.
89        Subclasses can override this method to provide actual behavior.
90        """
91        yield StreamSlice(partition={}, cursor_slice={})

Slices the stream into chunks that can be fetched independently. Slices enable state checkpointing and data retrieval parallelization.

state: MutableMapping[str, Any]
56    @property
57    @abstractmethod
58    def state(self) -> MutableMapping[str, Any]: ...
@abstractmethod
def observe(self, record: airbyte_cdk.Record) -> None:
60    @abstractmethod
61    def observe(self, record: Record) -> None:
62        """
63        Indicate to the cursor that the record has been emitted
64        """
65        raise NotImplementedError()

Indicate to the cursor that the record has been emitted

@abstractmethod
def close_partition( self, partition: airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition) -> None:
67    @abstractmethod
68    def close_partition(self, partition: Partition) -> None:
69        """
70        Indicate to the cursor that the partition has been successfully processed
71        """
72        raise NotImplementedError()

Indicate to the cursor that the partition has been successfully processed

@abstractmethod
def ensure_at_least_one_state_emitted(self) -> None:
74    @abstractmethod
75    def ensure_at_least_one_state_emitted(self) -> None:
76        """
77        State messages are emitted when a partition is closed. However, the platform expects at least one state to be emitted per sync per
78        stream. Hence, if no partitions are generated, this method needs to be called.
79        """
80        raise NotImplementedError()

State messages are emitted when a partition is closed. However, the platform expects at least one state to be emitted per sync per stream. Hence, if no partitions are generated, this method needs to be called.

@abstractmethod
def should_be_synced(self, record: airbyte_cdk.Record) -> bool:
82    @abstractmethod
83    def should_be_synced(self, record: Record) -> bool:
84        pass
def stream_slices(self) -> Iterable[airbyte_cdk.StreamSlice]:
86    def stream_slices(self) -> Iterable[StreamSlice]:
87        """
88        Default placeholder implementation of stream_slices that yields a single empty StreamSlice.
89        Subclasses can override this method to provide actual behavior.
90        """
91        yield StreamSlice(partition={}, cursor_slice={})

Default placeholder implementation of stream_slices that yields a single empty StreamSlice. Subclasses can override this method to provide actual behavior.

class FinalStateCursor(Cursor):
 94class FinalStateCursor(Cursor):
 95    """Cursor that is used to guarantee at least one state message is emitted for a concurrent stream."""
 96
 97    def __init__(
 98        self,
 99        stream_name: str,
100        stream_namespace: Optional[str],
101        message_repository: MessageRepository,
102    ) -> None:
103        self._stream_name = stream_name
104        self._stream_namespace = stream_namespace
105        self._message_repository = message_repository
106        # Normally the connector state manager operates at the source-level. However, we only need it to write the sentinel
107        # state message rather than manage overall source state. This is also only temporary as we move to the resumable
108        # full refresh world where every stream uses a FileBasedConcurrentCursor with incremental state.
109        self._connector_state_manager = ConnectorStateManager()
110        self._has_closed_at_least_one_slice = False
111
112    @property
113    def state(self) -> MutableMapping[str, Any]:
114        return {NO_CURSOR_STATE_KEY: True}
115
116    def observe(self, record: Record) -> None:
117        pass
118
119    def close_partition(self, partition: Partition) -> None:
120        pass
121
122    def ensure_at_least_one_state_emitted(self) -> None:
123        """
124        Used primarily for full refresh syncs that do not have a valid cursor value to emit at the end of a sync
125        """
126
127        self._connector_state_manager.update_state_for_stream(
128            self._stream_name, self._stream_namespace, self.state
129        )
130        state_message = self._connector_state_manager.create_state_message(
131            self._stream_name, self._stream_namespace
132        )
133        self._message_repository.emit_message(state_message)
134
135    def should_be_synced(self, record: Record) -> bool:
136        return True

Cursor that is used to guarantee at least one state message is emitted for a concurrent stream.

FinalStateCursor( stream_name: str, stream_namespace: Optional[str], message_repository: airbyte_cdk.MessageRepository)
 97    def __init__(
 98        self,
 99        stream_name: str,
100        stream_namespace: Optional[str],
101        message_repository: MessageRepository,
102    ) -> None:
103        self._stream_name = stream_name
104        self._stream_namespace = stream_namespace
105        self._message_repository = message_repository
106        # Normally the connector state manager operates at the source-level. However, we only need it to write the sentinel
107        # state message rather than manage overall source state. This is also only temporary as we move to the resumable
108        # full refresh world where every stream uses a FileBasedConcurrentCursor with incremental state.
109        self._connector_state_manager = ConnectorStateManager()
110        self._has_closed_at_least_one_slice = False
state: MutableMapping[str, Any]
112    @property
113    def state(self) -> MutableMapping[str, Any]:
114        return {NO_CURSOR_STATE_KEY: True}
def observe(self, record: airbyte_cdk.Record) -> None:
116    def observe(self, record: Record) -> None:
117        pass

Indicate to the cursor that the record has been emitted

def close_partition( self, partition: airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition) -> None:
119    def close_partition(self, partition: Partition) -> None:
120        pass

Indicate to the cursor that the partition has been successfully processed

def ensure_at_least_one_state_emitted(self) -> None:
122    def ensure_at_least_one_state_emitted(self) -> None:
123        """
124        Used primarily for full refresh syncs that do not have a valid cursor value to emit at the end of a sync
125        """
126
127        self._connector_state_manager.update_state_for_stream(
128            self._stream_name, self._stream_namespace, self.state
129        )
130        state_message = self._connector_state_manager.create_state_message(
131            self._stream_name, self._stream_namespace
132        )
133        self._message_repository.emit_message(state_message)

Used primarily for full refresh syncs that do not have a valid cursor value to emit at the end of a sync

def should_be_synced(self, record: airbyte_cdk.Record) -> bool:
135    def should_be_synced(self, record: Record) -> bool:
136        return True
Inherited Members
Cursor
stream_slices
class ConcurrentCursor(Cursor):
139class ConcurrentCursor(Cursor):
140    _START_BOUNDARY = 0
141    _END_BOUNDARY = 1
142
143    def copy_without_state(self) -> "ConcurrentCursor":
144        return self.__class__(
145            stream_name=self._stream_name,
146            stream_namespace=self._stream_namespace,
147            stream_state={},
148            message_repository=NoopMessageRepository(),
149            connector_state_manager=ConnectorStateManager(),
150            connector_state_converter=self._connector_state_converter,
151            cursor_field=self._cursor_field,
152            slice_boundary_fields=self._slice_boundary_fields,
153            start=self._start,
154            end_provider=self._end_provider,
155            lookback_window=self._lookback_window,
156            slice_range=self._slice_range,
157            cursor_granularity=self._cursor_granularity,
158            clamping_strategy=self._clamping_strategy,
159        )
160
161    def __init__(
162        self,
163        stream_name: str,
164        stream_namespace: Optional[str],
165        stream_state: Any,
166        message_repository: MessageRepository,
167        connector_state_manager: ConnectorStateManager,
168        connector_state_converter: AbstractStreamStateConverter,
169        cursor_field: CursorField,
170        slice_boundary_fields: Optional[Tuple[str, str]],
171        start: Optional[CursorValueType],
172        end_provider: Callable[[], CursorValueType],
173        lookback_window: Optional[GapType] = None,
174        slice_range: Optional[GapType] = None,
175        cursor_granularity: Optional[GapType] = None,
176        clamping_strategy: ClampingStrategy = NoClamping(),
177    ) -> None:
178        self._stream_name = stream_name
179        self._stream_namespace = stream_namespace
180        self._message_repository = message_repository
181        self._connector_state_converter = connector_state_converter
182        self._connector_state_manager = connector_state_manager
183        self._cursor_field = cursor_field
184        # To see some example where the slice boundaries might not be defined, check https://github.com/airbytehq/airbyte/blob/1ce84d6396e446e1ac2377362446e3fb94509461/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py#L363-L379
185        self._slice_boundary_fields = slice_boundary_fields
186        self._start = start
187        self._end_provider = end_provider
188        self.start, self._concurrent_state = self._get_concurrent_state(stream_state)
189        self._lookback_window = lookback_window
190        self._slice_range = slice_range
191        self._most_recent_cursor_value_per_partition: MutableMapping[
192            Union[StreamSlice, Mapping[str, Any], None], Any
193        ] = {}
194        self._has_closed_at_least_one_slice = False
195        self._cursor_granularity = cursor_granularity
196        # Flag to track if the logger has been triggered (per stream)
197        self._should_be_synced_logger_triggered = False
198        self._clamping_strategy = clamping_strategy
199        self._is_ascending_order = True
200
201        # A lock is required when closing a partition because updating the cursor's concurrent_state is
202        # not thread safe. When multiple partitions are being closed by the cursor at the same time, it is
203        # possible for one partition to update concurrent_state after a second partition has already read
204        # the previous state. This can lead to the second partition overwriting the previous one's state.
205        self._lock = threading.Lock()
206
207    @property
208    def state(self) -> MutableMapping[str, Any]:
209        return self._connector_state_converter.convert_to_state_message(
210            self.cursor_field, self._concurrent_state
211        )
212
213    @property
214    def cursor_field(self) -> CursorField:
215        return self._cursor_field
216
217    @property
218    def _slice_boundary_fields_wrapper(self) -> Tuple[str, str]:
219        return (
220            self._slice_boundary_fields
221            if self._slice_boundary_fields
222            else (
223                self._connector_state_converter.START_KEY,
224                self._connector_state_converter.END_KEY,
225            )
226        )
227
228    def _get_concurrent_state(
229        self, state: MutableMapping[str, Any]
230    ) -> Tuple[CursorValueType, MutableMapping[str, Any]]:
231        if self._connector_state_converter.is_state_message_compatible(state):
232            partitioned_state = self._connector_state_converter.deserialize(state)
233            slices_from_partitioned_state = partitioned_state.get("slices", [])
234
235            value_from_partitioned_state = None
236            if slices_from_partitioned_state:
237                # We assume here that the slices have been already merged
238                first_slice = slices_from_partitioned_state[0]
239                value_from_partitioned_state = (
240                    first_slice[self._connector_state_converter.MOST_RECENT_RECORD_KEY]
241                    if self._connector_state_converter.MOST_RECENT_RECORD_KEY in first_slice
242                    else first_slice[self._connector_state_converter.END_KEY]
243                )
244            return (
245                value_from_partitioned_state
246                or self._start
247                or self._connector_state_converter.zero_value,
248                partitioned_state,
249            )
250        return self._connector_state_converter.convert_from_sequential_state(
251            self._cursor_field, state, self._start
252        )
253
254    def observe(self, record: Record) -> None:
255        # Because observe writes to the most_recent_cursor_value_per_partition mapping,
256        # it is not thread-safe. However, this shouldn't lead to concurrency issues because
257        # observe() is only invoked by PartitionReader.process_partition(). Since the map is
258        # broken down according to partition, concurrent threads processing only read/write
259        # from different keys which avoids any conflicts.
260        #
261        # If we were to add thread safety, we should implement a lock per-partition
262        # which is instantiated during stream_slices()
263        most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
264            record.associated_slice
265        )
266        try:
267            cursor_value = self._extract_cursor_value(record)
268
269            if most_recent_cursor_value is None or most_recent_cursor_value < cursor_value:
270                self._most_recent_cursor_value_per_partition[record.associated_slice] = cursor_value
271            elif most_recent_cursor_value > cursor_value:
272                self._is_ascending_order = False
273        except ValueError:
274            self._log_for_record_without_cursor_value()
275
276    def _extract_cursor_value(self, record: Record) -> Any:
277        return self._connector_state_converter.parse_value(self._cursor_field.extract_value(record))
278
279    def close_partition(self, partition: Partition) -> None:
280        with self._lock:
281            slice_count_before = len(self._concurrent_state.get("slices", []))
282            self._add_slice_to_state(partition)
283            if slice_count_before < len(
284                self._concurrent_state["slices"]
285            ):  # only emit if at least one slice has been processed
286                self._merge_partitions()
287                self._emit_state_message()
288        self._has_closed_at_least_one_slice = True
289
290    def _add_slice_to_state(self, partition: Partition) -> None:
291        most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
292            partition.to_slice()
293        )
294
295        if self._slice_boundary_fields:
296            if "slices" not in self._concurrent_state:
297                raise RuntimeError(
298                    f"The state for stream {self._stream_name} should have at least one slice to delineate the sync start time, but no slices are present. This is unexpected. Please contact Support."
299                )
300            self._concurrent_state["slices"].append(
301                {
302                    self._connector_state_converter.START_KEY: self._extract_from_slice(
303                        partition, self._slice_boundary_fields[self._START_BOUNDARY]
304                    ),
305                    self._connector_state_converter.END_KEY: self._extract_from_slice(
306                        partition, self._slice_boundary_fields[self._END_BOUNDARY]
307                    ),
308                    self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value,
309                }
310            )
311        elif most_recent_cursor_value:
312            if self._has_closed_at_least_one_slice:
313                # If we track state value using records cursor field, we can only do that if there is one partition. This is because we save
314                # the state every time we close a partition. We assume that if there are multiple slices, they need to be providing
315                # boundaries. There are cases where partitions could not have boundaries:
316                # * The cursor should be per-partition
317                # * The stream state is actually the parent stream state
318                # There might be other cases not listed above. Those are not supported today hence the stream should not use this cursor for
319                # state management. For the specific user that was affected with this issue, we need to:
320                # * Fix state tracking (which is currently broken)
321                # * Make the new version available
322                # * (Probably) ask the user to reset the stream to avoid data loss
323                raise ValueError(
324                    "Given that slice_boundary_fields is not defined and that per-partition state is not supported, only one slice is "
325                    "expected. Please contact the Airbyte team."
326                )
327
328            self._concurrent_state["slices"].append(
329                {
330                    self._connector_state_converter.START_KEY: self.start,
331                    self._connector_state_converter.END_KEY: most_recent_cursor_value,
332                    self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value,
333                }
334            )
335
336    def _emit_state_message(self) -> None:
337        self._connector_state_manager.update_state_for_stream(
338            self._stream_name,
339            self._stream_namespace,
340            self.state,
341        )
342        state_message = self._connector_state_manager.create_state_message(
343            self._stream_name, self._stream_namespace
344        )
345        self._message_repository.emit_message(state_message)
346
347    def _merge_partitions(self) -> None:
348        self._concurrent_state["slices"] = self._connector_state_converter.merge_intervals(
349            self._concurrent_state["slices"]
350        )
351
352    def _extract_from_slice(self, partition: Partition, key: str) -> CursorValueType:
353        try:
354            _slice = partition.to_slice()
355            if not _slice:
356                raise KeyError(f"Could not find key `{key}` in empty slice")
357            return self._connector_state_converter.parse_value(_slice[key])  # type: ignore  # we expect the devs to specify a key that would return a CursorValueType
358        except KeyError as exception:
359            raise KeyError(
360                f"Partition is expected to have key `{key}` but could not be found"
361            ) from exception
362
363    def ensure_at_least_one_state_emitted(self) -> None:
364        """
365        The platform expects to have at least one state message on successful syncs. Hence, whatever happens, we expect this method to be
366        called.
367        """
368        self._emit_state_message()
369
370    def stream_slices(self) -> Iterable[StreamSlice]:
371        """
372        Generating slices based on a few parameters:
373        * lookback_window: Buffer to remove from END_KEY of the highest slice
374        * slice_range: Max difference between two slices. If the difference between two slices is greater, multiple slices will be created
375        * start: `_split_per_slice_range` will clip any value to `self._start` which means that:
376          * if upper is less than self._start, no slices will be generated
377          * if lower is less than self._start, self._start will be used as the lower boundary (lookback_window will not be considered in that case)
378
379        Note that the slices will overlap at their boundaries. We therefore expect to have at least the lower or the upper boundary to be
380        inclusive in the API that is queried.
381        """
382        self._merge_partitions()
383
384        if self._start is not None and self._is_start_before_first_slice():
385            yield from self._split_per_slice_range(
386                self._start,
387                self._concurrent_state["slices"][0][self._connector_state_converter.START_KEY],
388                False,
389            )
390
391        if len(self._concurrent_state["slices"]) == 1:
392            yield from self._split_per_slice_range(
393                self._calculate_lower_boundary_of_last_slice(
394                    self._concurrent_state["slices"][0][self._connector_state_converter.END_KEY]
395                ),
396                self._end_provider(),
397                True,
398            )
399        elif len(self._concurrent_state["slices"]) > 1:
400            for i in range(len(self._concurrent_state["slices"]) - 1):
401                if self._cursor_granularity:
402                    yield from self._split_per_slice_range(
403                        self._concurrent_state["slices"][i][self._connector_state_converter.END_KEY]
404                        + self._cursor_granularity,
405                        self._concurrent_state["slices"][i + 1][
406                            self._connector_state_converter.START_KEY
407                        ],
408                        False,
409                    )
410                else:
411                    yield from self._split_per_slice_range(
412                        self._concurrent_state["slices"][i][
413                            self._connector_state_converter.END_KEY
414                        ],
415                        self._concurrent_state["slices"][i + 1][
416                            self._connector_state_converter.START_KEY
417                        ],
418                        False,
419                    )
420            yield from self._split_per_slice_range(
421                self._calculate_lower_boundary_of_last_slice(
422                    self._concurrent_state["slices"][-1][self._connector_state_converter.END_KEY]
423                ),
424                self._end_provider(),
425                True,
426            )
427        else:
428            raise ValueError("Expected at least one slice")
429
430    def _is_start_before_first_slice(self) -> bool:
431        return (
432            self._start is not None
433            and self._start
434            < self._concurrent_state["slices"][0][self._connector_state_converter.START_KEY]
435        )
436
437    def _calculate_lower_boundary_of_last_slice(
438        self, lower_boundary: CursorValueType
439    ) -> CursorValueType:
440        if self._lookback_window:
441            return lower_boundary - self._lookback_window
442        return lower_boundary
443
444    def _split_per_slice_range(
445        self, lower: CursorValueType, upper: CursorValueType, upper_is_end: bool
446    ) -> Iterable[StreamSlice]:
447        if lower >= upper:
448            return
449
450        if self._start and upper < self._start:
451            return
452
453        lower = max(lower, self._start) if self._start else lower
454        if not self._slice_range or self._evaluate_upper_safely(lower, self._slice_range) >= upper:
455            clamped_lower = self._clamping_strategy.clamp(lower)
456            clamped_upper = self._clamping_strategy.clamp(upper)
457            start_value, end_value = (
458                (clamped_lower, clamped_upper - self._cursor_granularity)
459                if self._cursor_granularity and not upper_is_end
460                else (clamped_lower, clamped_upper)
461            )
462            yield StreamSlice(
463                partition={},
464                cursor_slice={
465                    self._slice_boundary_fields_wrapper[
466                        self._START_BOUNDARY
467                    ]: self._connector_state_converter.output_format(start_value),
468                    self._slice_boundary_fields_wrapper[
469                        self._END_BOUNDARY
470                    ]: self._connector_state_converter.output_format(end_value),
471                },
472            )
473        else:
474            stop_processing = False
475            current_lower_boundary = lower
476            while not stop_processing:
477                current_upper_boundary = min(
478                    self._evaluate_upper_safely(current_lower_boundary, self._slice_range), upper
479                )
480                has_reached_upper_boundary = current_upper_boundary >= upper
481
482                clamped_upper = (
483                    self._clamping_strategy.clamp(current_upper_boundary)
484                    if current_upper_boundary != upper
485                    else current_upper_boundary
486                )
487                clamped_lower = self._clamping_strategy.clamp(current_lower_boundary)
488                if clamped_lower >= clamped_upper:
489                    # clamping collapsed both values which means that it is time to stop processing
490                    # FIXME should this be replace by proper end_provider
491                    break
492                start_value, end_value = (
493                    (clamped_lower, clamped_upper - self._cursor_granularity)
494                    if self._cursor_granularity
495                    and (not upper_is_end or not has_reached_upper_boundary)
496                    else (clamped_lower, clamped_upper)
497                )
498                yield StreamSlice(
499                    partition={},
500                    cursor_slice={
501                        self._slice_boundary_fields_wrapper[
502                            self._START_BOUNDARY
503                        ]: self._connector_state_converter.output_format(start_value),
504                        self._slice_boundary_fields_wrapper[
505                            self._END_BOUNDARY
506                        ]: self._connector_state_converter.output_format(end_value),
507                    },
508                )
509                current_lower_boundary = clamped_upper
510                if current_upper_boundary >= upper:
511                    stop_processing = True
512
513    def _evaluate_upper_safely(self, lower: CursorValueType, step: GapType) -> CursorValueType:
514        """
515        Given that we set the default step at datetime.timedelta.max, we will generate an OverflowError when evaluating the next start_date
516        This method assumes that users would never enter a step that would generate an overflow. Given that would be the case, the code
517        would have broken anyway.
518        """
519        try:
520            return lower + step
521        except OverflowError:
522            return self._end_provider()
523
524    def should_be_synced(self, record: Record) -> bool:
525        """
526        Determines if a record should be synced based on its cursor value.
527        :param record: The record to evaluate
528
529        :return: True if the record's cursor value falls within the sync boundaries
530        """
531        try:
532            record_cursor_value: CursorValueType = self._extract_cursor_value(record)
533        except ValueError:
534            self._log_for_record_without_cursor_value()
535            return True
536        return self.start <= record_cursor_value <= self._end_provider()
537
538    def _log_for_record_without_cursor_value(self) -> None:
539        if not self._should_be_synced_logger_triggered:
540            LOGGER.warning(
541                f"Could not find cursor field `{self.cursor_field.cursor_field_key}` in record for stream {self._stream_name}. The incremental sync will assume it needs to be synced"
542            )
543            self._should_be_synced_logger_triggered = True
544
545    def reduce_slice_range(self, stream_slice: StreamSlice) -> StreamSlice:
546        # In theory, we might be more flexible here meaning that it doesn't need to be in ascending order but it just
547        # needs to be ordered. For now though, we will only support ascending order.
548        if not self._is_ascending_order:
549            LOGGER.warning(
550                "Attempting to reduce slice while records are not returned in incremental order might lead to missing records"
551            )
552
553        if stream_slice in self._most_recent_cursor_value_per_partition:
554            return StreamSlice(
555                partition=stream_slice.partition,
556                cursor_slice={
557                    self._slice_boundary_fields_wrapper[
558                        self._START_BOUNDARY
559                    ]: self._connector_state_converter.output_format(
560                        self._most_recent_cursor_value_per_partition[stream_slice]
561                    ),
562                    self._slice_boundary_fields_wrapper[
563                        self._END_BOUNDARY
564                    ]: stream_slice.cursor_slice[
565                        self._slice_boundary_fields_wrapper[self._END_BOUNDARY]
566                    ],
567                },
568                extra_fields=stream_slice.extra_fields,
569            )
570        else:
571            return stream_slice

Slices the stream into chunks that can be fetched independently. Slices enable state checkpointing and data retrieval parallelization.

ConcurrentCursor( stream_name: str, stream_namespace: Optional[str], stream_state: Any, message_repository: airbyte_cdk.MessageRepository, connector_state_manager: airbyte_cdk.ConnectorStateManager, connector_state_converter: airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter.AbstractStreamStateConverter, cursor_field: CursorField, slice_boundary_fields: Optional[Tuple[str, str]], start: Optional[airbyte_cdk.sources.streams.concurrent.cursor_types.CursorValueType], end_provider: Callable[[], airbyte_cdk.sources.streams.concurrent.cursor_types.CursorValueType], lookback_window: Optional[airbyte_cdk.sources.streams.concurrent.cursor_types.GapType] = None, slice_range: Optional[airbyte_cdk.sources.streams.concurrent.cursor_types.GapType] = None, cursor_granularity: Optional[airbyte_cdk.sources.streams.concurrent.cursor_types.GapType] = None, clamping_strategy: airbyte_cdk.sources.streams.concurrent.clamping.ClampingStrategy = <airbyte_cdk.sources.streams.concurrent.clamping.NoClamping object>)
161    def __init__(
162        self,
163        stream_name: str,
164        stream_namespace: Optional[str],
165        stream_state: Any,
166        message_repository: MessageRepository,
167        connector_state_manager: ConnectorStateManager,
168        connector_state_converter: AbstractStreamStateConverter,
169        cursor_field: CursorField,
170        slice_boundary_fields: Optional[Tuple[str, str]],
171        start: Optional[CursorValueType],
172        end_provider: Callable[[], CursorValueType],
173        lookback_window: Optional[GapType] = None,
174        slice_range: Optional[GapType] = None,
175        cursor_granularity: Optional[GapType] = None,
176        clamping_strategy: ClampingStrategy = NoClamping(),
177    ) -> None:
178        self._stream_name = stream_name
179        self._stream_namespace = stream_namespace
180        self._message_repository = message_repository
181        self._connector_state_converter = connector_state_converter
182        self._connector_state_manager = connector_state_manager
183        self._cursor_field = cursor_field
184        # To see some example where the slice boundaries might not be defined, check https://github.com/airbytehq/airbyte/blob/1ce84d6396e446e1ac2377362446e3fb94509461/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py#L363-L379
185        self._slice_boundary_fields = slice_boundary_fields
186        self._start = start
187        self._end_provider = end_provider
188        self.start, self._concurrent_state = self._get_concurrent_state(stream_state)
189        self._lookback_window = lookback_window
190        self._slice_range = slice_range
191        self._most_recent_cursor_value_per_partition: MutableMapping[
192            Union[StreamSlice, Mapping[str, Any], None], Any
193        ] = {}
194        self._has_closed_at_least_one_slice = False
195        self._cursor_granularity = cursor_granularity
196        # Flag to track if the logger has been triggered (per stream)
197        self._should_be_synced_logger_triggered = False
198        self._clamping_strategy = clamping_strategy
199        self._is_ascending_order = True
200
201        # A lock is required when closing a partition because updating the cursor's concurrent_state is
202        # not thread safe. When multiple partitions are being closed by the cursor at the same time, it is
203        # possible for one partition to update concurrent_state after a second partition has already read
204        # the previous state. This can lead to the second partition overwriting the previous one's state.
205        self._lock = threading.Lock()
def copy_without_state(self) -> ConcurrentCursor:
143    def copy_without_state(self) -> "ConcurrentCursor":
144        return self.__class__(
145            stream_name=self._stream_name,
146            stream_namespace=self._stream_namespace,
147            stream_state={},
148            message_repository=NoopMessageRepository(),
149            connector_state_manager=ConnectorStateManager(),
150            connector_state_converter=self._connector_state_converter,
151            cursor_field=self._cursor_field,
152            slice_boundary_fields=self._slice_boundary_fields,
153            start=self._start,
154            end_provider=self._end_provider,
155            lookback_window=self._lookback_window,
156            slice_range=self._slice_range,
157            cursor_granularity=self._cursor_granularity,
158            clamping_strategy=self._clamping_strategy,
159        )
state: MutableMapping[str, Any]
207    @property
208    def state(self) -> MutableMapping[str, Any]:
209        return self._connector_state_converter.convert_to_state_message(
210            self.cursor_field, self._concurrent_state
211        )
cursor_field: CursorField
213    @property
214    def cursor_field(self) -> CursorField:
215        return self._cursor_field
def observe(self, record: airbyte_cdk.Record) -> None:
254    def observe(self, record: Record) -> None:
255        # Because observe writes to the most_recent_cursor_value_per_partition mapping,
256        # it is not thread-safe. However, this shouldn't lead to concurrency issues because
257        # observe() is only invoked by PartitionReader.process_partition(). Since the map is
258        # broken down according to partition, concurrent threads processing only read/write
259        # from different keys which avoids any conflicts.
260        #
261        # If we were to add thread safety, we should implement a lock per-partition
262        # which is instantiated during stream_slices()
263        most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
264            record.associated_slice
265        )
266        try:
267            cursor_value = self._extract_cursor_value(record)
268
269            if most_recent_cursor_value is None or most_recent_cursor_value < cursor_value:
270                self._most_recent_cursor_value_per_partition[record.associated_slice] = cursor_value
271            elif most_recent_cursor_value > cursor_value:
272                self._is_ascending_order = False
273        except ValueError:
274            self._log_for_record_without_cursor_value()

Indicate to the cursor that the record has been emitted

def close_partition( self, partition: airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition) -> None:
279    def close_partition(self, partition: Partition) -> None:
280        with self._lock:
281            slice_count_before = len(self._concurrent_state.get("slices", []))
282            self._add_slice_to_state(partition)
283            if slice_count_before < len(
284                self._concurrent_state["slices"]
285            ):  # only emit if at least one slice has been processed
286                self._merge_partitions()
287                self._emit_state_message()
288        self._has_closed_at_least_one_slice = True

Indicate to the cursor that the partition has been successfully processed

def ensure_at_least_one_state_emitted(self) -> None:
363    def ensure_at_least_one_state_emitted(self) -> None:
364        """
365        The platform expect to have at least one state message on successful syncs. Hence, whatever happens, we expect this method to be
366        called.
367        """
368        self._emit_state_message()

The platform expects to have at least one state message on successful syncs. Hence, whatever happens, we expect this method to be called.

def stream_slices(self) -> Iterable[airbyte_cdk.StreamSlice]:
    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Generating slices based on a few parameters:
        * lookback_window: Buffer to remove from END_KEY of the highest slice
        * slice_range: Max difference between two slices. If the difference between two slices is greater, multiple slices will be created
        * start: `_split_per_slice_range` will clip any value to `self._start`, which means that:
          * if upper is less than self._start, no slices will be generated
          * if lower is less than self._start, self._start will be used as the lower boundary (lookback_window will not be considered in that case)

        Note that the slices will overlap at their boundaries. We therefore expect to have at least the lower or the upper boundary to be
        inclusive in the API that is queried.

        This is a lazy generator: the slices stored in `self._concurrent_state` are re-read each time a new range is computed.

        :return: the slices produced by `_split_per_slice_range` for every range that still has to be synced
        :raises ValueError: if the concurrent state contains no slice
        """
        # Normalize the slices held in state before computing the ranges around them.
        # NOTE(review): presumably this collapses overlapping/adjacent slices - confirm against _merge_partitions.
        self._merge_partitions()

        # Range between the configured start and the START_KEY of the first slice in state.
        if self._start is not None and self._is_start_before_first_slice():
            yield from self._split_per_slice_range(
                self._start,
                self._concurrent_state["slices"][0][self._connector_state_converter.START_KEY],
                False,
            )

        if len(self._concurrent_state["slices"]) == 1:
            # Single slice in state: continue from the lower boundary derived from its END_KEY up to the value
            # returned by the end provider.
            # NOTE(review): the third argument is True only for this final, open-ended range and False for every
            # other range - presumably an "upper boundary is the most recent value" flag; confirm against
            # _split_per_slice_range.
            yield from self._split_per_slice_range(
                self._calculate_lower_boundary_of_last_slice(
                    self._concurrent_state["slices"][0][self._connector_state_converter.END_KEY]
                ),
                self._end_provider(),
                True,
            )
        elif len(self._concurrent_state["slices"]) > 1:
            # Several slices in state: first fill every gap between two consecutive slices...
            for i in range(len(self._concurrent_state["slices"]) - 1):
                if self._cursor_granularity:
                    # With a cursor granularity, the gap starts one granularity unit after the previous END_KEY.
                    yield from self._split_per_slice_range(
                        self._concurrent_state["slices"][i][self._connector_state_converter.END_KEY]
                        + self._cursor_granularity,
                        self._concurrent_state["slices"][i + 1][
                            self._connector_state_converter.START_KEY
                        ],
                        False,
                    )
                else:
                    # Without a granularity, the gap starts exactly at the previous END_KEY.
                    yield from self._split_per_slice_range(
                        self._concurrent_state["slices"][i][
                            self._connector_state_converter.END_KEY
                        ],
                        self._concurrent_state["slices"][i + 1][
                            self._connector_state_converter.START_KEY
                        ],
                        False,
                    )
            # ...then continue after the last slice, up to the value returned by the end provider.
            yield from self._split_per_slice_range(
                self._calculate_lower_boundary_of_last_slice(
                    self._concurrent_state["slices"][-1][self._connector_state_converter.END_KEY]
                ),
                self._end_provider(),
                True,
            )
        else:
            raise ValueError("Expected at least one slice")

Generating slices based on a few parameters:

  • lookback_window: Buffer to remove from END_KEY of the highest slice
  • slice_range: Max difference between two slices. If the difference between two slices is greater, multiple slices will be created
  • start: _split_per_slice_range will clip any value to `self._start`, which means that:
    • if upper is less than self._start, no slices will be generated
    • if lower is less than self._start, self._start will be used as the lower boundary (lookback_window will not be considered in that case)

Note that the slices will overlap at their boundaries. We therefore expect to have at least the lower or the upper boundary to be inclusive in the API that is queried.

def should_be_synced(self, record: airbyte_cdk.Record) -> bool:
524    def should_be_synced(self, record: Record) -> bool:
525        """
526        Determines if a record should be synced based on its cursor value.
527        :param record: The record to evaluate
528
529        :return: True if the record's cursor value falls within the sync boundaries
530        """
531        try:
532            record_cursor_value: CursorValueType = self._extract_cursor_value(record)
533        except ValueError:
534            self._log_for_record_without_cursor_value()
535            return True
536        return self.start <= record_cursor_value <= self._end_provider()

Determines if a record should be synced based on its cursor value.

Parameters
  • record: The record to evaluate
Returns

True if the record's cursor value falls within the sync boundaries

def reduce_slice_range( self, stream_slice: airbyte_cdk.StreamSlice) -> airbyte_cdk.StreamSlice:
545    def reduce_slice_range(self, stream_slice: StreamSlice) -> StreamSlice:
546        # In theory, we might be more flexible here meaning that it doesn't need to be in ascending order but it just
547        # needs to be ordered. For now though, we will only support ascending order.
548        if not self._is_ascending_order:
549            LOGGER.warning(
550                "Attempting to reduce slice while records are not returned in incremental order might lead to missing records"
551            )
552
553        if stream_slice in self._most_recent_cursor_value_per_partition:
554            return StreamSlice(
555                partition=stream_slice.partition,
556                cursor_slice={
557                    self._slice_boundary_fields_wrapper[
558                        self._START_BOUNDARY
559                    ]: self._connector_state_converter.output_format(
560                        self._most_recent_cursor_value_per_partition[stream_slice]
561                    ),
562                    self._slice_boundary_fields_wrapper[
563                        self._END_BOUNDARY
564                    ]: stream_slice.cursor_slice[
565                        self._slice_boundary_fields_wrapper[self._END_BOUNDARY]
566                    ],
567                },
568                extra_fields=stream_slice.extra_fields,
569            )
570        else:
571            return stream_slice