airbyte_cdk.sources.declarative.retrievers.pagination_tracker

 1from typing import Optional
 2
 3from airbyte_cdk.sources.declarative.models import FailureType
 4from airbyte_cdk.sources.declarative.types import Record, StreamSlice
 5from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor
 6from airbyte_cdk.utils.traced_exception import AirbyteTracedException
 7
 8
 9class PaginationTracker:
10    _record_count: int
11    _number_of_attempt_with_same_slice: int
12
13    def __init__(
14        self, cursor: Optional[ConcurrentCursor] = None, max_number_of_records: Optional[int] = None
15    ) -> None:
16        """
17        Ideally, we would have passed the `Cursor` interface here instead of `ConcurrentCursor` but not all
18        implementations of `Cursor` can support this use case. For example, if the `ConcurrentPerPartitionCursor`
19        switch to global state, we stop keeping track of the state per partition and therefore can't get an accurate
20        view for a specific stream_slice. In order to solve that, we decided to scope this feature to use only
21        ConcurrentCursor which is the only "leaf" cursor that actually emits stream slices with `cursor_partition`.
22        """
23        self._cursor = cursor
24        self._limit = max_number_of_records
25        self._reset()
26
27        """
28        Given we have a cursor, we do not allow for the same slice to be processed twice because we assume we will
29        always process the same slice.
30
31        Given no cursor, we assume that the pagination reset is for retrying purposes and we allow to retry once.
32        """
33        self._allowed_number_of_attempt_with_same_slice = 1 if self._cursor else 2
34        self._number_of_attempt_with_same_slice = 0
35
36    def observe(self, record: Record) -> None:
37        self._record_count += 1
38        if self._cursor:
39            self._cursor.observe(record)
40
41    def has_reached_limit(self) -> bool:
42        return self._limit is not None and self._record_count >= self._limit
43
44    def _reset(self) -> None:
45        self._record_count = 0
46
47    def reduce_slice_range_if_possible(self, stream_slice: StreamSlice) -> StreamSlice:
48        new_slice = self._cursor.reduce_slice_range(stream_slice) if self._cursor else stream_slice
49
50        if new_slice == stream_slice:
51            self._number_of_attempt_with_same_slice += 1
52            if (
53                self._number_of_attempt_with_same_slice
54                >= self._allowed_number_of_attempt_with_same_slice
55            ):
56                raise AirbyteTracedException(
57                    internal_message=f"There were {self._number_of_attempt_with_same_slice} attempts with the same slice already while the max allowed is {self._allowed_number_of_attempt_with_same_slice}",
58                    failure_type=FailureType.system_error,
59                )
60        else:
61            self._number_of_attempt_with_same_slice = 0
62
63        self._reset()
64        return new_slice
class PaginationTracker:
10class PaginationTracker:
11    _record_count: int
12    _number_of_attempt_with_same_slice: int
13
14    def __init__(
15        self, cursor: Optional[ConcurrentCursor] = None, max_number_of_records: Optional[int] = None
16    ) -> None:
17        """
18        Ideally, we would have passed the `Cursor` interface here instead of `ConcurrentCursor` but not all
19        implementations of `Cursor` can support this use case. For example, if the `ConcurrentPerPartitionCursor`
20        switch to global state, we stop keeping track of the state per partition and therefore can't get an accurate
21        view for a specific stream_slice. In order to solve that, we decided to scope this feature to use only
22        ConcurrentCursor which is the only "leaf" cursor that actually emits stream slices with `cursor_partition`.
23        """
24        self._cursor = cursor
25        self._limit = max_number_of_records
26        self._reset()
27
28        """
29        Given we have a cursor, we do not allow for the same slice to be processed twice because we assume we will
30        always process the same slice.
31
32        Given no cursor, we assume that the pagination reset is for retrying purposes and we allow to retry once.
33        """
34        self._allowed_number_of_attempt_with_same_slice = 1 if self._cursor else 2
35        self._number_of_attempt_with_same_slice = 0
36
37    def observe(self, record: Record) -> None:
38        self._record_count += 1
39        if self._cursor:
40            self._cursor.observe(record)
41
42    def has_reached_limit(self) -> bool:
43        return self._limit is not None and self._record_count >= self._limit
44
45    def _reset(self) -> None:
46        self._record_count = 0
47
48    def reduce_slice_range_if_possible(self, stream_slice: StreamSlice) -> StreamSlice:
49        new_slice = self._cursor.reduce_slice_range(stream_slice) if self._cursor else stream_slice
50
51        if new_slice == stream_slice:
52            self._number_of_attempt_with_same_slice += 1
53            if (
54                self._number_of_attempt_with_same_slice
55                >= self._allowed_number_of_attempt_with_same_slice
56            ):
57                raise AirbyteTracedException(
58                    internal_message=f"There were {self._number_of_attempt_with_same_slice} attempts with the same slice already while the max allowed is {self._allowed_number_of_attempt_with_same_slice}",
59                    failure_type=FailureType.system_error,
60                )
61        else:
62            self._number_of_attempt_with_same_slice = 0
63
64        self._reset()
65        return new_slice
PaginationTracker(cursor: Optional[airbyte_cdk.ConcurrentCursor] = None, max_number_of_records: Optional[int] = None)
14    def __init__(
15        self, cursor: Optional[ConcurrentCursor] = None, max_number_of_records: Optional[int] = None
16    ) -> None:
17        """
18        Ideally, we would have passed the `Cursor` interface here instead of `ConcurrentCursor` but not all
19        implementations of `Cursor` can support this use case. For example, if the `ConcurrentPerPartitionCursor`
20        switch to global state, we stop keeping track of the state per partition and therefore can't get an accurate
21        view for a specific stream_slice. In order to solve that, we decided to scope this feature to use only
22        ConcurrentCursor which is the only "leaf" cursor that actually emits stream slices with `cursor_partition`.
23        """
24        self._cursor = cursor
25        self._limit = max_number_of_records
26        self._reset()
27
28        """
29        Given we have a cursor, we do not allow for the same slice to be processed twice because we assume we will
30        always process the same slice.
31
32        Given no cursor, we assume that the pagination reset is for retrying purposes and we allow to retry once.
33        """
34        self._allowed_number_of_attempt_with_same_slice = 1 if self._cursor else 2
35        self._number_of_attempt_with_same_slice = 0

