airbyte_cdk.sources.declarative.incremental

 1#
 2# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
 3#
 4
 5from airbyte_cdk.sources.declarative.incremental.concurrent_partition_cursor import (
 6    ConcurrentCursorFactory,
 7    ConcurrentPerPartitionCursor,
 8)
 9
10__all__ = [
11    "ConcurrentCursorFactory",
12    "ConcurrentPerPartitionCursor",
13]
class ConcurrentCursorFactory:
    """Factory wrapping a pre-bound callable that builds ConcurrentCursor instances."""

    def __init__(self, create_function: Callable[..., ConcurrentCursor]):
        # The callable must accept `stream_state` and `runtime_lookback_window`
        # keyword arguments and return a fully configured ConcurrentCursor.
        self._create_function = create_function

    def create(
        self, stream_state: Mapping[str, Any], runtime_lookback_window: Optional[timedelta]
    ) -> ConcurrentCursor:
        """Build a cursor seeded with `stream_state` and an optional lookback window."""
        return self._create_function(
            stream_state=stream_state, runtime_lookback_window=runtime_lookback_window
        )
ConcurrentCursorFactory( create_function: Callable[..., airbyte_cdk.ConcurrentCursor])
97    def __init__(self, create_function: Callable[..., ConcurrentCursor]):
98        self._create_function = create_function
def create( self, stream_state: Mapping[str, Any], runtime_lookback_window: Optional[datetime.timedelta]) -> airbyte_cdk.ConcurrentCursor:
100    def create(
101        self, stream_state: Mapping[str, Any], runtime_lookback_window: Optional[timedelta]
102    ) -> ConcurrentCursor:
103        return self._create_function(
104            stream_state=stream_state, runtime_lookback_window=runtime_lookback_window
105        )
class ConcurrentPerPartitionCursor(Cursor):
    """
    Manages state per partition when a stream has many partitions, preventing data loss or duplication.

    Attributes:
        DEFAULT_MAX_PARTITIONS_NUMBER (int): Maximum number of partitions to retain in memory (default is 25,000). This limit needs to be higher than the number of threads we might enqueue (which is represented by ThreadPoolManager.DEFAULT_MAX_QUEUE_SIZE). If not, we could have partitions that have been generated and submitted to the ThreadPool but got deleted from the ConcurrentPerPartitionCursor and when closing them, it will generate KeyError.

    - **Partition Limitation Logic**
      Ensures the number of tracked partitions does not exceed the specified limit to prevent memory overuse. Oldest partitions are removed when the limit is reached.

    - **Global Cursor Fallback**
      New partitions use global state as the initial state to progress the state for deleted or new partitions. The history data added after the initial sync will be missing.

    CurrentPerPartitionCursor expects the state of the ConcurrentCursor to follow the format {cursor_field: cursor_value}.
    """

    # Hard cap on cursors kept in memory; oldest (preferably finished) partitions are evicted.
    DEFAULT_MAX_PARTITIONS_NUMBER = 25_000
    # After this many partitions have been generated, fall back to a single global cursor.
    SWITCH_TO_GLOBAL_LIMIT = 10_000
    _NO_STATE: Mapping[str, Any] = {}
    _NO_CURSOR_STATE: Mapping[str, Any] = {}
    # Keys used in the serialized stream-state mapping.
    _GLOBAL_STATE_KEY = "state"
    _PERPARTITION_STATE_KEY = "states"
    # Ensures the "partition duplication" warning is only logged once per process.
    _IS_PARTITION_DUPLICATION_LOGGED = False
    # Indexes into the (parent_state, generation_sequence) tuples tracked per partition.
    _PARENT_STATE = 0
    _GENERATION_SEQUENCE = 1
