airbyte_cdk.sources.declarative.retrievers.pagination_tracker

 1# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
 2
 3from typing import Optional
 4
 5from airbyte_cdk.models import FailureType
 6from airbyte_cdk.sources.declarative.types import Record, StreamSlice
 7from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor
 8from airbyte_cdk.utils.traced_exception import AirbyteTracedException
 9
10
class PaginationTracker:
    """Tracks pagination progress for a single stream slice.

    Counts records observed during a paginated read against an optional limit
    and, when pagination must restart, decides whether the (possibly reduced)
    slice may be attempted again or whether the sync is stuck on the same slice.
    """

    # Number of records observed since the last reset (per slice attempt).
    _record_count: int
    # Consecutive pagination resets that produced the exact same slice.
    _number_of_attempt_with_same_slice: int

    def __init__(
        self,
        cursor: Optional["ConcurrentCursor"] = None,
        max_number_of_records: Optional[int] = None,
    ) -> None:
        """
        Ideally, we would have passed the `Cursor` interface here instead of `ConcurrentCursor` but not all
        implementations of `Cursor` can support this use case. For example, if the `ConcurrentPerPartitionCursor`
        switch to global state, we stop keeping track of the state per partition and therefore can't get an accurate
        view for a specific stream_slice. In order to solve that, we decided to scope this feature to use only
        ConcurrentCursor which is the only "leaf" cursor that actually emits stream slices with `cursor_partition`.
        """
        self._cursor = cursor
        self._limit = max_number_of_records
        self._reset()

        # Given we have a cursor, we do not allow for the same slice to be processed twice because
        # we assume we will always process the same slice.
        # Given no cursor, we assume that the pagination reset is for retrying purposes and we
        # allow to retry once.
        self._allowed_number_of_attempt_with_same_slice = 1 if self._cursor else 2
        self._number_of_attempt_with_same_slice = 0

    def observe(self, record: "Record") -> None:
        """Count one record and forward it to the cursor (if any) for state tracking."""
        self._record_count += 1
        if self._cursor:
            self._cursor.observe(record)

    def has_reached_limit(self) -> bool:
        """Return True when a record limit is configured and has been reached."""
        return self._limit is not None and self._record_count >= self._limit

    def _reset(self) -> None:
        # Only the per-attempt record count resets; the same-slice attempt counter must
        # survive across pagination resets so we can detect being stuck on one slice.
        self._record_count = 0

    def reduce_slice_range_if_possible(
        self, previous_stream_slice: "StreamSlice", original_stream_slice: "StreamSlice"
    ) -> "StreamSlice":
        """
        :param previous_stream_slice: Stream slice that was just processed (It can be the same as original_stream_slice or already reduced)
        :param original_stream_slice: The original stream slice before any reduction
        :return: Reduced stream slice
        :raises AirbyteTracedException: system_error when the slice could not be reduced
            more than the allowed number of consecutive times.
        """
        new_slice = (
            self._cursor.reduce_slice_range(original_stream_slice)
            if self._cursor
            else previous_stream_slice
        )

        if new_slice == previous_stream_slice:
            self._number_of_attempt_with_same_slice += 1
            if (
                self._number_of_attempt_with_same_slice
                >= self._allowed_number_of_attempt_with_same_slice
            ):
                raise AirbyteTracedException(
                    internal_message=f"There were {self._number_of_attempt_with_same_slice} attempts with the same slice already while the max allowed is {self._allowed_number_of_attempt_with_same_slice}",
                    failure_type=FailureType.system_error,
                )
        else:
            self._number_of_attempt_with_same_slice = 0

        self._reset()
        return new_slice
