airbyte_cdk.sources.streams.concurrent.cursor

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4
  5import functools
  6import logging
  7from abc import ABC, abstractmethod
  8from typing import (
  9    Any,
 10    Callable,
 11    Iterable,
 12    List,
 13    Mapping,
 14    MutableMapping,
 15    Optional,
 16    Tuple,
 17    Union,
 18)
 19
 20from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
 21from airbyte_cdk.sources.message import MessageRepository
 22from airbyte_cdk.sources.streams import NO_CURSOR_STATE_KEY
 23from airbyte_cdk.sources.streams.concurrent.clamping import ClampingStrategy, NoClamping
 24from airbyte_cdk.sources.streams.concurrent.cursor_types import CursorValueType, GapType
 25from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
 26from airbyte_cdk.sources.streams.concurrent.partitions.stream_slicer import StreamSlicer
 27from airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter import (
 28    AbstractStreamStateConverter,
 29)
 30from airbyte_cdk.sources.types import Record, StreamSlice
 31
 32LOGGER = logging.getLogger("airbyte")
 33
 34
 35def _extract_value(mapping: Mapping[str, Any], path: List[str]) -> Any:
 36    return functools.reduce(lambda a, b: a[b], path, mapping)
 37
 38
 39class CursorField:
 40    def __init__(self, cursor_field_key: str) -> None:
 41        self.cursor_field_key = cursor_field_key
 42
 43    def extract_value(self, record: Record) -> CursorValueType:
 44        cursor_value = record.data.get(self.cursor_field_key)
 45        if cursor_value is None:
 46            raise ValueError(f"Could not find cursor field {self.cursor_field_key} in record")
 47        return cursor_value  # type: ignore  # we assume that the value the path points at is a comparable
 48
 49
 50class Cursor(StreamSlicer, ABC):
 51    @property
 52    @abstractmethod
 53    def state(self) -> MutableMapping[str, Any]: ...
 54
 55    @abstractmethod
 56    def observe(self, record: Record) -> None:
 57        """
 58        Indicate to the cursor that the record has been emitted
 59        """
 60        raise NotImplementedError()
 61
 62    @abstractmethod
 63    def close_partition(self, partition: Partition) -> None:
 64        """
 65        Indicate to the cursor that the partition has been successfully processed
 66        """
 67        raise NotImplementedError()
 68
 69    @abstractmethod
 70    def ensure_at_least_one_state_emitted(self) -> None:
 71        """
 72        State messages are emitted when a partition is closed. However, the platform expects at least one state to be emitted per sync per
 73        stream. Hence, if no partitions are generated, this method needs to be called.
 74        """
 75        raise NotImplementedError()
 76
 77    def stream_slices(self) -> Iterable[StreamSlice]:
 78        """
  80        Default placeholder implementation of stream_slices.
 80        Subclasses can override this method to provide actual behavior.
 81        """
 82        yield StreamSlice(partition={}, cursor_slice={})
 83
 84
 85class FinalStateCursor(Cursor):
 86    """Cursor that is used to guarantee at least one state message is emitted for a concurrent stream."""
 87
 88    def __init__(
 89        self,
 90        stream_name: str,
 91        stream_namespace: Optional[str],
 92        message_repository: MessageRepository,
 93    ) -> None:
 94        self._stream_name = stream_name
 95        self._stream_namespace = stream_namespace
 96        self._message_repository = message_repository
 97        # Normally the connector state manager operates at the source-level. However, we only need it to write the sentinel
 98        # state message rather than manage overall source state. This is also only temporary as we move to the resumable
 99        # full refresh world where every stream uses a FileBasedConcurrentCursor with incremental state.
100        self._connector_state_manager = ConnectorStateManager()
101        self._has_closed_at_least_one_slice = False
102
103    @property
104    def state(self) -> MutableMapping[str, Any]:
105        return {NO_CURSOR_STATE_KEY: True}
106
107    def observe(self, record: Record) -> None:
108        pass
109
110    def close_partition(self, partition: Partition) -> None:
111        pass
112
113    def ensure_at_least_one_state_emitted(self) -> None:
114        """
115        Used primarily for full refresh syncs that do not have a valid cursor value to emit at the end of a sync
116        """
117
118        self._connector_state_manager.update_state_for_stream(
119            self._stream_name, self._stream_namespace, self.state
120        )
121        state_message = self._connector_state_manager.create_state_message(
122            self._stream_name, self._stream_namespace
123        )
124        self._message_repository.emit_message(state_message)
125
126
127class ConcurrentCursor(Cursor):
128    _START_BOUNDARY = 0
129    _END_BOUNDARY = 1
130
131    def __init__(
132        self,
133        stream_name: str,
134        stream_namespace: Optional[str],
135        stream_state: Any,
136        message_repository: MessageRepository,
137        connector_state_manager: ConnectorStateManager,
138        connector_state_converter: AbstractStreamStateConverter,
139        cursor_field: CursorField,
140        slice_boundary_fields: Optional[Tuple[str, str]],
141        start: Optional[CursorValueType],
142        end_provider: Callable[[], CursorValueType],
143        lookback_window: Optional[GapType] = None,
144        slice_range: Optional[GapType] = None,
145        cursor_granularity: Optional[GapType] = None,
146        clamping_strategy: ClampingStrategy = NoClamping(),
147    ) -> None:
148        self._stream_name = stream_name
149        self._stream_namespace = stream_namespace
150        self._message_repository = message_repository
151        self._connector_state_converter = connector_state_converter
152        self._connector_state_manager = connector_state_manager
153        self._cursor_field = cursor_field
154        # To see some example where the slice boundaries might not be defined, check https://github.com/airbytehq/airbyte/blob/1ce84d6396e446e1ac2377362446e3fb94509461/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py#L363-L379
155        self._slice_boundary_fields = slice_boundary_fields
156        self._start = start
157        self._end_provider = end_provider
158        self.start, self._concurrent_state = self._get_concurrent_state(stream_state)
159        self._lookback_window = lookback_window
160        self._slice_range = slice_range
161        self._most_recent_cursor_value_per_partition: MutableMapping[
162            Union[StreamSlice, Mapping[str, Any], None], Any
163        ] = {}
164        self._has_closed_at_least_one_slice = False
165        self._cursor_granularity = cursor_granularity
166        # Flag to track if the logger has been triggered (per stream)
167        self._should_be_synced_logger_triggered = False
168        self._clamping_strategy = clamping_strategy
169
170    @property
171    def state(self) -> MutableMapping[str, Any]:
172        return self._connector_state_converter.convert_to_state_message(
173            self.cursor_field, self._concurrent_state
174        )
175
176    @property
177    def cursor_field(self) -> CursorField:
178        return self._cursor_field
179
180    @property
181    def _slice_boundary_fields_wrapper(self) -> Tuple[str, str]:
182        return (
183            self._slice_boundary_fields
184            if self._slice_boundary_fields
185            else (
186                self._connector_state_converter.START_KEY,
187                self._connector_state_converter.END_KEY,
188            )
189        )
190
191    def _get_concurrent_state(
192        self, state: MutableMapping[str, Any]
193    ) -> Tuple[CursorValueType, MutableMapping[str, Any]]:
194        if self._connector_state_converter.is_state_message_compatible(state):
195            return (
196                self._start or self._connector_state_converter.zero_value,
197                self._connector_state_converter.deserialize(state),
198            )
199        return self._connector_state_converter.convert_from_sequential_state(
200            self._cursor_field, state, self._start
201        )
202
203    def observe(self, record: Record) -> None:
204        most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
205            record.associated_slice
206        )
207        try:
208            cursor_value = self._extract_cursor_value(record)
209
210            if most_recent_cursor_value is None or most_recent_cursor_value < cursor_value:
211                self._most_recent_cursor_value_per_partition[record.associated_slice] = cursor_value
212        except ValueError:
213            self._log_for_record_without_cursor_value()
214
215    def _extract_cursor_value(self, record: Record) -> Any:
216        return self._connector_state_converter.parse_value(self._cursor_field.extract_value(record))
217
218    def close_partition(self, partition: Partition) -> None:
219        slice_count_before = len(self._concurrent_state.get("slices", []))
220        self._add_slice_to_state(partition)
221        if slice_count_before < len(
222            self._concurrent_state["slices"]
223        ):  # only emit if at least one slice has been processed
224            self._merge_partitions()
225            self._emit_state_message()
226        self._has_closed_at_least_one_slice = True
227
228    def _add_slice_to_state(self, partition: Partition) -> None:
229        most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
230            partition.to_slice()
231        )
232
233        if self._slice_boundary_fields:
234            if "slices" not in self._concurrent_state:
235                raise RuntimeError(
236                    f"The state for stream {self._stream_name} should have at least one slice to delineate the sync start time, but no slices are present. This is unexpected. Please contact Support."
237                )
238            self._concurrent_state["slices"].append(
239                {
240                    self._connector_state_converter.START_KEY: self._extract_from_slice(
241                        partition, self._slice_boundary_fields[self._START_BOUNDARY]
242                    ),
243                    self._connector_state_converter.END_KEY: self._extract_from_slice(
244                        partition, self._slice_boundary_fields[self._END_BOUNDARY]
245                    ),
246                    self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value,
247                }
248            )
249        elif most_recent_cursor_value:
250            if self._has_closed_at_least_one_slice:
 251                # If we track the state value using the record's cursor field, we can only do that if there is one partition. This is
 252                # because we save the state every time we close a partition. We assume that if there are multiple slices, they need to
 253                # provide boundaries. There are cases where partitions might not have boundaries:
 254                # * The cursor should be per-partition
 255                # * The stream state is actually the parent stream state
 256                # There might be other cases not listed above. Those are not supported today, hence the stream should not use this cursor
 257                # for state management. For the specific user that was affected by this issue, we need to:
258                # * Fix state tracking (which is currently broken)
259                # * Make the new version available
260                # * (Probably) ask the user to reset the stream to avoid data loss
261                raise ValueError(
262                    "Given that slice_boundary_fields is not defined and that per-partition state is not supported, only one slice is "
263                    "expected. Please contact the Airbyte team."
264                )
265
266            self._concurrent_state["slices"].append(
267                {
268                    self._connector_state_converter.START_KEY: self.start,
269                    self._connector_state_converter.END_KEY: most_recent_cursor_value,
270                    self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value,
271                }
272            )
273
274    def _emit_state_message(self) -> None:
275        self._connector_state_manager.update_state_for_stream(
276            self._stream_name,
277            self._stream_namespace,
278            self.state,
279        )
280        state_message = self._connector_state_manager.create_state_message(
281            self._stream_name, self._stream_namespace
282        )
283        self._message_repository.emit_message(state_message)
284
285    def _merge_partitions(self) -> None:
286        self._concurrent_state["slices"] = self._connector_state_converter.merge_intervals(
287            self._concurrent_state["slices"]
288        )
289
290    def _extract_from_slice(self, partition: Partition, key: str) -> CursorValueType:
291        try:
292            _slice = partition.to_slice()
293            if not _slice:
294                raise KeyError(f"Could not find key `{key}` in empty slice")
295            return self._connector_state_converter.parse_value(_slice[key])  # type: ignore  # we expect the devs to specify a key that would return a CursorValueType
296        except KeyError as exception:
297            raise KeyError(
298                f"Partition is expected to have key `{key}` but could not be found"
299            ) from exception
300
301    def ensure_at_least_one_state_emitted(self) -> None:
302        """
 303        The platform expects at least one state message on successful syncs. Hence, whatever happens, we expect this method to be
 304        called.
305        """
306        self._emit_state_message()
307
308    def stream_slices(self) -> Iterable[StreamSlice]:
309        """
 311        Generates slices based on a few parameters:
 312        * lookback_window: Buffer to remove from END_KEY of the highest slice
 313        * slice_range: Max difference between two slices. If the difference between two slices is greater, multiple slices will be created
 314        * start: `_split_per_slice_range` will clip any value to `self._start`, which means that:
 315          * if upper is less than self._start, no slices will be generated
 316          * if lower is less than self._start, self._start will be used as the lower boundary (lookback_window will not be considered in that case)
 317
 318        Note that the slices will overlap at their boundaries. We therefore expect at least one of the lower or upper boundaries to be
 319        inclusive in the API that is queried.
319        """
320        self._merge_partitions()
321
322        if self._start is not None and self._is_start_before_first_slice():
323            yield from self._split_per_slice_range(
324                self._start,
325                self._concurrent_state["slices"][0][self._connector_state_converter.START_KEY],
326                False,
327            )
328
329        if len(self._concurrent_state["slices"]) == 1:
330            yield from self._split_per_slice_range(
331                self._calculate_lower_boundary_of_last_slice(
332                    self._concurrent_state["slices"][0][self._connector_state_converter.END_KEY]
333                ),
334                self._end_provider(),
335                True,
336            )
337        elif len(self._concurrent_state["slices"]) > 1:
338            for i in range(len(self._concurrent_state["slices"]) - 1):
339                if self._cursor_granularity:
340                    yield from self._split_per_slice_range(
341                        self._concurrent_state["slices"][i][self._connector_state_converter.END_KEY]
342                        + self._cursor_granularity,
343                        self._concurrent_state["slices"][i + 1][
344                            self._connector_state_converter.START_KEY
345                        ],
346                        False,
347                    )
348                else:
349                    yield from self._split_per_slice_range(
350                        self._concurrent_state["slices"][i][
351                            self._connector_state_converter.END_KEY
352                        ],
353                        self._concurrent_state["slices"][i + 1][
354                            self._connector_state_converter.START_KEY
355                        ],
356                        False,
357                    )
358            yield from self._split_per_slice_range(
359                self._calculate_lower_boundary_of_last_slice(
360                    self._concurrent_state["slices"][-1][self._connector_state_converter.END_KEY]
361                ),
362                self._end_provider(),
363                True,
364            )
365        else:
366            raise ValueError("Expected at least one slice")
367
368    def _is_start_before_first_slice(self) -> bool:
369        return (
370            self._start is not None
371            and self._start
372            < self._concurrent_state["slices"][0][self._connector_state_converter.START_KEY]
373        )
374
375    def _calculate_lower_boundary_of_last_slice(
376        self, lower_boundary: CursorValueType
377    ) -> CursorValueType:
378        if self._lookback_window:
379            return lower_boundary - self._lookback_window
380        return lower_boundary
381
382    def _split_per_slice_range(
383        self, lower: CursorValueType, upper: CursorValueType, upper_is_end: bool
384    ) -> Iterable[StreamSlice]:
385        if lower >= upper:
386            return
387
388        if self._start and upper < self._start:
389            return
390
391        lower = max(lower, self._start) if self._start else lower
392        if not self._slice_range or self._evaluate_upper_safely(lower, self._slice_range) >= upper:
393            clamped_lower = self._clamping_strategy.clamp(lower)
394            clamped_upper = self._clamping_strategy.clamp(upper)
395            start_value, end_value = (
396                (clamped_lower, clamped_upper - self._cursor_granularity)
397                if self._cursor_granularity and not upper_is_end
398                else (clamped_lower, clamped_upper)
399            )
400            yield StreamSlice(
401                partition={},
402                cursor_slice={
403                    self._slice_boundary_fields_wrapper[
404                        self._START_BOUNDARY
405                    ]: self._connector_state_converter.output_format(start_value),
406                    self._slice_boundary_fields_wrapper[
407                        self._END_BOUNDARY
408                    ]: self._connector_state_converter.output_format(end_value),
409                },
410            )
411        else:
412            stop_processing = False
413            current_lower_boundary = lower
414            while not stop_processing:
415                current_upper_boundary = min(
416                    self._evaluate_upper_safely(current_lower_boundary, self._slice_range), upper
417                )
418                has_reached_upper_boundary = current_upper_boundary >= upper
419
420                clamped_upper = (
421                    self._clamping_strategy.clamp(current_upper_boundary)
422                    if current_upper_boundary != upper
423                    else current_upper_boundary
424                )
425                clamped_lower = self._clamping_strategy.clamp(current_lower_boundary)
426                if clamped_lower >= clamped_upper:
427                    # clamping collapsed both values which means that it is time to stop processing
 429                    # FIXME should this be replaced by a proper end_provider?
429                    break
430                start_value, end_value = (
431                    (clamped_lower, clamped_upper - self._cursor_granularity)
432                    if self._cursor_granularity
433                    and (not upper_is_end or not has_reached_upper_boundary)
434                    else (clamped_lower, clamped_upper)
435                )
436                yield StreamSlice(
437                    partition={},
438                    cursor_slice={
439                        self._slice_boundary_fields_wrapper[
440                            self._START_BOUNDARY
441                        ]: self._connector_state_converter.output_format(start_value),
442                        self._slice_boundary_fields_wrapper[
443                            self._END_BOUNDARY
444                        ]: self._connector_state_converter.output_format(end_value),
445                    },
446                )
447                current_lower_boundary = clamped_upper
448                if current_upper_boundary >= upper:
449                    stop_processing = True
450
451    def _evaluate_upper_safely(self, lower: CursorValueType, step: GapType) -> CursorValueType:
452        """
 453        Given that the default step is datetime.timedelta.max, evaluating the next start_date can raise an OverflowError.
 454        This method assumes that users would never enter a step that would itself cause an overflow; if they did, the code
 455        would have broken anyway.
456        """
457        try:
458            return lower + step
459        except OverflowError:
460            return self._end_provider()
461
462    def should_be_synced(self, record: Record) -> bool:
463        """
464        Determines if a record should be synced based on its cursor value.
465        :param record: The record to evaluate
466
467        :return: True if the record's cursor value falls within the sync boundaries
468        """
469        try:
470            record_cursor_value: CursorValueType = self._extract_cursor_value(record)
471        except ValueError:
472            self._log_for_record_without_cursor_value()
473            return True
474        return self.start <= record_cursor_value <= self._end_provider()
475
476    def _log_for_record_without_cursor_value(self) -> None:
477        if not self._should_be_synced_logger_triggered:
478            LOGGER.warning(
479                f"Could not find cursor field `{self.cursor_field.cursor_field_key}` in record for stream {self._stream_name}. The incremental sync will assume it needs to be synced"
480            )
481            self._should_be_synced_logger_triggered = True
LOGGER = <Logger airbyte (INFO)>
class CursorField:
CursorField(cursor_field_key: str)
cursor_field_key
def extract_value(self, record: airbyte_cdk.Record) -> CursorValueType:
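
A rough usage sketch (the field name and record contents are hypothetical, and the Record constructor is assumed to accept data and stream_name):

    from airbyte_cdk.sources.streams.concurrent.cursor import CursorField
    from airbyte_cdk.sources.types import Record

    cursor_field = CursorField("updated_at")
    record = Record(data={"id": 1, "updated_at": 1704067200}, stream_name="orders")

    print(cursor_field.extract_value(record))  # 1704067200
    # A record without the field raises ValueError, which callers such as
    # ConcurrentCursor.observe catch and log.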
class Cursor(StreamSlicer, ABC):

Slices the stream into chunks that can be fetched independently. Slices enable state checkpointing and data retrieval parallelization.

state: MutableMapping[str, Any]
@abstractmethod
def observe(self, record: airbyte_cdk.Record) -> None:

Indicate to the cursor that the record has been emitted

@abstractmethod
def close_partition( self, partition: airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition) -> None:

Indicate to the cursor that the partition has been successfully processed

@abstractmethod
def ensure_at_least_one_state_emitted(self) -> None:

State messages are emitted when a partition is closed. However, the platform expects at least one state to be emitted per sync per stream. Hence, if no partitions are generated, this method needs to be called.

def stream_slices(self) -> Iterable[airbyte_cdk.StreamSlice]:

Default placeholder implementation of stream_slices. Subclasses can override this method to provide actual behavior.
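
As a minimal sketch, the default implementation is equivalent to yielding one empty slice, i.e. the whole stream is read as a single partition:

    from airbyte_cdk.sources.types import StreamSlice

    def default_stream_slices():
        # What Cursor.stream_slices does when not overridden.
        yield StreamSlice(partition={}, cursor_slice={})

    print(list(default_stream_slices()))  # one empty slice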

class FinalStateCursor(Cursor):

Cursor that is used to guarantee at least one state message is emitted for a concurrent stream.
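
A usage sketch with the CDK's InMemoryMessageRepository (the stream name is hypothetical): even when a sync generates no partitions, calling ensure_at_least_one_state_emitted produces the sentinel state message.

    from airbyte_cdk.sources.message import InMemoryMessageRepository
    from airbyte_cdk.sources.streams.concurrent.cursor import FinalStateCursor

    message_repository = InMemoryMessageRepository()
    cursor = FinalStateCursor(
        stream_name="users",   # hypothetical stream
        stream_namespace=None,
        message_repository=message_repository,
    )

    # Called at the end of a sync that produced no partitions:
    cursor.ensure_at_least_one_state_emitted()
    for message in message_repository.consume_queue():
        print(message)  # a single STATE message for the "users" stream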

FinalStateCursor( stream_name: str, stream_namespace: Optional[str], message_repository: airbyte_cdk.MessageRepository)
state: MutableMapping[str, Any]
def observe(self, record: airbyte_cdk.Record) -> None:

Indicate to the cursor that the record has been emitted

def close_partition( self, partition: airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition) -> None:

Indicate to the cursor that the partition has been successfully processed

def ensure_at_least_one_state_emitted(self) -> None:

Used primarily for full refresh syncs that do not have a valid cursor value to emit at the end of a sync

Inherited Members
Cursor
stream_slices
class ConcurrentCursor(Cursor):

Slices the stream into chunks that can be fetched independently. Slices enable state checkpointing and data retrieval parallelization.
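
To make the moving parts concrete, here is a hypothetical construction sketch using the CDK's EpochValueConcurrentStreamStateConverter. The stream name, cursor field, and slice boundary field names are illustrative, and an empty stream_state is assumed (the sync then starts from start):

    from datetime import datetime, timedelta, timezone

    from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
    from airbyte_cdk.sources.message import InMemoryMessageRepository
    from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, CursorField
    from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import (
        EpochValueConcurrentStreamStateConverter,
    )

    cursor = ConcurrentCursor(
        stream_name="orders",                     # hypothetical stream
        stream_namespace=None,
        stream_state={},                          # no prior state
        message_repository=InMemoryMessageRepository(),
        connector_state_manager=ConnectorStateManager(),
        connector_state_converter=EpochValueConcurrentStreamStateConverter(),
        cursor_field=CursorField("updated_at"),   # hypothetical cursor field
        slice_boundary_fields=("start_ts", "end_ts"),  # hypothetical request params
        start=datetime(2024, 1, 1, tzinfo=timezone.utc),
        end_provider=lambda: datetime.now(timezone.utc),
        slice_range=timedelta(days=30),           # split the window into 30-day slices
        cursor_granularity=timedelta(seconds=1),  # epoch cursors move in whole seconds
    )

    for stream_slice in cursor.stream_slices():
        print(stream_slice.cursor_slice)  # e.g. {"start_ts": 1704067200, "end_ts": ...}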

ConcurrentCursor( stream_name: str, stream_namespace: Optional[str], stream_state: Any, message_repository: airbyte_cdk.MessageRepository, connector_state_manager: airbyte_cdk.ConnectorStateManager, connector_state_converter: airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter.AbstractStreamStateConverter, cursor_field: CursorField, slice_boundary_fields: Optional[Tuple[str, str]], start: Optional[airbyte_cdk.sources.streams.concurrent.cursor_types.CursorValueType], end_provider: Callable[[], airbyte_cdk.sources.streams.concurrent.cursor_types.CursorValueType], lookback_window: Optional[airbyte_cdk.sources.streams.concurrent.cursor_types.GapType] = None, slice_range: Optional[airbyte_cdk.sources.streams.concurrent.cursor_types.GapType] = None, cursor_granularity: Optional[airbyte_cdk.sources.streams.concurrent.cursor_types.GapType] = None, clamping_strategy: airbyte_cdk.sources.streams.concurrent.clamping.ClampingStrategy = <airbyte_cdk.sources.streams.concurrent.clamping.NoClamping object>)
state: MutableMapping[str, Any]
cursor_field: CursorField
def observe(self, record: airbyte_cdk.Record) -> None:

Indicate to the cursor that the record has been emitted
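
Continuing the construction sketch above (record contents hypothetical, and assuming Record(data=..., stream_name=...)), observe keeps only the greatest cursor value seen for each associated slice:

    from airbyte_cdk.sources.types import Record

    cursor.observe(Record(data={"updated_at": 1704067200}, stream_name="orders"))
    cursor.observe(Record(data={"updated_at": 1700000000}, stream_name="orders"))
    # Only the maximum (1704067200) is retained for this record's associated slice;
    # a record missing the cursor field just logs a warning once.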

def close_partition( self, partition: airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition) -> None:

Indicate to the cursor that the partition has been successfully processed

def ensure_at_least_one_state_emitted(self) -> None:

The platform expects at least one state message on successful syncs. Hence, whatever happens, we expect this method to be called.
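
Continuing the construction sketch above, a sync typically ends with:

    cursor.ensure_at_least_one_state_emitted()  # emits the current state unconditionally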

def stream_slices(self) -> Iterable[airbyte_cdk.StreamSlice]:

Generates slices based on a few parameters:

  • lookback_window: Buffer to remove from END_KEY of the highest slice
  • slice_range: Max difference between two slices. If the difference between two slices is greater, multiple slices will be created
  • start: _split_per_slice_range will clip any value to self._start, which means that:
    • if upper is less than self._start, no slices will be generated
    • if lower is less than self._start, self._start will be used as the lower boundary (lookback_window will not be considered in that case)

Note that the slices will overlap at their boundaries. We therefore expect at least one of the lower or upper boundaries to be inclusive in the API that is queried.
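
As a self-contained illustration of the windowing (clamping and granularity handling omitted), the splitting loop behaves roughly like this:

    from datetime import datetime, timedelta, timezone
    from typing import Iterable, Tuple

    def split_range(
        lower: datetime, upper: datetime, step: timedelta
    ) -> Iterable[Tuple[datetime, datetime]]:
        # Simplified analogue of _split_per_slice_range: walk from lower to upper
        # in step-sized windows; the last window is truncated at upper.
        current_lower = lower
        while current_lower < upper:
            current_upper = min(current_lower + step, upper)
            yield current_lower, current_upper
            current_lower = current_upper

    start = datetime(2024, 1, 1, tzinfo=timezone.utc)
    end = datetime(2024, 1, 10, tzinfo=timezone.utc)
    for window in split_range(start, end, timedelta(days=4)):
        print(window)
    # Jan 1-5, Jan 5-9, Jan 9-10: consecutive windows share a boundary, which is
    # why at least one of the API's boundaries must be inclusive.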

def should_be_synced(self, record: airbyte_cdk.Record) -> bool:

Determines if a record should be synced based on its cursor value.

Parameters
  • record: The record to evaluate
Returns

True if the record's cursor value falls within the sync boundaries
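
Continuing the same sketch (record contents hypothetical), the check reduces to start <= value <= end_provider(); records whose cursor field is missing are synced anyway and a warning is logged:

    from airbyte_cdk.sources.types import Record

    recent = Record(data={"updated_at": 1710000000}, stream_name="orders")   # Mar 2024
    ancient = Record(data={"updated_at": 100}, stream_name="orders")         # 1970

    print(cursor.should_be_synced(recent))   # True: within [start, end_provider()]
    print(cursor.should_be_synced(ancient))  # False: before the configured start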