134    def __init__(
135        self,
136        cursor_factory: ConcurrentCursorFactory,
137        partition_router: PartitionRouter,
138        stream_name: str,
139        stream_namespace: Optional[str],
140        stream_state: Any,
141        message_repository: MessageRepository,
142        connector_state_manager: ConnectorStateManager,
143        connector_state_converter: AbstractStreamStateConverter,
144        cursor_field: CursorField,
145        use_global_cursor: bool = False,
146        attempt_to_create_cursor_if_not_provided: bool = False,
147    ) -> None:
148        self._global_cursor: Optional[StreamState] = {}
149        self._stream_name = stream_name
150        self._stream_namespace = stream_namespace
151        self._message_repository = message_repository
152        self._connector_state_manager = connector_state_manager
153        self._connector_state_converter = connector_state_converter
154        self._cursor_field = cursor_field
155
156        self._cursor_factory = cursor_factory  # self._cursor_factory is flagged as private but is used in model_to_component_factory to ease pagination reset instantiation
157        self._partition_router = partition_router
158
159        # The dict is ordered to ensure that once the maximum number of partitions is reached,
160        # the oldest partitions can be efficiently removed, maintaining the most recent partitions.
161        self._cursor_per_partition: OrderedDict[str, ConcurrentCursor] = OrderedDict()
162        self._semaphore_per_partition: OrderedDict[str, threading.Semaphore] = OrderedDict()
163
164        # Parent-state tracking: store each partition’s parent state in creation order
165        self._partition_parent_state_map: OrderedDict[str, tuple[Mapping[str, Any], int]] = (
166            OrderedDict()
167        )
168        self._parent_state: Optional[StreamState] = None
169
170        # Tracks when the last slice for partition is emitted
171        self._partitions_done_generating_stream_slices: set[str] = set()
172        # Used to track the index of partitions that are not closed yet
173        self._processing_partitions_indexes: List[int] = list()
174        self._generated_partitions_count: int = 0
175        # Dictionary to map partition keys to their index
176        self._partition_key_to_index: dict[str, int] = {}
177
178        self._lock = threading.Lock()
179        self._lookback_window: int = 0
180        self._new_global_cursor: Optional[StreamState] = None
181        self._number_of_partitions: int = 0
182        self._use_global_cursor: bool = use_global_cursor
183        self._partition_serializer = PerPartitionKeySerializer()
184
185        # Track the last time a state message was emitted
186        self._last_emission_time: float = 0.0
187        self._timer = Timer()
188
189        self._set_initial_state(stream_state)
190
191        # FIXME this is a temporary field the time of the migration from declarative cursors to concurrent ones
192        self._attempt_to_create_cursor_if_not_provided = attempt_to_create_cursor_if_not_provided
193        self._synced_some_data = False
194        self._logged_regarding_datetime_format_error = False
195
196    @property
197    def cursor_field(self) -> CursorField:
198        return self._cursor_field
199
200    @property
201    def state(self) -> MutableMapping[str, Any]:
202        state: dict[str, Any] = {"use_global_cursor": self._use_global_cursor}
203        if not self._use_global_cursor:
204            states = []
205            for partition_tuple, cursor in self._cursor_per_partition.items():
206                if cursor.state:
207                    states.append(
208                        {
209                            "partition": self._to_dict(partition_tuple),
210                            "cursor": copy.deepcopy(cursor.state),
211                        }
212                    )
213            state[self._PERPARTITION_STATE_KEY] = states
214
215        if self._global_cursor:
216            state[self._GLOBAL_STATE_KEY] = self._global_cursor
217        if self._lookback_window is not None:
218            state["lookback_window"] = self._lookback_window
219        if self._parent_state is not None:
220            state["parent_state"] = self._parent_state
221        return state
222
223    def close_partition(self, partition: Partition) -> None:
224        # Attempt to retrieve the stream slice
225        stream_slice: Optional[StreamSlice] = partition.to_slice()  # type: ignore[assignment]
226
227        # Ensure stream_slice is not None
228        if stream_slice is None:
229            raise ValueError("stream_slice cannot be None")
230
231        partition_key = self._to_partition_key(stream_slice.partition)
232        with self._lock:
233            self._semaphore_per_partition[partition_key].acquire()
234            if not self._use_global_cursor:
235                cursor = self._cursor_per_partition[partition_key]
236                cursor.close_partition(partition=partition)
237                if (
238                    partition_key in self._partitions_done_generating_stream_slices
239                    and self._semaphore_per_partition[partition_key]._value == 0
240                ):
241                    self._update_global_cursor(cursor.state[self.cursor_field.cursor_field_key])
242
243            # Clean up the partition if it is fully processed
244            self._cleanup_if_done(partition_key)
245
246            self._check_and_update_parent_state()
247
248            self._emit_state_message()
249
250    def _check_and_update_parent_state(self) -> None:
251        last_closed_state = None
252
253        while self._partition_parent_state_map:
254            earliest_key, (candidate_state, candidate_seq) = next(
255                iter(self._partition_parent_state_map.items())
256            )
257
258            # if any partition that started <= candidate_seq is still open, we must wait
259            if (
260                self._processing_partitions_indexes
261                and self._processing_partitions_indexes[0] <= candidate_seq
262            ):
263                break
264
265            # safe to pop
266            self._partition_parent_state_map.popitem(last=False)
267            last_closed_state = candidate_state
268
269        if last_closed_state is not None:
270            self._parent_state = last_closed_state
271
272    def ensure_at_least_one_state_emitted(self) -> None:
273        """
274        The platform expects at least one state message on successful syncs. Hence, whatever happens, we expect this method to be
275        called.
276        """
277        if not any(
278            semaphore_item[1]._value for semaphore_item in self._semaphore_per_partition.items()
279        ):
280            if self._synced_some_data:
281                # we only update those if we actually synced some data
282                self._global_cursor = self._new_global_cursor
283                self._lookback_window = self._timer.finish()
284            self._parent_state = self._partition_router.get_stream_state()
285        self._emit_state_message(throttle=False)
286
287    def _throttle_state_message(self) -> Optional[float]:
288        """
289        Throttles the state message emission to once every 600 seconds.
290        """
291        current_time = time.time()
292        if current_time - self._last_emission_time <= 600:
293            return None
294        return current_time
295
296    def _emit_state_message(self, throttle: bool = True) -> None:
297        if throttle:
298            current_time = self._throttle_state_message()
299            if current_time is None:
300                return
301            self._last_emission_time = current_time
302            # Skip state emit for global cursor if parent state is empty
303            if self._use_global_cursor and not self._parent_state:
304                return
305
306        self._connector_state_manager.update_state_for_stream(
307            self._stream_name,
308            self._stream_namespace,
309            self.state,
310        )
311        state_message = self._connector_state_manager.create_state_message(
312            self._stream_name, self._stream_namespace
313        )
314        self._message_repository.emit_message(state_message)
315
316    def stream_slices(self) -> Iterable[StreamSlice]:
317        if self._timer.is_running():
318            raise RuntimeError("stream_slices has been executed more than once.")
319
320        slices = self._partition_router.stream_slices()
321        self._timer.start()
322        for partition, last, parent_state in iterate_with_last_flag_and_state(
323            slices, self._partition_router.get_stream_state
324        ):
325            yield from self._generate_slices_from_partition(partition, parent_state)
326
327    def _generate_slices_from_partition(
328        self, partition: StreamSlice, parent_state: Mapping[str, Any]
329    ) -> Iterable[StreamSlice]:
330        # Ensure the maximum number of partitions is not exceeded
331        self._ensure_partition_limit()
332
333        partition_key = self._to_partition_key(partition.partition)
334
335        cursor = self._cursor_per_partition.get(self._to_partition_key(partition.partition))
336        if not cursor:
337            cursor = self._create_cursor(
338                self._global_cursor,
339                self._lookback_window if self._global_cursor else 0,
340            )
341            with self._lock:
342                self._number_of_partitions += 1
343                self._cursor_per_partition[partition_key] = cursor
344
345        if partition_key in self._semaphore_per_partition:
346            if not self._IS_PARTITION_DUPLICATION_LOGGED:
347                logger.warning(f"Partition duplication detected for stream {self._stream_name}")
348                self._IS_PARTITION_DUPLICATION_LOGGED = True
349            return
350        else:
351            self._semaphore_per_partition[partition_key] = threading.Semaphore(0)
352
353        with self._lock:
354            seq = self._generated_partitions_count
355            self._generated_partitions_count += 1
356            self._processing_partitions_indexes.append(seq)
357            self._partition_key_to_index[partition_key] = seq
358
359            if (
360                len(self._partition_parent_state_map) == 0
361                or self._partition_parent_state_map[
362                    next(reversed(self._partition_parent_state_map))
363                ][self._PARENT_STATE]
364                != parent_state
365            ):
366                self._partition_parent_state_map[partition_key] = (deepcopy(parent_state), seq)
367
368        for cursor_slice, is_last_slice, _ in iterate_with_last_flag_and_state(
369            cursor.stream_slices(),
370            lambda: None,
371        ):
372            self._semaphore_per_partition[partition_key].release()
373            if is_last_slice:
374                self._partitions_done_generating_stream_slices.add(partition_key)
375            yield StreamSlice(
376                partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields
377            )
378
379    def _ensure_partition_limit(self) -> None:
380        """
381        Ensure the maximum number of partitions does not exceed the predefined limit.
382
383        Steps:
384        1. Attempt to remove partitions that are marked as finished in `_finished_partitions`.
385           These partitions are considered processed and safe to delete.
386        2. If the limit is still exceeded and no finished partitions are available for removal,
387           remove the oldest partition unconditionally. We expect failed partitions to be removed.
388
389        Logging:
390        - Logs a warning each time a partition is removed, indicating whether it was finished
391          or removed due to being the oldest.
392        """
393        if not self._use_global_cursor and self.limit_reached():
394            logger.info(
395                f"Exceeded the 'SWITCH_TO_GLOBAL_LIMIT' of {self.SWITCH_TO_GLOBAL_LIMIT}. "
396                f"Switching to global cursor for {self._stream_name}."
397            )
398            self._use_global_cursor = True
399
400        with self._lock:
401            while len(self._cursor_per_partition) > self.DEFAULT_MAX_PARTITIONS_NUMBER - 1:
402                # Try removing finished partitions first
403                for partition_key in list(self._cursor_per_partition.keys()):
404                    if partition_key not in self._partition_key_to_index:
405                        oldest_partition = self._cursor_per_partition.pop(
406                            partition_key
407                        )  # Remove the oldest partition
408                        logger.debug(
409                            f"The maximum number of partitions has been reached. Dropping the oldest finished partition: {oldest_partition}. Over limit: {self._number_of_partitions - self.DEFAULT_MAX_PARTITIONS_NUMBER}."
410                        )
411                        break
412                else:
413                    # If no finished partitions can be removed, fall back to removing the oldest partition
414                    oldest_partition = self._cursor_per_partition.popitem(last=False)[
415                        1
416                    ]  # Remove the oldest partition
417                    logger.warning(
418                        f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}. Over limit: {self._number_of_partitions - self.DEFAULT_MAX_PARTITIONS_NUMBER}."
419                    )
420
421    def _set_initial_state(self, stream_state: StreamState) -> None:
422        """
423        Initialize the cursor's state using the provided `stream_state`.
424
425        This method supports global and per-partition state initialization.
426
427        - **Global State**: If `states` is missing, the `state` is treated as global and applied to all partitions.
428          The `global state` holds a single cursor position representing the latest processed record across all partitions.
429
430        - **Lookback Window**: Configured via `lookback_window`, it defines the period (in seconds) for reprocessing records.
431          This ensures robustness in case of upstream data delays or reordering. If not specified, it defaults to 0.
432
433        - **Per-Partition State**: If `states` is present, each partition's cursor state is initialized separately.
434
435        - **Parent State**: (if available) Used to initialize partition routers based on parent streams.
436
437        Args:
438            stream_state (StreamState): The state of the streams to be set. The format of the stream state should be:
439                {
440                    "states": [
441                        {
442                            "partition": {
443                                "partition_key": "value"
444                            },
445                            "cursor": {
446                                "last_updated": "2023-05-27T00:00:00Z"
447                            }
448                        }
449                    ],
450                    "state": {
451                        "last_updated": "2023-05-27T00:00:00Z"
452                    },
453                    lookback_window: 10,
454                    "parent_state": {
455                        "parent_stream_name": {
456                            "last_updated": "2023-05-27T00:00:00Z"
457                        }
458                    }
459                }
460        """
461        if not stream_state:
462            return
463
464        if (
465            self._PERPARTITION_STATE_KEY not in stream_state
466            and self._GLOBAL_STATE_KEY not in stream_state
467        ):
468            # We assume that `stream_state` is in a global format that can be applied to all partitions.
469            # Example: {"global_state_format_key": "global_state_format_value"}
470            self._set_global_state(stream_state)
471
472        else:
473            self._use_global_cursor = stream_state.get("use_global_cursor", False)
474
475            self._lookback_window = int(stream_state.get("lookback_window", 0))
476
477            for state in stream_state.get(self._PERPARTITION_STATE_KEY, []):
478                self._number_of_partitions += 1
479                self._cursor_per_partition[self._to_partition_key(state["partition"])] = (
480                    self._create_cursor(state["cursor"])
481                )
482
483            # set default state for missing partitions if it is per partition with fallback to global
484            if self._GLOBAL_STATE_KEY in stream_state:
485                self._set_global_state(stream_state[self._GLOBAL_STATE_KEY])
486
487        # Set initial parent state
488        if stream_state.get("parent_state"):
489            self._parent_state = stream_state["parent_state"]
490
491    def _set_global_state(self, stream_state: Mapping[str, Any]) -> None:
492        """
493        Initializes the global cursor state from the provided stream state.
494
495        If the cursor field key is present in the stream state, its value is parsed,
496        formatted, and stored as the global cursor. This ensures consistency in state
497        representation across partitions.
498        """
499        if self.cursor_field.cursor_field_key in stream_state:
500            global_state_value = stream_state[self.cursor_field.cursor_field_key]
501            final_format_global_state_value = self._connector_state_converter.output_format(
502                self._connector_state_converter.parse_value(global_state_value)
503            )
504
505            fixed_global_state = {
506                self.cursor_field.cursor_field_key: final_format_global_state_value
507            }
508
509            self._global_cursor = deepcopy(fixed_global_state)
510            self._new_global_cursor = deepcopy(fixed_global_state)
511
512    def observe(self, record: Record) -> None:
513        if not record.associated_slice:
514            raise ValueError(
515                "Invalid state as stream slices that are emitted should refer to an existing cursor"
516            )
517
518        # if the current record has no cursor value, we cannot meaningfully update the state based on it, so there is nothing more to do
519        try:
520            record_cursor_value = self._cursor_field.extract_value(record)
521        except ValueError:
522            return
523
524        try:
525            record_cursor = self._connector_state_converter.output_format(
526                self._connector_state_converter.parse_value(record_cursor_value)
527            )
528        except ValueError as exception:
529            if not self._logged_regarding_datetime_format_error:
530                logger.warning(
531                    "Skipping cursor update for stream '%s': failed to parse cursor field '%s' value %r: %s",
532                    self._stream_name,
533                    self._cursor_field.cursor_field_key,
534                    record_cursor_value,
535                    exception,
536                )
537                self._logged_regarding_datetime_format_error = True
538            return
539
540        self._synced_some_data = True
541        self._update_global_cursor(record_cursor)
542        if not self._use_global_cursor:
543            self._cursor_per_partition[
544                self._to_partition_key(record.associated_slice.partition)
545            ].observe(record)
546
547    def _update_global_cursor(self, value: Any) -> None:
548        if (
549            self._new_global_cursor is None
550            or self._new_global_cursor[self.cursor_field.cursor_field_key] < value
551        ):
552            self._new_global_cursor = {self.cursor_field.cursor_field_key: copy.deepcopy(value)}
553
554    def _cleanup_if_done(self, partition_key: str) -> None:
555        """
556        Free every in-memory structure that belonged to a completed partition:
557        cursor, semaphore, flag inside `_finished_partitions`
558        """
559        if not (
560            partition_key in self._partitions_done_generating_stream_slices
561            and self._semaphore_per_partition[partition_key]._value == 0
562        ):
563            return
564
565        self._semaphore_per_partition.pop(partition_key, None)
566        self._partitions_done_generating_stream_slices.discard(partition_key)
567
568        seq = self._partition_key_to_index.pop(partition_key)
569        self._processing_partitions_indexes.remove(seq)
570
571        logger.debug(f"Partition {partition_key} fully processed and cleaned up.")
572
573    def _to_partition_key(self, partition: Mapping[str, Any]) -> str:
574        return self._partition_serializer.to_partition_key(partition)
575
576    def _to_dict(self, partition_key: str) -> Mapping[str, Any]:
577        return self._partition_serializer.to_partition(partition_key)
578
579    def _create_cursor(
580        self, cursor_state: Any, runtime_lookback_window: int = 0
581    ) -> ConcurrentCursor:
582        cursor = self._cursor_factory.create(
583            stream_state=deepcopy(cursor_state),
584            runtime_lookback_window=timedelta(seconds=runtime_lookback_window),
585        )
586        return cursor
587
588    def should_be_synced(self, record: Record) -> bool:
589        return self._get_cursor(record).should_be_synced(record)
590
591    def _get_cursor(self, record: Record) -> ConcurrentCursor:
592        if not record.associated_slice:
593            raise ValueError(
594                "Invalid state as stream slices that are emitted should refer to an existing cursor"
595            )
596
597        if self._use_global_cursor:
598            return self._create_cursor(
599                self._global_cursor,
600                self._lookback_window if self._global_cursor else 0,
601            )
602
603        partition_key = self._to_partition_key(record.associated_slice.partition)
604        if (
605            partition_key not in self._cursor_per_partition
606            and not self._attempt_to_create_cursor_if_not_provided
607        ):
608            raise ValueError(
609                "Invalid state as stream slices that are emitted should refer to an existing cursor"
610            )
611        elif partition_key not in self._cursor_per_partition:
612            return self._create_cursor(
613                self._global_cursor,
614                self._lookback_window if self._global_cursor else 0,
615            )
616        else:
617            return self._cursor_per_partition[partition_key]
618
619    def limit_reached(self) -> bool:
620        return self._number_of_partitions > self.SWITCH_TO_GLOBAL_LIMIT
621
622    @staticmethod
623    def get_parent_state(
624        stream_state: Optional[StreamState], parent_stream_name: str
625    ) -> Optional[AirbyteStateMessage]:
626        if not stream_state:
627            return None
628
629        if "parent_state" not in stream_state:
630            logger.warning(
631                f"Trying to get_parent_state for stream `{parent_stream_name}` when there are not parent state in the state"
632            )
633            return None
634        elif parent_stream_name not in stream_state["parent_state"]:
635            logger.info(
636                f"Could not find parent state for stream `{parent_stream_name}`. On parents available are {list(stream_state['parent_state'].keys())}"
637            )
638            return None
639
640        return AirbyteStateMessage(
641            type=AirbyteStateType.STREAM,
642            stream=AirbyteStreamState(
643                stream_descriptor=StreamDescriptor(parent_stream_name, None),
644                stream_state=AirbyteStateBlob(stream_state["parent_state"][parent_stream_name]),
645            ),
646        )
647
648    @staticmethod
649    def get_global_state(
650        stream_state: Optional[StreamState], parent_stream_name: str
651    ) -> Optional[AirbyteStateMessage]:
652        return (
653            AirbyteStateMessage(
654                type=AirbyteStateType.STREAM,
655                stream=AirbyteStreamState(
656                    stream_descriptor=StreamDescriptor(parent_stream_name, None),
657                    stream_state=AirbyteStateBlob(stream_state["state"]),
658                ),
659            )
660            if stream_state and "state" in stream_state
661            else None
662        )
663
664    def get_cursor_datetime_from_state(
665        self, stream_state: Mapping[str, Any]
666    ) -> datetime.datetime | None:
667        """Extract and parse the cursor datetime from the global cursor in per-partition state.
668
669        For per-partition cursors, the global cursor is stored under the "state" key.
670        This method delegates to the underlying cursor factory to parse the datetime.
671
672        Returns None if the global cursor is not present or cannot be parsed.
673        """
674        global_state = stream_state.get(self._GLOBAL_STATE_KEY)
675        if not global_state or not isinstance(global_state, dict):
676            return None
677
678        # Create a cursor to delegate the parsing
679        cursor = self._cursor_factory.create(stream_state={}, runtime_lookback_window=None)
680        return cursor.get_cursor_datetime_from_state(global_state)