class PaginationTracker:
    """Guard object for paginated reads over one stream slice.

    It enforces an optional cap on the number of records emitted per slice
    attempt and detects when pagination resets keep yielding the identical
    slice, which would otherwise loop forever.
    """

    # Records seen since the current slice attempt started.
    _record_count: int
    # How many resets in a row returned the same slice.
    _number_of_attempt_with_same_slice: int

    def __init__(
        self,
        cursor: Optional["ConcurrentCursor"] = None,
        max_number_of_records: Optional[int] = None,
    ) -> None:
        """
        Ideally, we would have passed the `Cursor` interface here instead of `ConcurrentCursor` but not all
        implementations of `Cursor` can support this use case. For example, if the `ConcurrentPerPartitionCursor`
        switch to global state, we stop keeping track of the state per partition and therefore can't get an accurate
        view for a specific stream_slice. In order to solve that, we decided to scope this feature to use only
        ConcurrentCursor which is the only "leaf" cursor that actually emits stream slices with `cursor_partition`.
        """
        self._cursor = cursor
        self._limit = max_number_of_records
        self._reset()

        # With a cursor, reprocessing the same slice is never allowed (the cursor is expected
        # to shrink the slice each time). Without one, a reset is treated as a retry and a
        # single repeat is tolerated.
        if self._cursor:
            self._allowed_number_of_attempt_with_same_slice = 1
        else:
            self._allowed_number_of_attempt_with_same_slice = 2
        self._number_of_attempt_with_same_slice = 0

    def observe(self, record: "Record") -> None:
        """Register one emitted record; the cursor (when present) observes it too."""
        self._record_count += 1
        if self._cursor:
            self._cursor.observe(record)

    def has_reached_limit(self) -> bool:
        """True iff a limit was configured and the current attempt hit it."""
        return self._limit is not None and self._record_count >= self._limit

    def _reset(self) -> None:
        # Deliberately leaves _number_of_attempt_with_same_slice untouched: it spans resets.
        self._record_count = 0

    def reduce_slice_range_if_possible(
        self, previous_stream_slice: "StreamSlice", original_stream_slice: "StreamSlice"
    ) -> "StreamSlice":
        """
        :param previous_stream_slice: Stream slice that was just processed (It can be the same as original_stream_slice or already reduced)
        :param original_stream_slice: The original stream slice before any reduction
        :return: Reduced stream slice
        :raises AirbyteTracedException: when the slice repeated more times than allowed.
        """
        if self._cursor:
            new_slice = self._cursor.reduce_slice_range(original_stream_slice)
        else:
            new_slice = previous_stream_slice

        if new_slice != previous_stream_slice:
            # Progress was made; the stuck-slice counter starts over.
            self._number_of_attempt_with_same_slice = 0
        else:
            self._number_of_attempt_with_same_slice += 1
            limit_reached = (
                self._number_of_attempt_with_same_slice
                >= self._allowed_number_of_attempt_with_same_slice
            )
            if limit_reached:
                raise AirbyteTracedException(
                    internal_message=f"There were {self._number_of_attempt_with_same_slice} attempts with the same slice already while the max allowed is {self._allowed_number_of_attempt_with_same_slice}",
                    failure_type=FailureType.system_error,
                )

        self._reset()
        return new_slice
def __init__(
    self,
    cursor: Optional["ConcurrentCursor"] = None,
    max_number_of_records: Optional[int] = None,
) -> None:
    """
    Ideally, we would have passed the `Cursor` interface here instead of `ConcurrentCursor` but not all
    implementations of `Cursor` can support this use case. For example, if the `ConcurrentPerPartitionCursor`
    switch to global state, we stop keeping track of the state per partition and therefore can't get an accurate
    view for a specific stream_slice. In order to solve that, we decided to scope this feature to use only
    ConcurrentCursor which is the only "leaf" cursor that actually emits stream slices with `cursor_partition`.
    """
    self._cursor = cursor
    self._limit = max_number_of_records
    self._reset()

    # Given we have a cursor, we do not allow for the same slice to be processed twice because
    # we assume we will always process the same slice.
    # Given no cursor, we assume that the pagination reset is for retrying purposes and we
    # allow to retry once.
    self._allowed_number_of_attempt_with_same_slice = 1 if self._cursor else 2
    self._number_of_attempt_with_same_slice = 0

Ideally, we would have passed the Cursor interface here instead of ConcurrentCursor but not all implementations of Cursor can support this use case. For example, if the ConcurrentPerPartitionCursor switch to global state, we stop keeping track of the state per partition and therefore can't get an accurate view for a specific stream_slice. In order to solve that, we decided to scope this feature to use only ConcurrentCursor which is the only "leaf" cursor that actually emits stream slices with cursor_partition.

def observe(self, record: "Record") -> None:
    """Count one record toward the per-slice limit; the cursor also observes it when present."""
    self._record_count += 1
    if self._cursor:
        self._cursor.observe(record)
def has_reached_limit(self) -> bool:
    """Return True when a record limit is configured and the current count has reached it."""
    return self._limit is not None and self._record_count >= self._limit
def reduce_slice_range_if_possible(
    self, previous_stream_slice: "StreamSlice", original_stream_slice: "StreamSlice"
) -> "StreamSlice":
    """
    :param previous_stream_slice: Stream slice that was just processed (It can be the same as original_stream_slice or already reduced)
    :param original_stream_slice: The original stream slice before any reduction
    :return: Reduced stream slice
    :raises AirbyteTracedException: system_error when the same slice repeats more
        consecutive times than allowed.
    """
    new_slice = (
        self._cursor.reduce_slice_range(original_stream_slice)
        if self._cursor
        else previous_stream_slice
    )

    if new_slice == previous_stream_slice:
        self._number_of_attempt_with_same_slice += 1
        if (
            self._number_of_attempt_with_same_slice
            >= self._allowed_number_of_attempt_with_same_slice
        ):
            raise AirbyteTracedException(
                internal_message=f"There were {self._number_of_attempt_with_same_slice} attempts with the same slice already while the max allowed is {self._allowed_number_of_attempt_with_same_slice}",
                failure_type=FailureType.system_error,
            )
    else:
        self._number_of_attempt_with_same_slice = 0

    self._reset()
    return new_slice
Parameters
  • previous_stream_slice: Stream slice that was just processed (It can be the same as original_stream_slice or already reduced)
  • original_stream_slice: The original stream slice before any reduction
Returns
  • Reduced stream slice