Ideally, we would have passed the Cursor interface here instead of ConcurrentCursor but not all implementations of Cursor can support this use case. For example, if the ConcurrentPerPartitionCursor switch to global state, we stop keeping track of the state per partition and therefore can't get an accurate view for a specific stream_slice. In order to solve that, we decided to scope this feature to use only ConcurrentCursor which is the only "leaf" cursor that actually emits stream slices with cursor_partition.

def observe(self, record: airbyte_cdk.Record) -> None:
37    def observe(self, record: Record) -> None:
38        self._record_count += 1
39        if self._cursor:
40            self._cursor.observe(record)
def has_reached_limit(self) -> bool:
42    def has_reached_limit(self) -> bool:
43        return self._limit is not None and self._record_count >= self._limit
def reduce_slice_range_if_possible(self, stream_slice: airbyte_cdk.StreamSlice) -> airbyte_cdk.StreamSlice:
48    def reduce_slice_range_if_possible(self, stream_slice: StreamSlice) -> StreamSlice:
49        new_slice = self._cursor.reduce_slice_range(stream_slice) if self._cursor else stream_slice
50
51        if new_slice == stream_slice:
52            self._number_of_attempt_with_same_slice += 1
53            if (
54                self._number_of_attempt_with_same_slice
55                >= self._allowed_number_of_attempt_with_same_slice
56            ):
57                raise AirbyteTracedException(
58                    internal_message=f"There were {self._number_of_attempt_with_same_slice} attempts with the same slice already while the max allowed is {self._allowed_number_of_attempt_with_same_slice}",
59                    failure_type=FailureType.system_error,
60                )
61        else:
62            self._number_of_attempt_with_same_slice = 0
63
64        self._reset()
65        return new_slice