Manages state per partition when a stream has many partitions, preventing data loss or duplication.

Attributes:
  • DEFAULT_MAX_PARTITIONS_NUMBER (int): Maximum number of partitions to retain in memory (default is 25,000). This limit needs to be higher than the number of threads we might enqueue (which is represented by ThreadPoolManager.DEFAULT_MAX_QUEUE_SIZE). If not, we could have partitions that have been generated and submitted to the ThreadPool but got deleted from the ConcurrentPerPartitionCursor and when closing them, it will generate KeyError.

    • Partition Limitation Logic Ensures the number of tracked partitions does not exceed the specified limit to prevent memory overuse. Oldest partitions are removed when the limit is reached.

    • Global Cursor Fallback New partitions use global state as the initial state to progress the state for deleted or new partitions. The history data added after the initial sync will be missing.

CurrentPerPartitionCursor expects the state of the ConcurrentCursor to follow the format {cursor_field: cursor_value}.

ConcurrentPerPartitionCursor( cursor_factory: ConcurrentCursorFactory, partition_router: airbyte_cdk.sources.declarative.partition_routers.PartitionRouter, stream_name: str, stream_namespace: Optional[str], stream_state: Any, message_repository: airbyte_cdk.MessageRepository, connector_state_manager: airbyte_cdk.ConnectorStateManager, connector_state_converter: airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter.AbstractStreamStateConverter, cursor_field: airbyte_cdk.CursorField, use_global_cursor: bool = False, attempt_to_create_cursor_if_not_provided: bool = False)
134    def __init__(
135        self,
136        cursor_factory: ConcurrentCursorFactory,
137        partition_router: PartitionRouter,
138        stream_name: str,
139        stream_namespace: Optional[str],
140        stream_state: Any,
141        message_repository: MessageRepository,
142        connector_state_manager: ConnectorStateManager,
143        connector_state_converter: AbstractStreamStateConverter,
144        cursor_field: CursorField,
145        use_global_cursor: bool = False,
146        attempt_to_create_cursor_if_not_provided: bool = False,
147    ) -> None:
148        self._global_cursor: Optional[StreamState] = {}
149        self._stream_name = stream_name
150        self._stream_namespace = stream_namespace
151        self._message_repository = message_repository
152        self._connector_state_manager = connector_state_manager
153        self._connector_state_converter = connector_state_converter
154        self._cursor_field = cursor_field
155
156        self._cursor_factory = cursor_factory  # self._cursor_factory is flagged as private but is used in model_to_component_factory to ease pagination reset instantiation
157        self._partition_router = partition_router
158
159        # The dict is ordered to ensure that once the maximum number of partitions is reached,
160        # the oldest partitions can be efficiently removed, maintaining the most recent partitions.
161        self._cursor_per_partition: OrderedDict[str, ConcurrentCursor] = OrderedDict()
162        self._semaphore_per_partition: OrderedDict[str, threading.Semaphore] = OrderedDict()
163
164        # Parent-state tracking: store each partition’s parent state in creation order
165        self._partition_parent_state_map: OrderedDict[str, tuple[Mapping[str, Any], int]] = (
166            OrderedDict()
167        )
168        self._parent_state: Optional[StreamState] = None
169
170        # Tracks when the last slice for partition is emitted
171        self._partitions_done_generating_stream_slices: set[str] = set()
172        # Used to track the index of partitions that are not closed yet
173        self._processing_partitions_indexes: List[int] = list()
174        self._generated_partitions_count: int = 0
175        # Dictionary to map partition keys to their index
176        self._partition_key_to_index: dict[str, int] = {}
177
178        self._lock = threading.Lock()
179        self._lookback_window: int = 0
180        self._new_global_cursor: Optional[StreamState] = None
181        self._number_of_partitions: int = 0
182        self._use_global_cursor: bool = use_global_cursor
183        self._partition_serializer = PerPartitionKeySerializer()
184
185        # Track the last time a state message was emitted
186        self._last_emission_time: float = 0.0
187        self._timer = Timer()
188
189        self._set_initial_state(stream_state)
190
191        # FIXME this is a temporary field the time of the migration from declarative cursors to concurrent ones
192        self._attempt_to_create_cursor_if_not_provided = attempt_to_create_cursor_if_not_provided
193        self._synced_some_data = False
194        self._logged_regarding_datetime_format_error = False
DEFAULT_MAX_PARTITIONS_NUMBER = 25000
SWITCH_TO_GLOBAL_LIMIT = 10000
cursor_field: airbyte_cdk.CursorField
196    @property
197    def cursor_field(self) -> CursorField:
198        return self._cursor_field
state: MutableMapping[str, Any]
200    @property
201    def state(self) -> MutableMapping[str, Any]:
202        state: dict[str, Any] = {"use_global_cursor": self._use_global_cursor}
203        if not self._use_global_cursor:
204            states = []
205            for partition_tuple, cursor in self._cursor_per_partition.items():
206                if cursor.state:
207                    states.append(
208                        {
209                            "partition": self._to_dict(partition_tuple),
210                            "cursor": copy.deepcopy(cursor.state),
211                        }
212                    )
213            state[self._PERPARTITION_STATE_KEY] = states
214
215        if self._global_cursor:
216            state[self._GLOBAL_STATE_KEY] = self._global_cursor
217        if self._lookback_window is not None:
218            state["lookback_window"] = self._lookback_window
219        if self._parent_state is not None:
220            state["parent_state"] = self._parent_state
221        return state
def close_partition( self, partition: airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition) -> None:
    def close_partition(self, partition: Partition) -> None:
        """Mark one slice of *partition* as fully processed and update state.

        Under the cursor-wide lock: consumes one semaphore permit for the
        partition, closes the per-partition cursor, folds the partition's final
        cursor value into the global cursor once the partition is completely
        done, garbage-collects finished partitions, refreshes parent state, and
        emits a state message.

        Raises:
            ValueError: if the partition carries no stream slice.
        """
        # Attempt to retrieve the stream slice
        stream_slice: Optional[StreamSlice] = partition.to_slice()  # type: ignore[assignment]

        # Ensure stream_slice is not None
        if stream_slice is None:
            raise ValueError("stream_slice cannot be None")

        partition_key = self._to_partition_key(stream_slice.partition)
        with self._lock:
            # Each acquire marks one in-flight slice for this partition as done
            # (permits are presumably released once per generated slice — see
            # _generate_slices_from_partition; TODO confirm).
            self._semaphore_per_partition[partition_key].acquire()
            if not self._use_global_cursor:
                cursor = self._cursor_per_partition[partition_key]
                cursor.close_partition(partition=partition)
                # Promote this partition's cursor value into the global cursor
                # only when slice generation has finished AND every generated
                # slice has been closed (semaphore fully drained).
                # NOTE(review): reads Semaphore._value, a CPython-internal field.
                if (
                    partition_key in self._partitions_done_generating_stream_slices
                    and self._semaphore_per_partition[partition_key]._value == 0
                ):
                    self._update_global_cursor(cursor.state[self.cursor_field.cursor_field_key])

            # Clean up the partition if it is fully processed
            self._cleanup_if_done(partition_key)

            self._check_and_update_parent_state()

            self._emit_state_message()

