airbyte_cdk.sources.streams.concurrent.cursor

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4
  5import functools
  6import logging
  7import threading
  8from abc import ABC, abstractmethod
  9from typing import (
 10    Any,
 11    Callable,
 12    Iterable,
 13    List,
 14    Mapping,
 15    MutableMapping,
 16    Optional,
 17    Tuple,
 18    Union,
 19)
 20
 21from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
 22from airbyte_cdk.sources.message import MessageRepository, NoopMessageRepository
 23from airbyte_cdk.sources.streams import NO_CURSOR_STATE_KEY
 24from airbyte_cdk.sources.streams.concurrent.clamping import ClampingStrategy, NoClamping
 25from airbyte_cdk.sources.streams.concurrent.cursor_types import CursorValueType, GapType
 26from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
 27from airbyte_cdk.sources.streams.concurrent.partitions.stream_slicer import StreamSlicer
 28from airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter import (
 29    AbstractStreamStateConverter,
 30)
 31from airbyte_cdk.sources.types import Record, StreamSlice
 32
 33LOGGER = logging.getLogger("airbyte")
 34
 35
 36def _extract_value(mapping: Mapping[str, Any], path: List[str]) -> Any:
 37    return functools.reduce(lambda a, b: a[b], path, mapping)
 38
 39
 40class CursorField:
 41    def __init__(self, cursor_field_key: str) -> None:
 42        self.cursor_field_key = cursor_field_key
 43
 44    def extract_value(self, record: Record) -> Any:
 45        cursor_value = record.data.get(self.cursor_field_key)
 46        if cursor_value is None:
 47            raise ValueError(f"Could not find cursor field {self.cursor_field_key} in record")
 48        return cursor_value  # type: ignore  # we assume that the value the path points at is a comparable
 49
 50
 51class Cursor(StreamSlicer, ABC):
 52    @property
 53    @abstractmethod
 54    def state(self) -> MutableMapping[str, Any]: ...
 55
 56    @abstractmethod
 57    def observe(self, record: Record) -> None:
 58        """
 59        Indicate to the cursor that the record has been emitted
 60        """
 61        raise NotImplementedError()
 62
 63    @abstractmethod
 64    def close_partition(self, partition: Partition) -> None:
 65        """
 66        Indicate to the cursor that the partition has been successfully processed
 67        """
 68        raise NotImplementedError()
 69
 70    @abstractmethod
 71    def ensure_at_least_one_state_emitted(self) -> None:
 72        """
 73        State messages are emitted when a partition is closed. However, the platform expects at least one state to be emitted per sync per
 74        stream. Hence, if no partitions are generated, this method needs to be called.
 75        """
 76        raise NotImplementedError()
 77
 78    @abstractmethod
 79    def should_be_synced(self, record: Record) -> bool:
 80        pass
 81
 82    def stream_slices(self) -> Iterable[StreamSlice]:
 83        """
 84        Default placeholder implementation of generate_slices.
 85        Subclasses can override this method to provide actual behavior.
 86        """
 87        yield StreamSlice(partition={}, cursor_slice={})
 88
 89
 90class FinalStateCursor(Cursor):
 91    """Cursor that is used to guarantee at least one state message is emitted for a concurrent stream."""
 92
 93    def __init__(
 94        self,
 95        stream_name: str,
 96        stream_namespace: Optional[str],
 97        message_repository: MessageRepository,
 98    ) -> None:
 99        self._stream_name = stream_name
