airbyte_cdk.sources.declarative.incremental

 1#
 2# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
 3#
 4
 5from airbyte_cdk.sources.declarative.incremental.concurrent_partition_cursor import (
 6    ConcurrentCursorFactory,
 7    ConcurrentPerPartitionCursor,
 8)
 9
10__all__ = [
11    "ConcurrentCursorFactory",
12    "ConcurrentPerPartitionCursor",
13]
class ConcurrentCursorFactory:
 95class ConcurrentCursorFactory:
 96    def __init__(self, create_function: Callable[..., ConcurrentCursor]):
 97        self._create_function = create_function
 98
 99    def create(
100        self, stream_state: Mapping[str, Any], runtime_lookback_window: Optional[timedelta]
101    ) -> ConcurrentCursor:
102        return self._create_function(
103            stream_state=stream_state, runtime_lookback_window=runtime_lookback_window
104        )
ConcurrentCursorFactory( create_function: Callable[..., airbyte_cdk.ConcurrentCursor])
96    def __init__(self, create_function: Callable[..., ConcurrentCursor]):
97        self._create_function = create_function
def create( self, stream_state: Mapping[str, Any], runtime_lookback_window: Optional[datetime.timedelta]) -> airbyte_cdk.ConcurrentCursor:
 99    def create(
100        self, stream_state: Mapping[str, Any], runtime_lookback_window: Optional[timedelta]
101    ) -> ConcurrentCursor:
102        return self._create_function(
103            stream_state=stream_state, runtime_lookback_window=runtime_lookback_window
104        )
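
A minimal usage sketch of the factory, assuming a hypothetical build_cursor helper in place of the create function that the CDK's component factory normally wires in:

from datetime import timedelta
from functools import partial

from airbyte_cdk.sources.declarative.incremental import ConcurrentCursorFactory

def build_cursor(stream_state, runtime_lookback_window, *, cursor_field):
    """Hypothetical stand-in for the CDK-provided function that builds a ConcurrentCursor for one partition."""
    ...

factory = ConcurrentCursorFactory(partial(build_cursor, cursor_field="updated_at"))
partition_cursor = factory.create(
    stream_state={"updated_at": "2023-05-27T00:00:00Z"},
    runtime_lookback_window=timedelta(seconds=300),
)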
class ConcurrentPerPartitionCursor(airbyte_cdk.sources.streams.concurrent.cursor.Cursor):
107class ConcurrentPerPartitionCursor(Cursor):
108    """
109    Manages state per partition when a stream has many partitions, preventing data loss or duplication.
110
111    Attributes:
112        DEFAULT_MAX_PARTITIONS_NUMBER (int): Maximum number of partitions to retain in memory (default is 25,000). This limit must be higher than the number of threads we might enqueue (represented by ThreadPoolManager.DEFAULT_MAX_QUEUE_SIZE). Otherwise, a partition could be generated and submitted to the ThreadPool, then evicted from the ConcurrentPerPartitionCursor, and closing it would raise a KeyError.
113
114    - **Partition Limitation Logic**
115      Ensures the number of tracked partitions does not exceed the specified limit to prevent memory overuse. Oldest partitions are removed when the limit is reached.
116
117    - **Global Cursor Fallback**
118      New partitions are initialized from the global state, which keeps state progressing for new or previously deleted partitions. Historical data added to such partitions after the initial sync will be skipped.
119
120    ConcurrentPerPartitionCursor expects the state of the ConcurrentCursor to follow the format {cursor_field: cursor_value}.
121    """
122
123    DEFAULT_MAX_PARTITIONS_NUMBER = 25_000
124    SWITCH_TO_GLOBAL_LIMIT = 10_000
125    _NO_STATE: Mapping[str, Any] = {}
126    _NO_CURSOR_STATE: Mapping[str, Any] = {}
127    _GLOBAL_STATE_KEY = "state"
128    _PERPARTITION_STATE_KEY = "states"
129    _IS_PARTITION_DUPLICATION_LOGGED = False
130    _PARENT_STATE = 0
131    _GENERATION_SEQUENCE = 1
132
133    def __init__(
134        self,
135        cursor_factory: ConcurrentCursorFactory,
136        partition_router: PartitionRouter,
137        stream_name: str,
138        stream_namespace: Optional[str],
139        stream_state: Any,
140        message_repository: MessageRepository,
141        connector_state_manager: ConnectorStateManager,
142        connector_state_converter: AbstractStreamStateConverter,
143        cursor_field: CursorField,
144        use_global_cursor: bool = False,
145        attempt_to_create_cursor_if_not_provided: bool = False,
146    ) -> None:
147        self._global_cursor: Optional[StreamState] = {}
148        self._stream_name = stream_name
149        self._stream_namespace = stream_namespace
150        self._message_repository = message_repository
151        self._connector_state_manager = connector_state_manager
152        self._connector_state_converter = connector_state_converter
153        self._cursor_field = cursor_field
154
155        self._cursor_factory = cursor_factory
156        self._partition_router = partition_router
157
158        # The dict is ordered to ensure that once the maximum number of partitions is reached,
159        # the oldest partitions can be efficiently removed, maintaining the most recent partitions.
160        self._cursor_per_partition: OrderedDict[str, ConcurrentCursor] = OrderedDict()
161        self._semaphore_per_partition: OrderedDict[str, threading.Semaphore] = OrderedDict()
162
163        # Parent-state tracking: store each partition’s parent state in creation order
164        self._partition_parent_state_map: OrderedDict[str, tuple[Mapping[str, Any], int]] = (
165            OrderedDict()
166        )
167        self._parent_state: Optional[StreamState] = None
168
169        # Tracks partitions whose last stream slice has been emitted
170        self._partitions_done_generating_stream_slices: set[str] = set()
171        # Used to track the index of partitions that are not closed yet
172        self._processing_partitions_indexes: List[int] = list()
173        self._generated_partitions_count: int = 0
174        # Dictionary to map partition keys to their index
175        self._partition_key_to_index: dict[str, int] = {}
176
177        self._lock = threading.Lock()
178        self._lookback_window: int = 0
179        self._new_global_cursor: Optional[StreamState] = None
180        self._number_of_partitions: int = 0
181        self._use_global_cursor: bool = use_global_cursor
182        self._partition_serializer = PerPartitionKeySerializer()
183
184        # Track the last time a state message was emitted
185        self._last_emission_time: float = 0.0
186        self._timer = Timer()
187
188        self._set_initial_state(stream_state)
189
190        # FIXME this is a temporary field for the duration of the migration from declarative cursors to concurrent ones
191        self._attempt_to_create_cursor_if_not_provided = attempt_to_create_cursor_if_not_provided
192        self._synced_some_data = False
193        self._logged_regarding_datetime_format_error = False
194
195    @property
196    def cursor_field(self) -> CursorField:
197        return self._cursor_field
198
199    @property
200    def state(self) -> MutableMapping[str, Any]:
201        state: dict[str, Any] = {"use_global_cursor": self._use_global_cursor}
202        if not self._use_global_cursor:
203            states = []
204            for partition_tuple, cursor in self._cursor_per_partition.items():
205                if cursor.state:
206                    states.append(
207                        {
208                            "partition": self._to_dict(partition_tuple),
209                            "cursor": copy.deepcopy(cursor.state),
210                        }
211                    )
212            state[self._PERPARTITION_STATE_KEY] = states
213
214        if self._global_cursor:
215            state[self._GLOBAL_STATE_KEY] = self._global_cursor
216        if self._lookback_window is not None:
217            state["lookback_window"] = self._lookback_window
218        if self._parent_state is not None:
219            state["parent_state"] = self._parent_state
220        return state
221
222    def close_partition(self, partition: Partition) -> None:
223        # Attempt to retrieve the stream slice
224        stream_slice: Optional[StreamSlice] = partition.to_slice()  # type: ignore[assignment]
225
226        # Ensure stream_slice is not None
227        if stream_slice is None:
228            raise ValueError("stream_slice cannot be None")
229
230        partition_key = self._to_partition_key(stream_slice.partition)
231        with self._lock:
232            self._semaphore_per_partition[partition_key].acquire()
233            if not self._use_global_cursor:
234                cursor = self._cursor_per_partition[partition_key]
235                cursor.close_partition(partition=partition)
236                if (
237                    partition_key in self._partitions_done_generating_stream_slices
238                    and self._semaphore_per_partition[partition_key]._value == 0
239                ):
240                    self._update_global_cursor(cursor.state[self.cursor_field.cursor_field_key])
241
242            # Clean up the partition if it is fully processed
243            self._cleanup_if_done(partition_key)
244
245            self._check_and_update_parent_state()
246
247            self._emit_state_message()
248
249    def _check_and_update_parent_state(self) -> None:
250        last_closed_state = None
251
252        while self._partition_parent_state_map:
253            earliest_key, (candidate_state, candidate_seq) = next(
254                iter(self._partition_parent_state_map.items())
255            )
256
257            # if any partition that started <= candidate_seq is still open, we must wait
258            if (
259                self._processing_partitions_indexes
260                and self._processing_partitions_indexes[0] <= candidate_seq
261            ):
262                break
263
264            # safe to pop
265            self._partition_parent_state_map.popitem(last=False)
266            last_closed_state = candidate_state
267
268        if last_closed_state is not None:
269            self._parent_state = last_closed_state
270
271    def ensure_at_least_one_state_emitted(self) -> None:
272        """
273        The platform expects at least one state message on successful syncs. Hence, whatever happens, we expect this method to be
274        called.
275        """
276        if not any(
277            semaphore_item[1]._value for semaphore_item in self._semaphore_per_partition.items()
278        ):
279            if self._synced_some_data:
280                # we only update those if we actually synced some data
281                self._global_cursor = self._new_global_cursor
282                self._lookback_window = self._timer.finish()
283            self._parent_state = self._partition_router.get_stream_state()
284        self._emit_state_message(throttle=False)
285
286    def _throttle_state_message(self) -> Optional[float]:
287        """
288        Throttles the state message emission to once every 600 seconds.
289        """
290        current_time = time.time()
291        if current_time - self._last_emission_time <= 600:
292            return None
293        return current_time
294
295    def _emit_state_message(self, throttle: bool = True) -> None:
296        if throttle:
297            current_time = self._throttle_state_message()
298            if current_time is None:
299                return
300            self._last_emission_time = current_time
301            # Skip state emit for global cursor if parent state is empty
302            if self._use_global_cursor and not self._parent_state:
303                return
304
305        self._connector_state_manager.update_state_for_stream(
306            self._stream_name,
307            self._stream_namespace,
308            self.state,
309        )
310        state_message = self._connector_state_manager.create_state_message(
311            self._stream_name, self._stream_namespace
312        )
313        self._message_repository.emit_message(state_message)
314
315    def stream_slices(self) -> Iterable[StreamSlice]:
316        if self._timer.is_running():
317            raise RuntimeError("stream_slices has been executed more than once.")
318
319        slices = self._partition_router.stream_slices()
320        self._timer.start()
321        for partition, last, parent_state in iterate_with_last_flag_and_state(
322            slices, self._partition_router.get_stream_state
323        ):
324            yield from self._generate_slices_from_partition(partition, parent_state)
325
326    def _generate_slices_from_partition(
327        self, partition: StreamSlice, parent_state: Mapping[str, Any]
328    ) -> Iterable[StreamSlice]:
329        # Ensure the maximum number of partitions is not exceeded
330        self._ensure_partition_limit()
331
332        partition_key = self._to_partition_key(partition.partition)
333
334        cursor = self._cursor_per_partition.get(self._to_partition_key(partition.partition))
335        if not cursor:
336            cursor = self._create_cursor(
337                self._global_cursor,
338                self._lookback_window if self._global_cursor else 0,
339            )
340            with self._lock:
341                self._number_of_partitions += 1
342                self._cursor_per_partition[partition_key] = cursor
343
344        if partition_key in self._semaphore_per_partition:
345            if not self._IS_PARTITION_DUPLICATION_LOGGED:
346                logger.warning(f"Partition duplication detected for stream {self._stream_name}")
347                self._IS_PARTITION_DUPLICATION_LOGGED = True
348            return
349        else:
350            self._semaphore_per_partition[partition_key] = threading.Semaphore(0)
351
352        with self._lock:
353            seq = self._generated_partitions_count
354            self._generated_partitions_count += 1
355            self._processing_partitions_indexes.append(seq)
356            self._partition_key_to_index[partition_key] = seq
357
358            if (
359                len(self._partition_parent_state_map) == 0
360                or self._partition_parent_state_map[
361                    next(reversed(self._partition_parent_state_map))
362                ][self._PARENT_STATE]
363                != parent_state
364            ):
365                self._partition_parent_state_map[partition_key] = (deepcopy(parent_state), seq)
366
367        for cursor_slice, is_last_slice, _ in iterate_with_last_flag_and_state(
368            cursor.stream_slices(),
369            lambda: None,
370        ):
371            self._semaphore_per_partition[partition_key].release()
372            if is_last_slice:
373                self._partitions_done_generating_stream_slices.add(partition_key)
374            yield StreamSlice(
375                partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields
376            )
377
378    def _ensure_partition_limit(self) -> None:
379        """
380        Ensure the maximum number of partitions does not exceed the predefined limit.
381
382        Steps:
383        1. Attempt to remove partitions that have already been fully processed (those no longer
384           tracked in `_partition_key_to_index`); these are considered safe to delete.
385        2. If the limit is still exceeded and no finished partitions are available for removal,
386           remove the oldest partition unconditionally. We expect failed partitions to be removed.
387
388        Logging:
389        - Logs a warning each time a partition is removed, indicating whether it was finished
390          or removed due to being the oldest.
391        """
392        if not self._use_global_cursor and self.limit_reached():
393            logger.info(
394                f"Exceeded the 'SWITCH_TO_GLOBAL_LIMIT' of {self.SWITCH_TO_GLOBAL_LIMIT}. "
395                f"Switching to global cursor for {self._stream_name}."
396            )
397            self._use_global_cursor = True
398
399        with self._lock:
400            while len(self._cursor_per_partition) > self.DEFAULT_MAX_PARTITIONS_NUMBER - 1:
401                # Try removing finished partitions first
402                for partition_key in list(self._cursor_per_partition.keys()):
403                    if partition_key not in self._partition_key_to_index:
404                        oldest_partition = self._cursor_per_partition.pop(
405                            partition_key
406                        )  # Remove the oldest partition
407                        logger.debug(
408                            f"The maximum number of partitions has been reached. Dropping the oldest finished partition: {oldest_partition}. Over limit: {self._number_of_partitions - self.DEFAULT_MAX_PARTITIONS_NUMBER}."
409                        )
410                        break
411                else:
412                    # If no finished partitions can be removed, fall back to removing the oldest partition
413                    oldest_partition = self._cursor_per_partition.popitem(last=False)[
414                        1
415                    ]  # Remove the oldest partition
416                    logger.warning(
417                        f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}. Over limit: {self._number_of_partitions - self.DEFAULT_MAX_PARTITIONS_NUMBER}."
418                    )
419
420    def _set_initial_state(self, stream_state: StreamState) -> None:
421        """
422        Initialize the cursor's state using the provided `stream_state`.
423
424        This method supports global and per-partition state initialization.
425
426        - **Global State**: If `states` is missing, the `state` is treated as global and applied to all partitions.
427          The `global state` holds a single cursor position representing the latest processed record across all partitions.
428
429        - **Lookback Window**: Configured via `lookback_window`, it defines the period (in seconds) for reprocessing records.
430          This ensures robustness in case of upstream data delays or reordering. If not specified, it defaults to 0.
431
432        - **Per-Partition State**: If `states` is present, each partition's cursor state is initialized separately.
433
434        - **Parent State**: (if available) Used to initialize partition routers based on parent streams.
435
436        Args:
437            stream_state (StreamState): The state of the streams to be set. The format of the stream state should be:
438                {
439                    "states": [
440                        {
441                            "partition": {
442                                "partition_key": "value"
443                            },
444                            "cursor": {
445                                "last_updated": "2023-05-27T00:00:00Z"
446                            }
447                        }
448                    ],
449                    "state": {
450                        "last_updated": "2023-05-27T00:00:00Z"
451                    },
452                    "lookback_window": 10,
453                    "parent_state": {
454                        "parent_stream_name": {
455                            "last_updated": "2023-05-27T00:00:00Z"
456                        }
457                    }
458                }
459        """
460        if not stream_state:
461            return
462
463        if (
464            self._PERPARTITION_STATE_KEY not in stream_state
465            and self._GLOBAL_STATE_KEY not in stream_state
466        ):
467            # We assume that `stream_state` is in a global format that can be applied to all partitions.
468            # Example: {"global_state_format_key": "global_state_format_value"}
469            self._set_global_state(stream_state)
470
471        else:
472            self._use_global_cursor = stream_state.get("use_global_cursor", False)
473
474            self._lookback_window = int(stream_state.get("lookback_window", 0))
475
476            for state in stream_state.get(self._PERPARTITION_STATE_KEY, []):
477                self._number_of_partitions += 1
478                self._cursor_per_partition[self._to_partition_key(state["partition"])] = (
479                    self._create_cursor(state["cursor"])
480                )
481
482            # set default state for missing partitions if it is per partition with fallback to global
483            if self._GLOBAL_STATE_KEY in stream_state:
484                self._set_global_state(stream_state[self._GLOBAL_STATE_KEY])
485
486        # Set initial parent state
487        if stream_state.get("parent_state"):
488            self._parent_state = stream_state["parent_state"]
489
490    def _set_global_state(self, stream_state: Mapping[str, Any]) -> None:
491        """
492        Initializes the global cursor state from the provided stream state.
493
494        If the cursor field key is present in the stream state, its value is parsed,
495        formatted, and stored as the global cursor. This ensures consistency in state
496        representation across partitions.
497        """
498        if self.cursor_field.cursor_field_key in stream_state:
499            global_state_value = stream_state[self.cursor_field.cursor_field_key]
500            final_format_global_state_value = self._connector_state_converter.output_format(
501                self._connector_state_converter.parse_value(global_state_value)
502            )
503
504            fixed_global_state = {
505                self.cursor_field.cursor_field_key: final_format_global_state_value
506            }
507
508            self._global_cursor = deepcopy(fixed_global_state)
509            self._new_global_cursor = deepcopy(fixed_global_state)
510
511    def observe(self, record: Record) -> None:
512        if not record.associated_slice:
513            raise ValueError(
514                "Invalid state as stream slices that are emitted should refer to an existing cursor"
515            )
516
517        # if the current record has no cursor value, we cannot meaningfully update the state based on it, so there is nothing more to do
518        try:
519            record_cursor_value = self._cursor_field.extract_value(record)
520        except ValueError:
521            return
522
523        try:
524            record_cursor = self._connector_state_converter.output_format(
525                self._connector_state_converter.parse_value(record_cursor_value)
526            )
527        except ValueError as exception:
528            if not self._logged_regarding_datetime_format_error:
529                logger.warning(
530                    "Skipping cursor update for stream '%s': failed to parse cursor field '%s' value %r: %s",
531                    self._stream_name,
532                    self._cursor_field.cursor_field_key,
533                    record_cursor_value,
534                    exception,
535                )
536                self._logged_regarding_datetime_format_error = True
537            return
538
539        self._synced_some_data = True
540        self._update_global_cursor(record_cursor)
541        if not self._use_global_cursor:
542            self._cursor_per_partition[
543                self._to_partition_key(record.associated_slice.partition)
544            ].observe(record)
545
546    def _update_global_cursor(self, value: Any) -> None:
547        if (
548            self._new_global_cursor is None
549            or self._new_global_cursor[self.cursor_field.cursor_field_key] < value
550        ):
551            self._new_global_cursor = {self.cursor_field.cursor_field_key: copy.deepcopy(value)}
552
553    def _cleanup_if_done(self, partition_key: str) -> None:
554        """
555        Free the in-memory bookkeeping that belonged to a completed partition:
556        its semaphore, its entry in `_partitions_done_generating_stream_slices`, and its index tracking.
557        """
558        if not (
559            partition_key in self._partitions_done_generating_stream_slices
560            and self._semaphore_per_partition[partition_key]._value == 0
561        ):
562            return
563
564        self._semaphore_per_partition.pop(partition_key, None)
565        self._partitions_done_generating_stream_slices.discard(partition_key)
566
567        seq = self._partition_key_to_index.pop(partition_key)
568        self._processing_partitions_indexes.remove(seq)
569
570        logger.debug(f"Partition {partition_key} fully processed and cleaned up.")
571
572    def _to_partition_key(self, partition: Mapping[str, Any]) -> str:
573        return self._partition_serializer.to_partition_key(partition)
574
575    def _to_dict(self, partition_key: str) -> Mapping[str, Any]:
576        return self._partition_serializer.to_partition(partition_key)
577
578    def _create_cursor(
579        self, cursor_state: Any, runtime_lookback_window: int = 0
580    ) -> ConcurrentCursor:
581        cursor = self._cursor_factory.create(
582            stream_state=deepcopy(cursor_state),
583            runtime_lookback_window=timedelta(seconds=runtime_lookback_window),
584        )
585        return cursor
586
587    def should_be_synced(self, record: Record) -> bool:
588        return self._get_cursor(record).should_be_synced(record)
589
590    def _get_cursor(self, record: Record) -> ConcurrentCursor:
591        if not record.associated_slice:
592            raise ValueError(
593                "Invalid state as stream slices that are emitted should refer to an existing cursor"
594            )
595
596        if self._use_global_cursor:
597            return self._create_cursor(
598                self._global_cursor,
599                self._lookback_window if self._global_cursor else 0,
600            )
601
602        partition_key = self._to_partition_key(record.associated_slice.partition)
603        if (
604            partition_key not in self._cursor_per_partition
605            and not self._attempt_to_create_cursor_if_not_provided
606        ):
607            raise ValueError(
608                "Invalid state as stream slices that are emitted should refer to an existing cursor"
609            )
610        elif partition_key not in self._cursor_per_partition:
611            return self._create_cursor(
612                self._global_cursor,
613                self._lookback_window if self._global_cursor else 0,
614            )
615        else:
616            return self._cursor_per_partition[partition_key]
617
618    def limit_reached(self) -> bool:
619        return self._number_of_partitions > self.SWITCH_TO_GLOBAL_LIMIT
620
621    @staticmethod
622    def get_parent_state(
623        stream_state: Optional[StreamState], parent_stream_name: str
624    ) -> Optional[AirbyteStateMessage]:
625        if not stream_state:
626            return None
627
628        if "parent_state" not in stream_state:
629            logger.warning(
630                f"Trying to get_parent_state for stream `{parent_stream_name}` when there are not parent state in the state"
631            )
632            return None
633        elif parent_stream_name not in stream_state["parent_state"]:
634            logger.info(
635                f"Could not find parent state for stream `{parent_stream_name}`. On parents available are {list(stream_state['parent_state'].keys())}"
636            )
637            return None
638
639        return AirbyteStateMessage(
640            type=AirbyteStateType.STREAM,
641            stream=AirbyteStreamState(
642                stream_descriptor=StreamDescriptor(parent_stream_name, None),
643                stream_state=AirbyteStateBlob(stream_state["parent_state"][parent_stream_name]),
644            ),
645        )
646
647    @staticmethod
648    def get_global_state(
649        stream_state: Optional[StreamState], parent_stream_name: str
650    ) -> Optional[AirbyteStateMessage]:
651        return (
652            AirbyteStateMessage(
653                type=AirbyteStateType.STREAM,
654                stream=AirbyteStreamState(
655                    stream_descriptor=StreamDescriptor(parent_stream_name, None),
656                    stream_state=AirbyteStateBlob(stream_state["state"]),
657                ),
658            )
659            if stream_state and "state" in stream_state
660            else None
661        )

Manages state per partition when a stream has many partitions, preventing data loss or duplication.

Attributes:
  • DEFAULT_MAX_PARTITIONS_NUMBER (int): Maximum number of partitions to retain in memory (default is 25,000). This limit must be higher than the number of threads we might enqueue (represented by ThreadPoolManager.DEFAULT_MAX_QUEUE_SIZE). Otherwise, a partition could be generated and submitted to the ThreadPool, then evicted from the ConcurrentPerPartitionCursor, and closing it would raise a KeyError.

    • Partition Limitation Logic Ensures the number of tracked partitions does not exceed the specified limit to prevent memory overuse. Oldest partitions are removed when the limit is reached.

    • Global Cursor Fallback New partitions are initialized from the global state, which keeps state progressing for new or previously deleted partitions. Historical data added to such partitions after the initial sync will be skipped.

ConcurrentPerPartitionCursor expects the state of the ConcurrentCursor to follow the format {cursor_field: cursor_value}.
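
For reference, a composite state following the format described above might look like this (values are illustrative only):

example_state = {
    "use_global_cursor": False,
    "states": [
        {
            "partition": {"partition_key": "value"},
            "cursor": {"last_updated": "2023-05-27T00:00:00Z"},
        }
    ],
    "state": {"last_updated": "2023-05-27T00:00:00Z"},
    "lookback_window": 10,
    "parent_state": {"parent_stream_name": {"last_updated": "2023-05-27T00:00:00Z"}},
}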

ConcurrentPerPartitionCursor( cursor_factory: ConcurrentCursorFactory, partition_router: airbyte_cdk.sources.declarative.partition_routers.PartitionRouter, stream_name: str, stream_namespace: Optional[str], stream_state: Any, message_repository: airbyte_cdk.MessageRepository, connector_state_manager: airbyte_cdk.ConnectorStateManager, connector_state_converter: airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter.AbstractStreamStateConverter, cursor_field: airbyte_cdk.CursorField, use_global_cursor: bool = False, attempt_to_create_cursor_if_not_provided: bool = False)
133    def __init__(
134        self,
135        cursor_factory: ConcurrentCursorFactory,
136        partition_router: PartitionRouter,
137        stream_name: str,
138        stream_namespace: Optional[str],
139        stream_state: Any,
140        message_repository: MessageRepository,
141        connector_state_manager: ConnectorStateManager,
142        connector_state_converter: AbstractStreamStateConverter,
143        cursor_field: CursorField,
144        use_global_cursor: bool = False,
145        attempt_to_create_cursor_if_not_provided: bool = False,
146    ) -> None:
147        self._global_cursor: Optional[StreamState] = {}
148        self._stream_name = stream_name
149        self._stream_namespace = stream_namespace
150        self._message_repository = message_repository
151        self._connector_state_manager = connector_state_manager
152        self._connector_state_converter = connector_state_converter
153        self._cursor_field = cursor_field
154
155        self._cursor_factory = cursor_factory
156        self._partition_router = partition_router
157
158        # The dict is ordered to ensure that once the maximum number of partitions is reached,
159        # the oldest partitions can be efficiently removed, maintaining the most recent partitions.
160        self._cursor_per_partition: OrderedDict[str, ConcurrentCursor] = OrderedDict()
161        self._semaphore_per_partition: OrderedDict[str, threading.Semaphore] = OrderedDict()
162
163        # Parent-state tracking: store each partition’s parent state in creation order
164        self._partition_parent_state_map: OrderedDict[str, tuple[Mapping[str, Any], int]] = (
165            OrderedDict()
166        )
167        self._parent_state: Optional[StreamState] = None
168
169        # Tracks partitions whose last stream slice has been emitted
170        self._partitions_done_generating_stream_slices: set[str] = set()
171        # Used to track the index of partitions that are not closed yet
172        self._processing_partitions_indexes: List[int] = list()
173        self._generated_partitions_count: int = 0
174        # Dictionary to map partition keys to their index
175        self._partition_key_to_index: dict[str, int] = {}
176
177        self._lock = threading.Lock()
178        self._lookback_window: int = 0
179        self._new_global_cursor: Optional[StreamState] = None
180        self._number_of_partitions: int = 0
181        self._use_global_cursor: bool = use_global_cursor
182        self._partition_serializer = PerPartitionKeySerializer()
183
184        # Track the last time a state message was emitted
185        self._last_emission_time: float = 0.0
186        self._timer = Timer()
187
188        self._set_initial_state(stream_state)
189
190        # FIXME this is a temporary field for the duration of the migration from declarative cursors to concurrent ones
191        self._attempt_to_create_cursor_if_not_provided = attempt_to_create_cursor_if_not_provided
192        self._synced_some_data = False
193        self._logged_regarding_datetime_format_error = False
DEFAULT_MAX_PARTITIONS_NUMBER = 25000
SWITCH_TO_GLOBAL_LIMIT = 10000
cursor_field: airbyte_cdk.CursorField
195    @property
196    def cursor_field(self) -> CursorField:
197        return self._cursor_field
state: MutableMapping[str, Any]
199    @property
200    def state(self) -> MutableMapping[str, Any]:
201        state: dict[str, Any] = {"use_global_cursor": self._use_global_cursor}
202        if not self._use_global_cursor:
203            states = []
204            for partition_tuple, cursor in self._cursor_per_partition.items():
205                if cursor.state:
206                    states.append(
207                        {
208                            "partition": self._to_dict(partition_tuple),
209                            "cursor": copy.deepcopy(cursor.state),
210                        }
211                    )
212            state[self._PERPARTITION_STATE_KEY] = states
213
214        if self._global_cursor:
215            state[self._GLOBAL_STATE_KEY] = self._global_cursor
216        if self._lookback_window is not None:
217            state["lookback_window"] = self._lookback_window
218        if self._parent_state is not None:
219            state["parent_state"] = self._parent_state
220        return state
def close_partition( self, partition: airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition) -> None:
222    def close_partition(self, partition: Partition) -> None:
223        # Attempt to retrieve the stream slice
224        stream_slice: Optional[StreamSlice] = partition.to_slice()  # type: ignore[assignment]
225
226        # Ensure stream_slice is not None
227        if stream_slice is None:
228            raise ValueError("stream_slice cannot be None")
229
230        partition_key = self._to_partition_key(stream_slice.partition)
231        with self._lock:
232            self._semaphore_per_partition[partition_key].acquire()
233            if not self._use_global_cursor:
234                cursor = self._cursor_per_partition[partition_key]
235                cursor.close_partition(partition=partition)
236                if (
237                    partition_key in self._partitions_done_generating_stream_slices
238                    and self._semaphore_per_partition[partition_key]._value == 0
239                ):
240                    self._update_global_cursor(cursor.state[self.cursor_field.cursor_field_key])
241
242            # Clean up the partition if it is fully processed
243            self._cleanup_if_done(partition_key)
244
245            self._check_and_update_parent_state()
246
247            self._emit_state_message()

Indicate to the cursor that the partition has been successfully processed
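
The close logic above pairs a per-partition semaphore with a done-generating flag. The standalone sketch below (not the CDK code itself) shows the same accounting: one release per generated slice, one acquire per closed slice, and a partition counts as fully processed only when generation has finished and the counter is back at zero.

import threading

outstanding = threading.Semaphore(0)  # value == slices generated but not yet closed
done_generating = False

def on_slice_generated(is_last_slice: bool) -> None:
    global done_generating
    outstanding.release()  # one release per emitted slice
    if is_last_slice:
        done_generating = True

def on_slice_closed() -> bool:
    outstanding.acquire()  # one acquire per closed slice
    # Fully processed once generation finished and no slice is outstanding.
    return done_generating and outstanding._value == 0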

def ensure_at_least_one_state_emitted(self) -> None:
271    def ensure_at_least_one_state_emitted(self) -> None:
272        """
273        The platform expects at least one state message on successful syncs. Hence, whatever happens, we expect this method to be
274        called.
275        """
276        if not any(
277            semaphore_item[1]._value for semaphore_item in self._semaphore_per_partition.items()
278        ):
279            if self._synced_some_data:
280                # we only update those if we actually synced some data
281                self._global_cursor = self._new_global_cursor
282                self._lookback_window = self._timer.finish()
283            self._parent_state = self._partition_router.get_stream_state()
284        self._emit_state_message(throttle=False)

The platform expects at least one state message on successful syncs. Hence, whatever happens, we expect this method to be called.

def stream_slices(self) -> Iterable[airbyte_cdk.StreamSlice]:
315    def stream_slices(self) -> Iterable[StreamSlice]:
316        if self._timer.is_running():
317            raise RuntimeError("stream_slices has been executed more than once.")
318
319        slices = self._partition_router.stream_slices()
320        self._timer.start()
321        for partition, last, parent_state in iterate_with_last_flag_and_state(
322            slices, self._partition_router.get_stream_state
323        ):
324            yield from self._generate_slices_from_partition(partition, parent_state)

Generates stream slices by iterating over the partition router's partitions, creating or reusing a per-partition cursor for each, and yielding one StreamSlice per cursor slice while tracking parent state.
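
stream_slices leans on iterate_with_last_flag_and_state to tag each item with a last-item flag and a snapshot of the parent state. The helper below is an illustrative re-implementation of that contract under the assumption that it yields (item, is_last, state) tuples, as the calls above suggest; it is not the CDK's own implementation.

from typing import Any, Callable, Iterable, Iterator, Tuple

def iterate_with_last_flag_and_state_sketch(
    items: Iterable[Any], state_getter: Callable[[], Any]
) -> Iterator[Tuple[Any, bool, Any]]:
    iterator = iter(items)
    try:
        current = next(iterator)
    except StopIteration:
        return
    for upcoming in iterator:
        yield current, False, state_getter()  # not the last item
        current = upcoming
    yield current, True, state_getter()  # last item, with the final state snapshot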

def observe(self, record: airbyte_cdk.Record) -> None:
511    def observe(self, record: Record) -> None:
512        if not record.associated_slice:
513            raise ValueError(
514                "Invalid state as stream slices that are emitted should refer to an existing cursor"
515            )
516
517        # if the current record has no cursor value, we cannot meaningfully update the state based on it, so there is nothing more to do
518        try:
519            record_cursor_value = self._cursor_field.extract_value(record)
520        except ValueError:
521            return
522
523        try:
524            record_cursor = self._connector_state_converter.output_format(
525                self._connector_state_converter.parse_value(record_cursor_value)
526            )
527        except ValueError as exception:
528            if not self._logged_regarding_datetime_format_error:
529                logger.warning(
530                    "Skipping cursor update for stream '%s': failed to parse cursor field '%s' value %r: %s",
531                    self._stream_name,
532                    self._cursor_field.cursor_field_key,
533                    record_cursor_value,
534                    exception,
535                )
536                self._logged_regarding_datetime_format_error = True
537            return
538
539        self._synced_some_data = True
540        self._update_global_cursor(record_cursor)
541        if not self._use_global_cursor:
542            self._cursor_per_partition[
543                self._to_partition_key(record.associated_slice.partition)
544            ].observe(record)

Indicate to the cursor that the record has been emitted
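
Each parsed record cursor value is funneled into _update_global_cursor, which is effectively a running maximum per cursor field. A minimal standalone equivalent of that update, with made-up timestamps:

import copy
from typing import Any, Mapping, Optional

def update_global_cursor(
    current: Optional[Mapping[str, Any]], cursor_key: str, value: Any
) -> Mapping[str, Any]:
    # Keep the largest cursor value observed so far (same comparison as above).
    if current is None or current[cursor_key] < value:
        return {cursor_key: copy.deepcopy(value)}
    return dict(current)

state = None
for seen in ("2023-05-25T00:00:00Z", "2023-05-27T00:00:00Z", "2023-05-26T00:00:00Z"):
    state = update_global_cursor(state, "last_updated", seen)
# state == {"last_updated": "2023-05-27T00:00:00Z"}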

def should_be_synced(self, record: airbyte_cdk.Record) -> bool:
587    def should_be_synced(self, record: Record) -> bool:
588        return self._get_cursor(record).should_be_synced(record)
def limit_reached(self) -> bool:
618    def limit_reached(self) -> bool:
619        return self._number_of_partitions > self.SWITCH_TO_GLOBAL_LIMIT
@staticmethod
def get_parent_state( stream_state: Optional[Mapping[str, Any]], parent_stream_name: str) -> Optional[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage]:
621    @staticmethod
622    def get_parent_state(
623        stream_state: Optional[StreamState], parent_stream_name: str
624    ) -> Optional[AirbyteStateMessage]:
625        if not stream_state:
626            return None
627
628        if "parent_state" not in stream_state:
629            logger.warning(
630                f"Trying to get_parent_state for stream `{parent_stream_name}` when there are not parent state in the state"
631            )
632            return None
633        elif parent_stream_name not in stream_state["parent_state"]:
634            logger.info(
635                f"Could not find parent state for stream `{parent_stream_name}`. On parents available are {list(stream_state['parent_state'].keys())}"
636            )
637            return None
638
639        return AirbyteStateMessage(
640            type=AirbyteStateType.STREAM,
641            stream=AirbyteStreamState(
642                stream_descriptor=StreamDescriptor(parent_stream_name, None),
643                stream_state=AirbyteStateBlob(stream_state["parent_state"][parent_stream_name]),
644            ),
645        )
@staticmethod
def get_global_state( stream_state: Optional[Mapping[str, Any]], parent_stream_name: str) -> Optional[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage]:
647    @staticmethod
648    def get_global_state(
649        stream_state: Optional[StreamState], parent_stream_name: str
650    ) -> Optional[AirbyteStateMessage]:
651        return (
652            AirbyteStateMessage(
653                type=AirbyteStateType.STREAM,
654                stream=AirbyteStreamState(
655                    stream_descriptor=StreamDescriptor(parent_stream_name, None),
656                    stream_state=AirbyteStateBlob(stream_state["state"]),
657                ),
658            )
659            if stream_state and "state" in stream_state
660            else None
661        )
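
A hedged usage sketch for the two static helpers, assuming a saved composite state for a hypothetical parent stream named "projects"; each call returns an AirbyteStateMessage, or None when the corresponding key is missing:

from airbyte_cdk.sources.declarative.incremental import ConcurrentPerPartitionCursor

saved_state = {
    "state": {"last_updated": "2023-05-27T00:00:00Z"},
    "parent_state": {"projects": {"updated_at": "2023-05-20T00:00:00Z"}},
}

parent_message = ConcurrentPerPartitionCursor.get_parent_state(saved_state, "projects")
global_message = ConcurrentPerPartitionCursor.get_global_state(saved_state, "projects")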