Indicate to the cursor that the partition has been successfully processed

def ensure_at_least_one_state_emitted(self) -> None:
272    def ensure_at_least_one_state_emitted(self) -> None:
273        """
274        The platform expects at least one state message on successful syncs. Hence, whatever happens, we expect this method to be
275        called.
276        """
277        if not any(
278            semaphore_item[1]._value for semaphore_item in self._semaphore_per_partition.items()
279        ):
280            if self._synced_some_data:
281                # we only update those if we actually synced some data
282                self._global_cursor = self._new_global_cursor
283                self._lookback_window = self._timer.finish()
284            self._parent_state = self._partition_router.get_stream_state()
285        self._emit_state_message(throttle=False)

The platform expects at least one state message on successful syncs. Hence, whatever happens, we expect this method to be called.

def stream_slices(self) -> Iterable[airbyte_cdk.StreamSlice]:
316    def stream_slices(self) -> Iterable[StreamSlice]:
317        if self._timer.is_running():
318            raise RuntimeError("stream_slices has been executed more than once.")
319
320        slices = self._partition_router.stream_slices()
321        self._timer.start()
322        for partition, last, parent_state in iterate_with_last_flag_and_state(
323            slices, self._partition_router.get_stream_state
324        ):
325            yield from self._generate_slices_from_partition(partition, parent_state)