100        self._stream_namespace = stream_namespace
101        self._message_repository = message_repository
102        # Normally the connector state manager operates at the source-level. However, we only need it to write the sentinel
103        # state message rather than manage overall source state. This is also only temporary as we move to the resumable
104        # full refresh world where every stream uses a FileBasedConcurrentCursor with incremental state.
105        self._connector_state_manager = ConnectorStateManager()
106        self._has_closed_at_least_one_slice = False
107
108    @property
109    def state(self) -> MutableMapping[str, Any]:
110        return {NO_CURSOR_STATE_KEY: True}
111
112    def observe(self, record: Record) -> None:
113        pass
114
115    def close_partition(self, partition: Partition) -> None:
116        pass
117
118    def ensure_at_least_one_state_emitted(self) -> None:
119        """
120        Used primarily for full refresh syncs that do not have a valid cursor value to emit at the end of a sync
121        """
122
123        self._connector_state_manager.update_state_for_stream(
124            self._stream_name, self._stream_namespace, self.state
125        )
126        state_message = self._connector_state_manager.create_state_message(
127            self._stream_name, self._stream_namespace
128        )
129        self._message_repository.emit_message(state_message)
130
131    def should_be_synced(self, record: Record) -> bool:
132        return True
133
134
class ConcurrentCursor(Cursor):
    """
    Cursor for concurrent incremental streams.

    Progress is tracked in ``self._concurrent_state["slices"]``: a list of mappings keyed by the
    state converter's ``START_KEY``, ``END_KEY`` and ``MOST_RECENT_RECORD_KEY``. Each closed
    partition is appended to that list, the list is merged with ``merge_intervals`` and a state
    message is emitted. ``stream_slices`` generates slices for the ranges that are not covered yet:
    before the first slice (when ``start`` is earlier), between consecutive slices, and from the
    last slice up to ``end_provider()``.
    """

    # Indices into a (start, end) boundary-field tuple.
    _START_BOUNDARY = 0
    _END_BOUNDARY = 1

    def copy_without_state(self) -> "ConcurrentCursor":
        """
        Build a new cursor of the same class and configuration, but with an empty stream state,
        a ``NoopMessageRepository`` and its own fresh ``ConnectorStateManager``.
        """
        return self.__class__(
            stream_name=self._stream_name,
            stream_namespace=self._stream_namespace,
            stream_state={},
            message_repository=NoopMessageRepository(),
            connector_state_manager=ConnectorStateManager(),
            connector_state_converter=self._connector_state_converter,
            cursor_field=self._cursor_field,
            slice_boundary_fields=self._slice_boundary_fields,
            start=self._start,
            end_provider=self._end_provider,
            lookback_window=self._lookback_window,
            slice_range=self._slice_range,
            cursor_granularity=self._cursor_granularity,
            clamping_strategy=self._clamping_strategy,
        )

    def __init__(
        self,
        stream_name: str,
        stream_namespace: Optional[str],
        stream_state: Any,
        message_repository: MessageRepository,
        connector_state_manager: ConnectorStateManager,
        connector_state_converter: AbstractStreamStateConverter,
        cursor_field: CursorField,
        slice_boundary_fields: Optional[Tuple[str, str]],
        start: Optional[CursorValueType],
        end_provider: Callable[[], CursorValueType],
        lookback_window: Optional[GapType] = None,
        slice_range: Optional[GapType] = None,
        cursor_granularity: Optional[GapType] = None,
        clamping_strategy: ClampingStrategy = NoClamping(),
    ) -> None:
        """
        :param stream_state: incoming state, converted into the concurrent state format by
            ``connector_state_converter``.
        :param slice_boundary_fields: keys of a partition's slice holding its (start, end)
            boundaries. When not set, state is derived from the most recent record of a single
            partition and generated slices use the converter's START_KEY/END_KEY.
        :param start: lower bound applied to generated slices; ``None`` means no lower bound.
        :param end_provider: called to obtain the upper bound of the last generated slice.
        :param lookback_window: subtracted from the END_KEY of the last slice in state.
        :param slice_range: maximum width of a generated slice.
        :param cursor_granularity: added to the END_KEY of a slice when computing the following
            gap, and subtracted from the upper bound of non-final generated slices.
        :param clamping_strategy: applied to slice boundaries when splitting ranges.
        """
        self._stream_name = stream_name
        self._stream_namespace = stream_namespace
        self._message_repository = message_repository
        self._connector_state_converter = connector_state_converter
        self._connector_state_manager = connector_state_manager
        self._cursor_field = cursor_field
        # To see some example where the slice boundaries might not be defined, check https://github.com/airbytehq/airbyte/blob/1ce84d6396e446e1ac2377362446e3fb94509461/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py#L363-L379
        self._slice_boundary_fields = slice_boundary_fields
        self._start = start
        self._end_provider = end_provider
        # Needs _connector_state_converter, _cursor_field and _start, so it must run after they are set.
        self.start, self._concurrent_state = self._get_concurrent_state(stream_state)
        self._lookback_window = lookback_window
        self._slice_range = slice_range
        self._most_recent_cursor_value_per_partition: MutableMapping[
            Union[StreamSlice, Mapping[str, Any], None], Any
        ] = {}
        self._has_closed_at_least_one_slice = False
        self._cursor_granularity = cursor_granularity
        # Flag to track if the logger has been triggered (per stream)
        self._should_be_synced_logger_triggered = False
        self._clamping_strategy = clamping_strategy
        # Cleared by observe() as soon as a record arrives with a lower cursor value than a
        # previous record of the same partition.
        self._is_ascending_order = True

        # A lock is required when closing a partition because updating the cursor's concurrent_state is
        # not thread safe. When multiple partitions are being closed by the cursor at the same time, it is
        # possible for one partition to update concurrent_state after a second partition has already read
        # the previous state. This can lead to the second partition overwriting the previous one's state.
        self._lock = threading.Lock()

    @property
    def state(self) -> MutableMapping[str, Any]:
        """Current concurrent state, serialized with the converter's ``convert_to_state_message``."""
        return self._connector_state_converter.convert_to_state_message(
            self.cursor_field, self._concurrent_state
        )

    @property
    def cursor_field(self) -> CursorField:
        return self._cursor_field

    @property
    def _slice_boundary_fields_wrapper(self) -> Tuple[str, str]:
        """Configured slice boundary fields, or the converter's (START_KEY, END_KEY) when unset."""
        return (
            self._slice_boundary_fields
            if self._slice_boundary_fields
            else (
                self._connector_state_converter.START_KEY,
                self._connector_state_converter.END_KEY,
            )
        )

    def _get_concurrent_state(
        self, state: MutableMapping[str, Any]
    ) -> Tuple[CursorValueType, MutableMapping[str, Any]]:
        """
        Return ``(start value, concurrent state)`` for the incoming ``state``.

        State already in the converter's format is deserialized, and the start value is taken from
        its first slice (most recent record if present, else its END_KEY), falling back to
        ``self._start`` and then the converter's ``zero_value``. Any other state goes through
        ``convert_from_sequential_state``.
        """
        if self._connector_state_converter.is_state_message_compatible(state):
            partitioned_state = self._connector_state_converter.deserialize(state)
            slices_from_partitioned_state = partitioned_state.get("slices", [])

            value_from_partitioned_state = None
            if slices_from_partitioned_state:
                # We assume here that the slices have been already merged
                first_slice = slices_from_partitioned_state[0]
                value_from_partitioned_state = (
                    first_slice[self._connector_state_converter.MOST_RECENT_RECORD_KEY]
                    if self._connector_state_converter.MOST_RECENT_RECORD_KEY in first_slice
                    else first_slice[self._connector_state_converter.END_KEY]
                )
            # NOTE(review): `or` treats any falsy cursor value (e.g. 0) as missing — confirm intended.
            return (
                value_from_partitioned_state
                or self._start
                or self._connector_state_converter.zero_value,
                partitioned_state,
            )
        return self._connector_state_converter.convert_from_sequential_state(
            self._cursor_field, state, self._start
        )

    def observe(self, record: Record) -> None:
        """
        Remember the highest cursor value seen for the record's slice. Records without a cursor
        value are only logged (once per stream).
        """
        # Because observe writes to the most_recent_cursor_value_per_partition mapping,
        # it is not thread-safe. However, this shouldn't lead to concurrency issues because
        # observe() is only invoked by PartitionReader.process_partition(). Since the map is
        # broken down according to partition, concurrent threads processing only read/write
        # from different keys which avoids any conflicts.
        #
        # If we were to add thread safety, we should implement a lock per-partition
        # which is instantiated during stream_slices()
        most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
            record.associated_slice
        )
        try:
            cursor_value = self._extract_cursor_value(record)

            if most_recent_cursor_value is None or most_recent_cursor_value < cursor_value:
                self._most_recent_cursor_value_per_partition[record.associated_slice] = cursor_value
            elif most_recent_cursor_value > cursor_value:
                self._is_ascending_order = False
        except ValueError:
            self._log_for_record_without_cursor_value()

    def _extract_cursor_value(self, record: Record) -> Any:
        """Extract the record's cursor value and parse it with the state converter."""
        return self._connector_state_converter.parse_value(self._cursor_field.extract_value(record))

    def close_partition(self, partition: Partition) -> None:
        """
        Add the partition to the concurrent state and, if that added a slice, merge the slices and
        emit a state message.
        """
        with self._lock:
            slice_count_before = len(self._concurrent_state.get("slices", []))
            self._add_slice_to_state(partition)
            if slice_count_before < len(
                self._concurrent_state["slices"]
            ):  # only emit if at least one slice has been processed
                self._merge_partitions()
                self._emit_state_message()
            # Set under the lock: _add_slice_to_state() reads this flag while holding the lock,
            # so writing it outside would let two concurrent closes both observe False.
            self._has_closed_at_least_one_slice = True

    def _add_slice_to_state(self, partition: Partition) -> None:
        """
        Append the slice covered by ``partition`` to the concurrent state.

        With slice boundary fields, the boundaries come from the partition's slice. Without them,
        the slice spans ``self.start`` to the most recent observed cursor value, which is only
        supported for a single partition.

        :raises RuntimeError: if boundary fields are set but the state has no "slices".
        :raises ValueError: if boundary fields are unset and a second slice would be added.
        """
        most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
            partition.to_slice()
        )

        if self._slice_boundary_fields:
            if "slices" not in self._concurrent_state:
                raise RuntimeError(
                    f"The state for stream {self._stream_name} should have at least one slice to delineate the sync start time, but no slices are present. This is unexpected. Please contact Support."
                )
            self._concurrent_state["slices"].append(
                {
                    self._connector_state_converter.START_KEY: self._extract_from_slice(
                        partition, self._slice_boundary_fields[self._START_BOUNDARY]
                    ),
                    self._connector_state_converter.END_KEY: self._extract_from_slice(
                        partition, self._slice_boundary_fields[self._END_BOUNDARY]
                    ),
                    self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value,
                }
            )
        # NOTE(review): truthiness check — a falsy cursor value (e.g. 0) is handled like "no record observed".
        elif most_recent_cursor_value:
            if self._has_closed_at_least_one_slice:
                # If we track state value using records cursor field, we can only do that if there is one partition. This is because we save
                # the state every time we close a partition. We assume that if there are multiple slices, they need to be providing
                # boundaries. There are cases where partitions could not have boundaries:
                # * The cursor should be per-partition
                # * The stream state is actually the parent stream state
                # There might be other cases not listed above. Those are not supported today hence the stream should not use this cursor for
                # state management. For the specific user that was affected with this issue, we need to:
                # * Fix state tracking (which is currently broken)
                # * Make the new version available
                # * (Probably) ask the user to reset the stream to avoid data loss
                raise ValueError(
                    "Given that slice_boundary_fields is not defined and that per-partition state is not supported, only one slice is "
                    "expected. Please contact the Airbyte team."
                )

            self._concurrent_state["slices"].append(
                {
                    self._connector_state_converter.START_KEY: self.start,
                    self._connector_state_converter.END_KEY: most_recent_cursor_value,
                    self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value,
                }
            )

    def _emit_state_message(self) -> None:
        """Push the current state into the state manager and emit the resulting state message."""
        self._connector_state_manager.update_state_for_stream(
            self._stream_name,
            self._stream_namespace,
            self.state,
        )
        state_message = self._connector_state_manager.create_state_message(
            self._stream_name, self._stream_namespace
        )
        self._message_repository.emit_message(state_message)

    def _merge_partitions(self) -> None:
        """Replace the state's slice list with the converter's ``merge_intervals`` of it."""
        self._concurrent_state["slices"] = self._connector_state_converter.merge_intervals(
            self._concurrent_state["slices"]
        )

    def _extract_from_slice(self, partition: Partition, key: str) -> CursorValueType:
        """
        Parse the value stored under ``key`` in the partition's slice.

        :raises KeyError: if the slice is empty or lacks ``key``.
        """
        try:
            _slice = partition.to_slice()
            if not _slice:
                raise KeyError(f"Could not find key `{key}` in empty slice")
            return self._connector_state_converter.parse_value(_slice[key])  # type: ignore  # we expect the devs to specify a key that would return a CursorValueType
        except KeyError as exception:
            raise KeyError(
                f"Partition is expected to have key `{key}` but could not be found"
            ) from exception

    def ensure_at_least_one_state_emitted(self) -> None:
        """
        The platform expect to have at least one state message on successful syncs. Hence, whatever happens, we expect this method to be
        called.
        """
        self._emit_state_message()

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Generate slices for the cursor ranges the state does not cover yet, based on:
        * lookback_window: buffer removed from the END_KEY of the last slice in state
        * slice_range: maximum width of a slice; wider ranges are split into several slices
        * start: `_split_per_slice_range` clips every range to `self._start`, which means that:
          * if the upper bound is less than `self._start`, no slice is generated
          * if the lower bound is less than `self._start`, `self._start` is used as the lower bound
            (lookback_window is not considered in that case)

        Note that the slices will overlap at their boundaries. We therefore expect to have at least the lower or the upper boundary to be
        inclusive in the API that is queried.

        :raises ValueError: if the concurrent state contains no slice.
        """
        with self._lock:
            # Merging rewrites the state's slice list, which close_partition() also does under this
            # lock. Iterate over a copy so that partitions closed while this generator is suspended
            # cannot shift the indices used below.
            self._merge_partitions()
            slices = list(self._concurrent_state["slices"])

        if not slices:
            raise ValueError("Expected at least one slice")

        start_key = self._connector_state_converter.START_KEY
        end_key = self._connector_state_converter.END_KEY

        # Range between the configured start and the first synced slice.
        if self._start is not None and self._start < slices[0][start_key]:
            yield from self._split_per_slice_range(self._start, slices[0][start_key], False)

        # Gaps between consecutive synced slices.
        for previous_slice, next_slice in zip(slices, slices[1:]):
            gap_lower = previous_slice[end_key]
            if self._cursor_granularity:
                gap_lower = gap_lower + self._cursor_granularity
            yield from self._split_per_slice_range(gap_lower, next_slice[start_key], False)

        # From the end of the last synced slice (minus the lookback window) up to now.
        yield from self._split_per_slice_range(
            self._calculate_lower_boundary_of_last_slice(slices[-1][end_key]),
            self._end_provider(),
            True,
        )

    def _is_start_before_first_slice(self) -> bool:
        """True when a start is configured and it precedes the START_KEY of the first state slice."""
        return (
            self._start is not None
            and self._start
            < self._concurrent_state["slices"][0][self._connector_state_converter.START_KEY]
        )

    def _calculate_lower_boundary_of_last_slice(
        self, lower_boundary: CursorValueType
    ) -> CursorValueType:
        """Apply the lookback window (if any) to ``lower_boundary``."""
        if self._lookback_window:
            return lower_boundary - self._lookback_window
        return lower_boundary

    def _build_slice(self, start_value: CursorValueType, end_value: CursorValueType) -> StreamSlice:
        """Wrap a (start, end) pair into a ``StreamSlice`` keyed by the slice boundary fields."""
        boundary_fields = self._slice_boundary_fields_wrapper
        return StreamSlice(
            partition={},
            cursor_slice={
                boundary_fields[self._START_BOUNDARY]: self._connector_state_converter.output_format(
                    start_value
                ),
                boundary_fields[self._END_BOUNDARY]: self._connector_state_converter.output_format(
                    end_value
                ),
            },
        )

    def _split_per_slice_range(
        self, lower: CursorValueType, upper: CursorValueType, upper_is_end: bool
    ) -> Iterable[StreamSlice]:
        """
        Yield slices covering ``[lower, upper]``, each at most ``slice_range`` wide.

        Ranges are clipped to ``self._start`` when one is configured. ``cursor_granularity`` is
        subtracted from the end of every slice except the final one of a range whose ``upper`` is
        the overall end (``upper_is_end``).
        """
        if lower >= upper:
            return

        # Compare against None (not truthiness) so a falsy start such as 0 is still honored,
        # consistent with _is_start_before_first_slice().
        if self._start is not None:
            if upper < self._start:
                return
            lower = max(lower, self._start)

        if not self._slice_range or self._evaluate_upper_safely(lower, self._slice_range) >= upper:
            clamped_lower = self._clamping_strategy.clamp(lower)
            clamped_upper = self._clamping_strategy.clamp(upper)
            end_value = (
                clamped_upper - self._cursor_granularity
                if self._cursor_granularity and not upper_is_end
                else clamped_upper
            )
            yield self._build_slice(clamped_lower, end_value)
            return

        current_lower_boundary = lower
        while True:
            current_upper_boundary = min(
                self._evaluate_upper_safely(current_lower_boundary, self._slice_range), upper
            )
            has_reached_upper_boundary = current_upper_boundary >= upper

            # Only intermediate boundaries are clamped; the overall upper bound is kept as-is.
            clamped_upper = (
                self._clamping_strategy.clamp(current_upper_boundary)
                if current_upper_boundary != upper
                else current_upper_boundary
            )
            clamped_lower = self._clamping_strategy.clamp(current_lower_boundary)
            if clamped_lower >= clamped_upper:
                # clamping collapsed both values which means that it is time to stop processing
                # FIXME should this be replace by proper end_provider
                break
            end_value = (
                clamped_upper - self._cursor_granularity
                if self._cursor_granularity
                and (not upper_is_end or not has_reached_upper_boundary)
                else clamped_upper
            )
            yield self._build_slice(clamped_lower, end_value)
            current_lower_boundary = clamped_upper
            if has_reached_upper_boundary:
                break

    def _evaluate_upper_safely(self, lower: CursorValueType, step: GapType) -> CursorValueType:
        """
        Given that we set the default step at datetime.timedelta.max, we will generate an OverflowError when evaluating the next start_date
        This method assumes that users would never enter a step that would generate an overflow. Given that would be the case, the code
        would have broken anyway.
        """
        try:
            return lower + step
        except OverflowError:
            return self._end_provider()

    def should_be_synced(self, record: Record) -> bool:
        """
        Determines if a record should be synced based on its cursor value.
        :param record: The record to evaluate

        :return: True if the record's cursor value falls within [self.start, end_provider()], or if
            the record has no cursor value at all
        """
        try:
            record_cursor_value: CursorValueType = self._extract_cursor_value(record)
        except ValueError:
            self._log_for_record_without_cursor_value()
            return True
        return self.start <= record_cursor_value <= self._end_provider()

    def _log_for_record_without_cursor_value(self) -> None:
        """Warn, at most once per cursor instance, that a record had no cursor value."""
        if not self._should_be_synced_logger_triggered:
            LOGGER.warning(
                f"Could not find cursor field `{self.cursor_field.cursor_field_key}` in record for stream {self._stream_name}. The incremental sync will assume it needs to be synced"
            )
            self._should_be_synced_logger_triggered = True

    def reduce_slice_range(self, stream_slice: StreamSlice) -> StreamSlice:
        """
        Return ``stream_slice`` with its start boundary moved to the most recent cursor value
        observed for it, keeping its partition, end boundary and extra fields. Slices with no
        observed value are returned unchanged.
        """
        # In theory, we might be more flexible here meaning that it doesn't need to be in ascending order but it just
        # needs to be ordered. For now though, we will only support ascending order.
        if not self._is_ascending_order:
            LOGGER.warning(
                "Attempting to reduce slice while records are not returned in incremental order might lead to missing records"
            )

        if stream_slice in self._most_recent_cursor_value_per_partition:
            return StreamSlice(
                partition=stream_slice.partition,
                cursor_slice={
                    self._slice_boundary_fields_wrapper[
                        self._START_BOUNDARY
                    ]: self._connector_state_converter.output_format(
                        self._most_recent_cursor_value_per_partition[stream_slice]
                    ),
                    self._slice_boundary_fields_wrapper[
                        self._END_BOUNDARY
                    ]: stream_slice.cursor_slice[
                        self._slice_boundary_fields_wrapper[self._END_BOUNDARY]
                    ],
                },
                extra_fields=stream_slice.extra_fields,
            )
        else:
            return stream_slice
LOGGER = <Logger airbyte (INFO)>
class CursorField:
41class CursorField:
42    def __init__(self, cursor_field_key: str) -> None:
43        self.cursor_field_key = cursor_field_key
44
45    def extract_value(self, record: Record) -> Any:
46        cursor_value = record.data.get(self.cursor_field_key)
47        if cursor_value is None:
48            raise ValueError(f"Could not find cursor field {self.cursor_field_key} in record")
49        return cursor_value  # type: ignore  # we assume that the value the path points at is a comparable
CursorField(cursor_field_key: str)
42    def __init__(self, cursor_field_key: str) -> None:
43        self.cursor_field_key = cursor_field_key
cursor_field_key
def extract_value(self, record: airbyte_cdk.Record) -> Any:
45    def extract_value(self, record: Record) -> Any:
46        cursor_value = record.data.get(self.cursor_field_key)
47        if cursor_value is None:
48            raise ValueError(f"Could not find cursor field {self.cursor_field_key} in record")
49        return cursor_value  # type: ignore  # we assume that the value the path points at is a comparable
52class Cursor(StreamSlicer, ABC):
53    @property
54    @abstractmethod
55    def state(self) -> MutableMapping[str, Any]: ...
56
57    @abstractmethod
58    def observe(self, record: Record) -> None:
59        """
60        Indicate to the cursor that the record has been emitted
61        """
62        raise NotImplementedError()
63
64    @abstractmethod
65    def close_partition(self, partition: Partition) -> None:
66        """
67        Indicate to the cursor that the partition has been successfully processed
68        """
69        raise NotImplementedError()
70
71    @abstractmethod
72    def ensure_at_least_one_state_emitted(self) -> None:
73        """
74        State messages are emitted when a partition is closed. However, the platform expects at least one state to be emitted per sync per
75        stream. Hence, if no partitions are generated, this method needs to be called.
76        """
77        raise NotImplementedError()
78
79    @abstractmethod
80    def should_be_synced(self, record: Record) -> bool:
81        pass
82
83    def stream_slices(self) -> Iterable[StreamSlice]:
84        """
85        Default placeholder implementation of generate_slices.
86        Subclasses can override this method to provide actual behavior.
87        """
88        yield StreamSlice(partition={}, cursor_slice={})

Slices the stream into chunks that can be fetched independently. Slices enable state checkpointing and data retrieval parallelization.

state: MutableMapping[str, Any]
53    @property
54    @abstractmethod
55    def state(self) -> MutableMapping[str, Any]: ...
@abstractmethod
def observe(self, record: airbyte_cdk.Record) -> None:
57    @abstractmethod
58    def observe(self, record: Record) -> None:
59        """
60        Indicate to the cursor that the record has been emitted
61        """
62        raise NotImplementedError()

Indicate to the cursor that the record has been emitted

@abstractmethod
def close_partition( self, partition: airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition) -> None:
64    @abstractmethod
65    def close_partition(self, partition: Partition) -> None:
66        """
67        Indicate to the cursor that the partition has been successfully processed
68        """
69        raise NotImplementedError()

Indicate to the cursor that the partition has been successfully processed

@abstractmethod
def ensure_at_least_one_state_emitted(self) -> None:
71    @abstractmethod
72    def ensure_at_least_one_state_emitted(self) -> None:
73        """
74        State messages are emitted when a partition is closed. However, the platform expects at least one state to be emitted per sync per
75        stream. Hence, if no partitions are generated, this method needs to be called.
76        """
77        raise NotImplementedError()

State messages are emitted when a partition is closed. However, the platform expects at least one state to be emitted per sync per stream. Hence, if no partitions are generated, this method needs to be called.

@abstractmethod
def should_be_synced(self, record: airbyte_cdk.Record) -> bool:
79    @abstractmethod
80    def should_be_synced(self, record: Record) -> bool:
81        pass
def stream_slices(self) -> Iterable[airbyte_cdk.StreamSlice]:
83    def stream_slices(self) -> Iterable[StreamSlice]:
84        """
85        Default placeholder implementation of generate_slices.
86        Subclasses can override this method to provide actual behavior.
87        """
88        yield StreamSlice(partition={}, cursor_slice={})

Default placeholder implementation of `stream_slices`, which yields a single empty slice. Subclasses can override this method to provide actual behavior.

class FinalStateCursor(Cursor):
 91class FinalStateCursor(Cursor):
 92    """Cursor that is used to guarantee at least one state message is emitted for a concurrent stream."""
 93
 94    def __init__(
 95        self,
 96        stream_name: str,
 97        stream_namespace: Optional[str],
 98        message_repository: MessageRepository,
 99    ) -> None:
100        self._stream_name = stream_name
101        self._stream_namespace = stream_namespace
102        self._message_repository = message_repository
103        # Normally the connector state manager operates at the source-level. However, we only need it to write the sentinel
104        # state message rather than manage overall source state. This is also only temporary as we move to the resumable
105        # full refresh world where every stream uses a FileBasedConcurrentCursor with incremental state.
106        self._connector_state_manager = ConnectorStateManager()
107        self._has_closed_at_least_one_slice = False
108
109    @property
110    def state(self) -> MutableMapping[str, Any]:
111        return {NO_CURSOR_STATE_KEY: True}
112
113    def observe(self, record: Record) -> None:
114        pass
115
116    def close_partition(self, partition: Partition) -> None:
117        pass
118
119    def ensure_at_least_one_state_emitted(self) -> None:
120        """
121        Used primarily for full refresh syncs that do not have a valid cursor value to emit at the end of a sync
122        """
123
124        self._connector_state_manager.update_state_for_stream(
125            self._stream_name, self._stream_namespace, self.state
126        )
127        state_message = self._connector_state_manager.create_state_message(
128            self._stream_name, self._stream_namespace
129        )
130        self._message_repository.emit_message(state_message)
131
132    def should_be_synced(self, record: Record) -> bool:
133        return True

Cursor that is used to guarantee at least one state message is emitted for a concurrent stream.

FinalStateCursor( stream_name: str, stream_namespace: Optional[str], message_repository: airbyte_cdk.MessageRepository)
 94    def __init__(
 95        self,
 96        stream_name: str,
 97        stream_namespace: Optional[str],
 98        message_repository: MessageRepository,
 99    ) -> None:
100        self._stream_name = stream_name
101        self._stream_namespace = stream_namespace
102        self._message_repository = message_repository
103        # Normally the connector state manager operates at the source-level. However, we only need it to write the sentinel
104        # state message rather than manage overall source state. This is also only temporary as we move to the resumable
105        # full refresh world where every stream uses a FileBasedConcurrentCursor with incremental state.
106        self._connector_state_manager = ConnectorStateManager()
107        self._has_closed_at_least_one_slice = False
state: MutableMapping[str, Any]
109    @property
110    def state(self) -> MutableMapping[str, Any]:
111        return {NO_CURSOR_STATE_KEY: True}
def observe(self, record: airbyte_cdk.Record) -> None:
113    def observe(self, record: Record) -> None:
114        pass

Indicate to the cursor that the record has been emitted

def close_partition( self, partition: airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition) -> None:
116    def close_partition(self, partition: Partition) -> None:
117        pass

Indicate to the cursor that the partition has been successfully processed

def ensure_at_least_one_state_emitted(self) -> None:
119    def ensure_at_least_one_state_emitted(self) -> None:
120        """
121        Used primarily for full refresh syncs that do not have a valid cursor value to emit at the end of a sync
122        """
123
124        self._connector_state_manager.update_state_for_stream(
125            self._stream_name, self._stream_namespace, self.state
126        )
127        state_message = self._connector_state_manager.create_state_message(
128            self._stream_name, self._stream_namespace
129        )
130        self._message_repository.emit_message(state_message)

Used primarily for full refresh syncs that do not have a valid cursor value to emit at the end of a sync.

def should_be_synced(self, record: airbyte_cdk.Record) -> bool:
132    def should_be_synced(self, record: Record) -> bool:
133        return True
Inherited Members
Cursor
stream_slices
class ConcurrentCursor(Cursor):
136class ConcurrentCursor(Cursor):
137    _START_BOUNDARY = 0
138    _END_BOUNDARY = 1
139
140    def copy_without_state(self) -> "ConcurrentCursor":
141        return self.__class__(
142            stream_name=self._stream_name,
143            stream_namespace=self._stream_namespace,
144            stream_state={},
145            message_repository=NoopMessageRepository(),
146            connector_state_manager=ConnectorStateManager(),
147            connector_state_converter=self._connector_state_converter,
148            cursor_field=self._cursor_field,
149            slice_boundary_fields=self._slice_boundary_fields,
150            start=self._start,
151            end_provider=self._end_provider,
152            lookback_window=self._lookback_window,
153            slice_range=self._slice_range,
154            cursor_granularity=self._cursor_granularity,
155            clamping_strategy=self._clamping_strategy,
156        )
157
158    def __init__(
159        self,
160        stream_name: str,
161        stream_namespace: Optional[str],
162        stream_state: Any,
163        message_repository: MessageRepository,
164        connector_state_manager: ConnectorStateManager,
165        connector_state_converter: AbstractStreamStateConverter,
166        cursor_field: CursorField,
167        slice_boundary_fields: Optional[Tuple[str, str]],
168        start: Optional[CursorValueType],
169        end_provider: Callable[[], CursorValueType],
170        lookback_window: Optional[GapType] = None,
171        slice_range: Optional[GapType] = None,
172        cursor_granularity: Optional[GapType] = None,
173        clamping_strategy: ClampingStrategy = NoClamping(),
174    ) -> None:
175        self._stream_name = stream_name
176        self._stream_namespace = stream_namespace
177        self._message_repository = message_repository
178        self._connector_state_converter = connector_state_converter
179        self._connector_state_manager = connector_state_manager
180        self._cursor_field = cursor_field
181        # To see some example where the slice boundaries might not be defined, check https://github.com/airbytehq/airbyte/blob/1ce84d6396e446e1ac2377362446e3fb94509461/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py#L363-L379
182        self._slice_boundary_fields = slice_boundary_fields
183        self._start = start
184        self._end_provider = end_provider
185        self.start, self._concurrent_state = self._get_concurrent_state(stream_state)
186        self._lookback_window = lookback_window
187        self._slice_range = slice_range
188        self._most_recent_cursor_value_per_partition: MutableMapping[
189            Union[StreamSlice, Mapping[str, Any], None], Any
190        ] = {}
191        self._has_closed_at_least_one_slice = False
192        self._cursor_granularity = cursor_granularity
193        # Flag to track if the logger has been triggered (per stream)
194        self._should_be_synced_logger_triggered = False
195        self._clamping_strategy = clamping_strategy
196        self._is_ascending_order = True
197
198        # A lock is required when closing a partition because updating the cursor's concurrent_state is
199        # not thread safe. When multiple partitions are being closed by the cursor at the same time, it is
200        # possible for one partition to update concurrent_state after a second partition has already read
201        # the previous state. This can lead to the second partition overwriting the previous one's state.
202        self._lock = threading.Lock()
203
204    @property
205    def state(self) -> MutableMapping[str, Any]:
206        return self._connector_state_converter.convert_to_state_message(
207            self.cursor_field, self._concurrent_state
208        )
209
210    @property
211    def cursor_field(self) -> CursorField:
212        return self._cursor_field
213
214    @property
215    def _slice_boundary_fields_wrapper(self) -> Tuple[str, str]:
216        return (
217            self._slice_boundary_fields
218            if self._slice_boundary_fields
219            else (
220                self._connector_state_converter.START_KEY,
221                self._connector_state_converter.END_KEY,
222            )
223        )
224
225    def _get_concurrent_state(
226        self, state: MutableMapping[str, Any]
227    ) -> Tuple[CursorValueType, MutableMapping[str, Any]]:
228        if self._connector_state_converter.is_state_message_compatible(state):
229            partitioned_state = self._connector_state_converter.deserialize(state)
230            slices_from_partitioned_state = partitioned_state.get("slices", [])
231
232            value_from_partitioned_state = None
233            if slices_from_partitioned_state:
234                # We assume here that the slices have been already merged
235                first_slice = slices_from_partitioned_state[0]
236                value_from_partitioned_state = (
237                    first_slice[self._connector_state_converter.MOST_RECENT_RECORD_KEY]
238                    if self._connector_state_converter.MOST_RECENT_RECORD_KEY in first_slice
239                    else first_slice[self._connector_state_converter.END_KEY]
240                )
241            return (
242                value_from_partitioned_state
243                or self._start
244                or self._connector_state_converter.zero_value,
245                partitioned_state,
246            )
247        return self._connector_state_converter.convert_from_sequential_state(
248            self._cursor_field, state, self._start
249        )
250
251    def observe(self, record: Record) -> None:
252        # Because observe writes to the most_recent_cursor_value_per_partition mapping,
253        # it is not thread-safe. However, this shouldn't lead to concurrency issues because
254        # observe() is only invoked by PartitionReader.process_partition(). Since the map is
255        # broken down according to partition, concurrent threads processing only read/write
256        # from different keys which avoids any conflicts.
257        #
258        # If we were to add thread safety, we should implement a lock per-partition
259        # which is instantiated during stream_slices()
260        most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
261            record.associated_slice
262        )
263        try:
264            cursor_value = self._extract_cursor_value(record)
265
266            if most_recent_cursor_value is None or most_recent_cursor_value < cursor_value:
267                self._most_recent_cursor_value_per_partition[record.associated_slice] = cursor_value
268            elif most_recent_cursor_value > cursor_value:
269                self._is_ascending_order = False
270        except ValueError:
271            self._log_for_record_without_cursor_value()
272
273    def _extract_cursor_value(self, record: Record) -> Any:
274        return self._connector_state_converter.parse_value(self._cursor_field.extract_value(record))
275
276    def close_partition(self, partition: Partition) -> None:
277        with self._lock:
278            slice_count_before = len(self._concurrent_state.get("slices", []))
279            self._add_slice_to_state(partition)
280            if slice_count_before < len(
281                self._concurrent_state["slices"]
282            ):  # only emit if at least one slice has been processed
283                self._merge_partitions()
284                self._emit_state_message()
285        self._has_closed_at_least_one_slice = True
286
287    def _add_slice_to_state(self, partition: Partition) -> None:
288        most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
289            partition.to_slice()
290        )
291
292        if self._slice_boundary_fields:
293            if "slices" not in self._concurrent_state:
294                raise RuntimeError(
295                    f"The state for stream {self._stream_name} should have at least one slice to delineate the sync start time, but no slices are present. This is unexpected. Please contact Support."
296                )
297            self._concurrent_state["slices"].append(
298                {
299                    self._connector_state_converter.START_KEY: self._extract_from_slice(
300                        partition, self._slice_boundary_fields[self._START_BOUNDARY]
301                    ),
302                    self._connector_state_converter.END_KEY: self._extract_from_slice(
303                        partition, self._slice_boundary_fields[self._END_BOUNDARY]
304                    ),
305                    self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value,
306                }
307            )
308        elif most_recent_cursor_value:
309            if self._has_closed_at_least_one_slice:
310                # If we track state value using records cursor field, we can only do that if there is one partition. This is because we save
311                # the state every time we close a partition. We assume that if there are multiple slices, they need to be providing
312                # boundaries. There are cases where partitions could not have boundaries:
313                # * The cursor should be per-partition
314                # * The stream state is actually the parent stream state
315                # There might be other cases not listed above. Those are not supported today hence the stream should not use this cursor for
316                # state management. For the specific user that was affected with this issue, we need to:
317                # * Fix state tracking (which is currently broken)
318                # * Make the new version available
319                # * (Probably) ask the user to reset the stream to avoid data loss
320                raise ValueError(
321                    "Given that slice_boundary_fields is not defined and that per-partition state is not supported, only one slice is "
322                    "expected. Please contact the Airbyte team."
323                )
324
325            self._concurrent_state["slices"].append(
326                {
327                    self._connector_state_converter.START_KEY: self.start,
328                    self._connector_state_converter.END_KEY: most_recent_cursor_value,
329                    self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value,
330                }
331            )
332
333    def _emit_state_message(self) -> None:
334        self._connector_state_manager.update_state_for_stream(
335            self._stream_name,
336            self._stream_namespace,
337            self.state,
338        )
339        state_message = self._connector_state_manager.create_state_message(
340            self._stream_name, self._stream_namespace
341        )
342        self._message_repository.emit_message(state_message)
343
344    def _merge_partitions(self) -> None:
345        self._concurrent_state["slices"] = self._connector_state_converter.merge_intervals(
346            self._concurrent_state["slices"]
347        )
348
349    def _extract_from_slice(self, partition: Partition, key: str) -> CursorValueType:
350        try:
351            _slice = partition.to_slice()
352            if not _slice:
353                raise KeyError(f"Could not find key `{key}` in empty slice")
354            return self._connector_state_converter.parse_value(_slice[key])  # type: ignore  # we expect the devs to specify a key that would return a CursorValueType
355        except KeyError as exception:
356            raise KeyError(
357                f"Partition is expected to have key `{key}` but could not be found"
358            ) from exception
359
360    def ensure_at_least_one_state_emitted(self) -> None:
361        """
362        The platform expect to have at least one state message on successful syncs. Hence, whatever happens, we expect this method to be
363        called.
364        """
365        self._emit_state_message()
366
367    def stream_slices(self) -> Iterable[StreamSlice]:
368        """
369        Generating slices based on a few parameters:
370        * lookback_window: Buffer to remove from END_KEY of the highest slice
371        * slice_range: Max difference between two slices. If the difference between two slices is greater, multiple slices will be created
372        * start: `_split_per_slice_range` will clip any value to `self._start which means that:
373          * if upper is less than self._start, no slices will be generated
374          * if lower is less than self._start, self._start will be used as the lower boundary (lookback_window will not be considered in that case)
375
376        Note that the slices will overlap at their boundaries. We therefore expect to have at least the lower or the upper boundary to be
377        inclusive in the API that is queried.
378        """
379        self._merge_partitions()
380
381        if self._start is not None and self._is_start_before_first_slice():
382            yield from self._split_per_slice_range(
383                self._start,
384                self._concurrent_state["slices"][0][self._connector_state_converter.START_KEY],
385                False,
386            )
387
388        if len(self._concurrent_state["slices"]) == 1:
389            yield from self._split_per_slice_range(
390                self._calculate_lower_boundary_of_last_slice(
391                    self._concurrent_state["slices"][0][self._connector_state_converter.END_KEY]
392                ),
393                self._end_provider(),
394                True,
395            )
396        elif len(self._concurrent_state["slices"]) > 1:
397            for i in range(len(self._concurrent_state["slices"]) - 1):
398                if self._cursor_granularity:
399                    yield from self._split_per_slice_range(
400                        self._concurrent_state["slices"][i][self._connector_state_converter.END_KEY]
401                        + self._cursor_granularity,
402                        self._concurrent_state["slices"][i + 1][
403                            self._connector_state_converter.START_KEY
404                        ],
405                        False,
406                    )
407                else:
408                    yield from self._split_per_slice_range(
409                        self._concurrent_state["slices"][i][
410                            self._connector_state_converter.END_KEY
411                        ],
412                        self._concurrent_state["slices"][i + 1][
413                            self._connector_state_converter.START_KEY
414                        ],
415                        False,
416                    )
417            yield from self._split_per_slice_range(
418                self._calculate_lower_boundary_of_last_slice(
419                    self._concurrent_state["slices"][-1][self._connector_state_converter.END_KEY]
420                ),
421                self._end_provider(),
422                True,
423            )
424        else:
425            raise ValueError("Expected at least one slice")
426
427    def _is_start_before_first_slice(self) -> bool:
428        return (
429            self._start is not None
430            and self._start
431            < self._concurrent_state["slices"][0][self._connector_state_converter.START_KEY]
432        )
433
434    def _calculate_lower_boundary_of_last_slice(
435        self, lower_boundary: CursorValueType
436    ) -> CursorValueType:
437        if self._lookback_window:
438            return lower_boundary - self._lookback_window
439        return lower_boundary
440
441    def _split_per_slice_range(
442        self, lower: CursorValueType, upper: CursorValueType, upper_is_end: bool
443    ) -> Iterable[StreamSlice]:
444        if lower >= upper:
445            return
446
447        if self._start and upper < self._start:
448            return
449
450        lower = max(lower, self._start) if self._start else lower
451        if not self._slice_range or self._evaluate_upper_safely(lower, self._slice_range) >= upper:
452            clamped_lower = self._clamping_strategy.clamp(lower)
453            clamped_upper = self._clamping_strategy.clamp(upper)
454            start_value, end_value = (
455                (clamped_lower, clamped_upper - self._cursor_granularity)
456                if self._cursor_granularity and not upper_is_end
457                else (clamped_lower, clamped_upper)
458            )
459            yield StreamSlice(
460                partition={},
461                cursor_slice={
462                    self._slice_boundary_fields_wrapper[
463                        self._START_BOUNDARY
464                    ]: self._connector_state_converter.output_format(start_value),
465                    self._slice_boundary_fields_wrapper[
466                        self._END_BOUNDARY
467                    ]: self._connector_state_converter.output_format(end_value),
468                },
469            )
470        else:
471            stop_processing = False
472            current_lower_boundary = lower
473            while not stop_processing:
474                current_upper_boundary = min(
475                    self._evaluate_upper_safely(current_lower_boundary, self._slice_range), upper
476                )
477                has_reached_upper_boundary = current_upper_boundary >= upper
478
479                clamped_upper = (
480                    self._clamping_strategy.clamp(current_upper_boundary)
481                    if current_upper_boundary != upper
482                    else current_upper_boundary
483                )
484                clamped_lower = self._clamping_strategy.clamp(current_lower_boundary)
485                if clamped_lower >= clamped_upper:
486                    # clamping collapsed both values which means that it is time to stop processing
487                    # FIXME should this be replace by proper end_provider
488                    break
489                start_value, end_value = (
490                    (clamped_lower, clamped_upper - self._cursor_granularity)
491                    if self._cursor_granularity
492                    and (not upper_is_end or not has_reached_upper_boundary)
493                    else (clamped_lower, clamped_upper)
494                )
495                yield StreamSlice(
496                    partition={},
497                    cursor_slice={
498                        self._slice_boundary_fields_wrapper[
499                            self._START_BOUNDARY
500                        ]: self._connector_state_converter.output_format(start_value),
501                        self._slice_boundary_fields_wrapper[
502                            self._END_BOUNDARY
503                        ]: self._connector_state_converter.output_format(end_value),
504                    },
505                )
506                current_lower_boundary = clamped_upper
507                if current_upper_boundary >= upper:
508                    stop_processing = True
509
510    def _evaluate_upper_safely(self, lower: CursorValueType, step: GapType) -> CursorValueType:
511        """
512        Given that we set the default step at datetime.timedelta.max, we will generate an OverflowError when evaluating the next start_date
513        This method assumes that users would never enter a step that would generate an overflow. Given that would be the case, the code
514        would have broken anyway.
515        """
516        try:
517            return lower + step
518        except OverflowError:
519            return self._end_provider()
520
521    def should_be_synced(self, record: Record) -> bool:
522        """
523        Determines if a record should be synced based on its cursor value.
524        :param record: The record to evaluate
525
526        :return: True if the record's cursor value falls within the sync boundaries
527        """
528        try:
529            record_cursor_value: CursorValueType = self._extract_cursor_value(record)
530        except ValueError:
531            self._log_for_record_without_cursor_value()
532            return True
533        return self.start <= record_cursor_value <= self._end_provider()
534
535    def _log_for_record_without_cursor_value(self) -> None:
536        if not self._should_be_synced_logger_triggered:
537            LOGGER.warning(
538                f"Could not find cursor field `{self.cursor_field.cursor_field_key}` in record for stream {self._stream_name}. The incremental sync will assume it needs to be synced"
539            )
540            self._should_be_synced_logger_triggered = True
541
542    def reduce_slice_range(self, stream_slice: StreamSlice) -> StreamSlice:
543        # In theory, we might be more flexible here meaning that it doesn't need to be in ascending order but it just
544        # needs to be ordered. For now though, we will only support ascending order.
545        if not self._is_ascending_order:
546            LOGGER.warning(
547                "Attempting to reduce slice while records are not returned in incremental order might lead to missing records"
548            )
549
550        if stream_slice in self._most_recent_cursor_value_per_partition:
551            return StreamSlice(
552                partition=stream_slice.partition,
553                cursor_slice={
554                    self._slice_boundary_fields_wrapper[
555                        self._START_BOUNDARY
556                    ]: self._connector_state_converter.output_format(
557                        self._most_recent_cursor_value_per_partition[stream_slice]
558                    ),
559                    self._slice_boundary_fields_wrapper[
560                        self._END_BOUNDARY
561                    ]: stream_slice.cursor_slice[
562                        self._slice_boundary_fields_wrapper[self._END_BOUNDARY]
563                    ],
564                },
565                extra_fields=stream_slice.extra_fields,
566            )
567        else:
568            return stream_slice

Slices the stream into chunks that can be fetched independently. Slices enable state checkpointing and parallelized data retrieval.

ConcurrentCursor( stream_name: str, stream_namespace: Optional[str], stream_state: Any, message_repository: airbyte_cdk.MessageRepository, connector_state_manager: airbyte_cdk.ConnectorStateManager, connector_state_converter: airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter.AbstractStreamStateConverter, cursor_field: CursorField, slice_boundary_fields: Optional[Tuple[str, str]], start: Optional[airbyte_cdk.sources.streams.concurrent.cursor_types.CursorValueType], end_provider: Callable[[], airbyte_cdk.sources.streams.concurrent.cursor_types.CursorValueType], lookback_window: Optional[airbyte_cdk.sources.streams.concurrent.cursor_types.GapType] = None, slice_range: Optional[airbyte_cdk.sources.streams.concurrent.cursor_types.GapType] = None, cursor_granularity: Optional[airbyte_cdk.sources.streams.concurrent.cursor_types.GapType] = None, clamping_strategy: airbyte_cdk.sources.streams.concurrent.clamping.ClampingStrategy = <airbyte_cdk.sources.streams.concurrent.clamping.NoClamping object>)
158    def __init__(
159        self,
160        stream_name: str,
161        stream_namespace: Optional[str],
162        stream_state: Any,
163        message_repository: MessageRepository,
164        connector_state_manager: ConnectorStateManager,
165        connector_state_converter: AbstractStreamStateConverter,
166        cursor_field: CursorField,
167        slice_boundary_fields: Optional[Tuple[str, str]],
168        start: Optional[CursorValueType],
169        end_provider: Callable[[], CursorValueType],
170        lookback_window: Optional[GapType] = None,
171        slice_range: Optional[GapType] = None,
172        cursor_granularity: Optional[GapType] = None,
173        clamping_strategy: ClampingStrategy = NoClamping(),
174    ) -> None:
175        self._stream_name = stream_name
176        self._stream_namespace = stream_namespace
177        self._message_repository = message_repository
178        self._connector_state_converter = connector_state_converter
179        self._connector_state_manager = connector_state_manager
180        self._cursor_field = cursor_field
181        # To see some example where the slice boundaries might not be defined, check https://github.com/airbytehq/airbyte/blob/1ce84d6396e446e1ac2377362446e3fb94509461/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py#L363-L379
182        self._slice_boundary_fields = slice_boundary_fields
183        self._start = start
184        self._end_provider = end_provider
185        self.start, self._concurrent_state = self._get_concurrent_state(stream_state)
186        self._lookback_window = lookback_window
187        self._slice_range = slice_range
188        self._most_recent_cursor_value_per_partition: MutableMapping[
189            Union[StreamSlice, Mapping[str, Any], None], Any
190        ] = {}
191        self._has_closed_at_least_one_slice = False
192        self._cursor_granularity = cursor_granularity
193        # Flag to track if the logger has been triggered (per stream)
194        self._should_be_synced_logger_triggered = False
195        self._clamping_strategy = clamping_strategy
196        self._is_ascending_order = True
197
198        # A lock is required when closing a partition because updating the cursor's concurrent_state is
199        # not thread safe. When multiple partitions are being closed by the cursor at the same time, it is
200        # possible for one partition to update concurrent_state after a second partition has already read
201        # the previous state. This can lead to the second partition overwriting the previous one's state.
202        self._lock = threading.Lock()
def copy_without_state(self) -> ConcurrentCursor:
140    def copy_without_state(self) -> "ConcurrentCursor":
141        return self.__class__(
142            stream_name=self._stream_name,
143            stream_namespace=self._stream_namespace,
144            stream_state={},
145            message_repository=NoopMessageRepository(),
146            connector_state_manager=ConnectorStateManager(),
147            connector_state_converter=self._connector_state_converter,
148            cursor_field=self._cursor_field,
149            slice_boundary_fields=self._slice_boundary_fields,
150            start=self._start,
151            end_provider=self._end_provider,
152            lookback_window=self._lookback_window,
153            slice_range=self._slice_range,
154            cursor_granularity=self._cursor_granularity,
155            clamping_strategy=self._clamping_strategy,
156        )
state: MutableMapping[str, Any]
204    @property
205    def state(self) -> MutableMapping[str, Any]:
206        return self._connector_state_converter.convert_to_state_message(
207            self.cursor_field, self._concurrent_state
208        )
cursor_field: CursorField
210    @property
211    def cursor_field(self) -> CursorField:
212        return self._cursor_field
def observe(self, record: airbyte_cdk.Record) -> None:
251    def observe(self, record: Record) -> None:
252        # Because observe writes to the most_recent_cursor_value_per_partition mapping,
253        # it is not thread-safe. However, this shouldn't lead to concurrency issues because
254        # observe() is only invoked by PartitionReader.process_partition(). Since the map is
255        # keyed by partition, concurrent threads processing different partitions only read/write
256        # different keys, which avoids any conflicts.
257        #
258        # If we were to add thread safety, we should implement a lock per-partition
259        # which is instantiated during stream_slices().
260        most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(  # None until a value is stored for this slice
261            record.associated_slice
262        )
263        try:
264            cursor_value = self._extract_cursor_value(record)
265
266            if most_recent_cursor_value is None or most_recent_cursor_value < cursor_value:  # keep the running max per slice
267                self._most_recent_cursor_value_per_partition[record.associated_slice] = cursor_value
268            elif most_recent_cursor_value > cursor_value:  # record arrived out of ascending order
269                self._is_ascending_order = False
270        except ValueError:  # presumably raised when the record has no cursor value — confirm in _extract_cursor_value
271            self._log_for_record_without_cursor_value()

Indicate to the cursor that the record has been emitted

def close_partition(self, partition: airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition) -> None:
276    def close_partition(self, partition: Partition) -> None:
277        with self._lock:  # serialize updates to _concurrent_state across partitions closing concurrently
278            slice_count_before = len(self._concurrent_state.get("slices", []))  # snapshot to detect whether a slice gets added
279            self._add_slice_to_state(partition)
280            if slice_count_before < len(
281                self._concurrent_state["slices"]
282            ):  # only emit if at least one slice has been processed
283                self._merge_partitions()
284                self._emit_state_message()
285        self._has_closed_at_least_one_slice = True  # NOTE(review): written outside the lock — confirm this is intentional

Indicate to the cursor that the partition has been successfully processed

def ensure_at_least_one_state_emitted(self) -> None:
360    def ensure_at_least_one_state_emitted(self) -> None:
361        """
362        The platform expects at least one state message on successful syncs. Hence, whatever happens, we expect this method to be
363        called. It unconditionally calls `_emit_state_message()`.
364        """
365        self._emit_state_message()

The platform expects at least one state message on successful syncs. Hence, whatever happens, we expect this method to be called.

def stream_slices(self) -> Iterable[airbyte_cdk.StreamSlice]:
367    def stream_slices(self) -> Iterable[StreamSlice]:
368        """
369        Generate slices based on a few parameters:
370        * lookback_window: Buffer to remove from END_KEY of the highest slice
371        * slice_range: Max difference between two slices. If the difference between two slices is greater, multiple slices will be created
372        * start: `_split_per_slice_range` will clip any value to `self._start`, which means that:
373          * if upper is less than self._start, no slices will be generated
374          * if lower is less than self._start, self._start will be used as the lower boundary (lookback_window will not be considered in that case)
375
376        Note that the slices will overlap at their boundaries. We therefore expect at least one of the lower or upper boundaries to be
377        inclusive in the API that is queried.
378        """
379        self._merge_partitions()  # merge existing slices before reading their boundaries below
380
381        if self._start is not None and self._is_start_before_first_slice():  # cover the gap between start and the first known slice
382            yield from self._split_per_slice_range(
383                self._start,
384                self._concurrent_state["slices"][0][self._connector_state_converter.START_KEY],
385                False,
386            )
387
388        if len(self._concurrent_state["slices"]) == 1:  # single slice: continue from its end up to the end provider
389            yield from self._split_per_slice_range(
390                self._calculate_lower_boundary_of_last_slice(
391                    self._concurrent_state["slices"][0][self._connector_state_converter.END_KEY]
392                ),
393                self._end_provider(),
394                True,
395            )
396        elif len(self._concurrent_state["slices"]) > 1:
397            for i in range(len(self._concurrent_state["slices"]) - 1):  # fill the gap between each pair of consecutive slices
398                if self._cursor_granularity:  # start one granularity step past the previous slice's end
399                    yield from self._split_per_slice_range(
400                        self._concurrent_state["slices"][i][self._connector_state_converter.END_KEY]
401                        + self._cursor_granularity,
402                        self._concurrent_state["slices"][i + 1][
403                            self._connector_state_converter.START_KEY
404                        ],
405                        False,
406                    )
407                else:  # no granularity: start exactly at the previous slice's end
408                    yield from self._split_per_slice_range(
409                        self._concurrent_state["slices"][i][
410                            self._connector_state_converter.END_KEY
411                        ],
412                        self._concurrent_state["slices"][i + 1][
413                            self._connector_state_converter.START_KEY
414                        ],
415                        False,
416                    )
417            yield from self._split_per_slice_range(  # then from the last slice's end up to the end provider
418                self._calculate_lower_boundary_of_last_slice(
419                    self._concurrent_state["slices"][-1][self._connector_state_converter.END_KEY]
420                ),
421                self._end_provider(),
422                True,
423            )
424        else:  # state holds no slices at all
425            raise ValueError("Expected at least one slice")

Generating slices based on a few parameters:

  • lookback_window: Buffer to remove from END_KEY of the highest slice
  • slice_range: Max difference between two slices. If the difference between two slices is greater, multiple slices will be created
  • start: `_split_per_slice_range` will clip any value to `self._start`, which means that:
    • if upper is less than self._start, no slices will be generated
    • if lower is less than self._start, self._start will be used as the lower boundary (lookback_window will not be considered in that case)

Note that the slices will overlap at their boundaries. We therefore expect at least one of the lower or upper boundaries to be inclusive in the API that is queried.

def should_be_synced(self, record: airbyte_cdk.Record) -> bool:
521    def should_be_synced(self, record: Record) -> bool:
522        """
523        Determines if a record should be synced based on its cursor value.
524        :param record: The record to evaluate
525
526        :return: True if the record's cursor value falls within the sync boundaries (both bounds inclusive), or if its cursor value cannot be extracted
527        """
528        try:
529            record_cursor_value: CursorValueType = self._extract_cursor_value(record)
530        except ValueError:
531            self._log_for_record_without_cursor_value()
532            return True  # records whose cursor value cannot be extracted are never filtered out
533        return self.start <= record_cursor_value <= self._end_provider()  # inclusive on both ends

Determines if a record should be synced based on its cursor value.

Parameters
  • record: The record to evaluate
Returns

True if the record's cursor value falls within the sync boundaries

def reduce_slice_range(self, stream_slice: airbyte_cdk.StreamSlice) -> airbyte_cdk.StreamSlice:
542    def reduce_slice_range(self, stream_slice: StreamSlice) -> StreamSlice:
543        # In theory, we might be more flexible here meaning that it doesn't need to be in ascending order but it just
544        # needs to be ordered. For now though, we will only support ascending order.
545        if not self._is_ascending_order:  # observe() clears this flag when it sees an out-of-order record
546            LOGGER.warning(
547                "Attempting to reduce slice while records are not returned in incremental order might lead to missing records"
548            )
549
550        if stream_slice in self._most_recent_cursor_value_per_partition:  # a cursor value was recorded for this slice
551            return StreamSlice(  # same partition and end boundary; start moved to the latest recorded cursor value
552                partition=stream_slice.partition,
553                cursor_slice={
554                    self._slice_boundary_fields_wrapper[
555                        self._START_BOUNDARY
556                    ]: self._connector_state_converter.output_format(
557                        self._most_recent_cursor_value_per_partition[stream_slice]
558                    ),
559                    self._slice_boundary_fields_wrapper[
560                        self._END_BOUNDARY
561                    ]: stream_slice.cursor_slice[
562                        self._slice_boundary_fields_wrapper[self._END_BOUNDARY]
563                    ],
564                },
565                extra_fields=stream_slice.extra_fields,
566            )
567        else:  # nothing recorded for this slice yet; return it unchanged
568            return stream_slice