Default placeholder implementation of generate_slices. Subclasses can override this method to provide actual behavior.

def observe(self, record: airbyte_cdk.Record) -> None:
    def observe(self, record: Record) -> None:
        """Advance cursor state based on an emitted record.

        Extracts and normalizes the record's cursor value, updates the global
        cursor, and — while per-partition tracking is active — forwards the
        record to its partition's cursor. Records with a missing or unparsable
        cursor value are skipped silently (unparsable values are logged once).

        Raises:
            ValueError: if the record is not associated with a stream slice.
        """
        if not record.associated_slice:
            raise ValueError(
                "Invalid state as stream slices that are emitted should refer to an existing cursor"
            )

        # if the current record has no cursor value, we cannot meaningfully update the state based on it, so there is nothing more to do
        try:
            record_cursor_value = self._cursor_field.extract_value(record)
        except ValueError:
            return

        # Normalize the raw value into the connector state's output format.
        try:
            record_cursor = self._connector_state_converter.output_format(
                self._connector_state_converter.parse_value(record_cursor_value)
            )
        except ValueError as exception:
            # Warn only once per sync to avoid flooding the logs, then skip the
            # cursor update for this record.
            if not self._logged_regarding_datetime_format_error:
                logger.warning(
                    "Skipping cursor update for stream '%s': failed to parse cursor field '%s' value %r: %s",
                    self._stream_name,
                    self._cursor_field.cursor_field_key,
                    record_cursor_value,
                    exception,
                )
                self._logged_regarding_datetime_format_error = True
            return

        # Flag consumed by ensure_at_least_one_state_emitted(): the global
        # cursor / lookback window are only promoted if some data was synced.
        self._synced_some_data = True
        self._update_global_cursor(record_cursor)
        if not self._use_global_cursor:
            self._cursor_per_partition[
                self._to_partition_key(record.associated_slice.partition)
            ].observe(record)

Indicate to the cursor that the record has been emitted

def should_be_synced(self, record: airbyte_cdk.Record) -> bool:
588    def should_be_synced(self, record: Record) -> bool:
589        return self._get_cursor(record).should_be_synced(record)
def limit_reached(self) -> bool:
619    def limit_reached(self) -> bool:
620        return self._number_of_partitions > self.SWITCH_TO_GLOBAL_LIMIT
@staticmethod
def get_parent_state( stream_state: Optional[Mapping[str, Any]], parent_stream_name: str) -> Optional[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage]:
622    @staticmethod
623    def get_parent_state(
624        stream_state: Optional[StreamState], parent_stream_name: str
625    ) -> Optional[AirbyteStateMessage]:
626        if not stream_state:
627            return None
628
629        if "parent_state" not in stream_state:
630            logger.warning(
631                f"Trying to get_parent_state for stream `{parent_stream_name}` when there are not parent state in the state"
632            )
633            return None
634        elif parent_stream_name not in stream_state["parent_state"]:
635            logger.info(
636                f"Could not find parent state for stream `{parent_stream_name}`. On parents available are {list(stream_state['parent_state'].keys())}"
637            )
638            return None
639
640        return AirbyteStateMessage(
641            type=AirbyteStateType.STREAM,
642            stream=AirbyteStreamState(
643                stream_descriptor=StreamDescriptor(parent_stream_name, None),
644                stream_state=AirbyteStateBlob(stream_state["parent_state"][parent_stream_name]),
645            ),
646        )
@staticmethod
def get_global_state( stream_state: Optional[Mapping[str, Any]], parent_stream_name: str) -> Optional[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage]:
648    @staticmethod
649    def get_global_state(
650        stream_state: Optional[StreamState], parent_stream_name: str
651    ) -> Optional[AirbyteStateMessage]:
652        return (
653            AirbyteStateMessage(
654                type=AirbyteStateType.STREAM,
655                stream=AirbyteStreamState(
656                    stream_descriptor=StreamDescriptor(parent_stream_name, None),
657                    stream_state=AirbyteStateBlob(stream_state["state"]),
658                ),
659            )
660            if stream_state and "state" in stream_state
661            else None
662        )
def get_cursor_datetime_from_state(self, stream_state: Mapping[str, Any]) -> datetime.datetime | None:
664    def get_cursor_datetime_from_state(
665        self, stream_state: Mapping[str, Any]
666    ) -> datetime.datetime | None:
667        """Extract and parse the cursor datetime from the global cursor in per-partition state.
668
669        For per-partition cursors, the global cursor is stored under the "state" key.
670        This method delegates to the underlying cursor factory to parse the datetime.
671
672        Returns None if the global cursor is not present or cannot be parsed.
673        """
674        global_state = stream_state.get(self._GLOBAL_STATE_KEY)
675        if not global_state or not isinstance(global_state, dict):
676            return None
677
678        # Create a cursor to delegate the parsing
679        cursor = self._cursor_factory.create(stream_state={}, runtime_lookback_window=None)
680        return cursor.get_cursor_datetime_from_state(global_state)

Extract and parse the cursor datetime from the global cursor in per-partition state.

For per-partition cursors, the global cursor is stored under the "state" key. This method delegates to the underlying cursor factory to parse the datetime.

Returns None if the global cursor is not present or cannot be parsed.