airbyte_cdk.sources.declarative.retrievers.pagination_tracker
```python
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

from typing import Optional

from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.declarative.types import Record, StreamSlice
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor
from airbyte_cdk.utils.traced_exception import AirbyteTracedException


class PaginationTracker:
    _record_count: int
    _number_of_attempt_with_same_slice: int

    def __init__(
        self, cursor: Optional[ConcurrentCursor] = None, max_number_of_records: Optional[int] = None
    ) -> None:
        """
        Ideally, we would have passed the `Cursor` interface here instead of `ConcurrentCursor`, but not all
        implementations of `Cursor` can support this use case. For example, if the `ConcurrentPerPartitionCursor`
        switches to global state, we stop keeping track of the state per partition and therefore can't get an
        accurate view for a specific stream_slice. To solve that, we decided to scope this feature to
        `ConcurrentCursor`, which is the only "leaf" cursor that actually emits stream slices with `cursor_partition`.
        """
        self._cursor = cursor
        self._limit = max_number_of_records
        self._reset()

        """
        Given a cursor, we do not allow the same slice to be processed twice because we expect the
        cursor to reduce the slice range on every pagination reset.

        Given no cursor, we assume that the pagination reset is for retrying purposes and we allow one retry.
        """
        self._allowed_number_of_attempt_with_same_slice = 1 if self._cursor else 2
        self._number_of_attempt_with_same_slice = 0

    def observe(self, record: Record) -> None:
        # Count the record toward the limit and let the cursor track state if one is set.
        self._record_count += 1
        if self._cursor:
            self._cursor.observe(record)

    def has_reached_limit(self) -> bool:
        return self._limit is not None and self._record_count >= self._limit

    def _reset(self) -> None:
        self._record_count = 0

    def reduce_slice_range_if_possible(
        self, previous_stream_slice: StreamSlice, original_stream_slice: StreamSlice
    ) -> StreamSlice:
        """
        :param previous_stream_slice: Stream slice that was just processed (it can be the same as original_stream_slice or already reduced)
        :param original_stream_slice: The original stream slice before any reduction
        :return: Reduced stream slice
        """
        new_slice = (
            self._cursor.reduce_slice_range(original_stream_slice)
            if self._cursor
            else previous_stream_slice
        )

        if new_slice == previous_stream_slice:
            # The slice did not shrink: count this as another attempt with the same slice.
            self._number_of_attempt_with_same_slice += 1
            if (
                self._number_of_attempt_with_same_slice
                >= self._allowed_number_of_attempt_with_same_slice
            ):
                raise AirbyteTracedException(
                    internal_message=f"There were {self._number_of_attempt_with_same_slice} attempts with the same slice while the maximum allowed is {self._allowed_number_of_attempt_with_same_slice}",
                    failure_type=FailureType.system_error,
                )
        else:
            self._number_of_attempt_with_same_slice = 0

        # Start counting records from zero for the new pagination sequence.
        self._reset()
        return new_slice
```
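To make the intended control flow concrete, here is a minimal sketch of how a retriever-style loop might drive the tracker when no cursor is available. The fetch_page helper is a hypothetical stand-in for a real HTTP request, and the Record/StreamSlice constructor arguments are assumptions about the CDK types, not something defined in this module:

```python
from airbyte_cdk.sources.declarative.retrievers.pagination_tracker import PaginationTracker
from airbyte_cdk.sources.declarative.types import Record, StreamSlice


def fetch_page(stream_slice: StreamSlice, page: int) -> list[dict]:
    """Hypothetical stand-in for the HTTP call a real retriever would make."""
    return []


# partition/cursor_slice kwargs are assumed here for illustration.
stream_slice = StreamSlice(partition={}, cursor_slice={"start": "2025-01-01", "end": "2025-06-30"})

# No cursor: a pagination reset is treated as a retry, and one retry is allowed.
tracker = PaginationTracker(max_number_of_records=1_000)

page = 0
while True:
    records = fetch_page(stream_slice, page)
    for data in records:
        tracker.observe(Record(data=data, stream_name="my_stream"))  # Record kwargs assumed
    if tracker.has_reached_limit():
        # Too many records in one pagination sequence: restart pagination on a
        # (possibly reduced) slice. Without a cursor the slice comes back unchanged,
        # which consumes the single allowed retry; a second reset would raise.
        stream_slice = tracker.reduce_slice_range_if_possible(stream_slice, stream_slice)
        page = 0
        continue
    if not records:
        break
    page += 1
```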
class PaginationTracker:
PaginationTracker(cursor: Optional[airbyte_cdk.ConcurrentCursor] = None, max_number_of_records: Optional[int] = None)
Ideally, we would have passed the Cursor interface here instead of ConcurrentCursor, but not all
implementations of Cursor can support this use case. For example, if the ConcurrentPerPartitionCursor
switches to global state, we stop keeping track of the state per partition and therefore can't get an accurate
view for a specific stream_slice. To solve that, we decided to scope this feature to
ConcurrentCursor, which is the only "leaf" cursor that actually emits stream slices with cursor_partition.
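The practical consequence of this scoping is that the retry allowance depends on whether a cursor was supplied. A short sketch of the two modes follows; the ConcurrentCursor construction is elided since it needs a full stream/state setup:

```python
from airbyte_cdk.sources.declarative.retrievers.pagination_tracker import PaginationTracker

# Without a cursor: two attempts with the same slice are allowed, i.e. the original
# attempt plus one retry, since a pagination reset is assumed to be a retry.
retry_tracker = PaginationTracker(max_number_of_records=10_000)

# With a ConcurrentCursor (construction elided; it requires stream state, a cursor
# field, etc.), the slice is expected to shrink on every pagination reset, so a
# second attempt with an identical slice raises a system error immediately:
# strict_tracker = PaginationTracker(cursor=my_concurrent_cursor, max_number_of_records=10_000)
```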
def reduce_slice_range_if_possible(self, previous_stream_slice: airbyte_cdk.StreamSlice, original_stream_slice: airbyte_cdk.StreamSlice) -> airbyte_cdk.StreamSlice:
Parameters
- previous_stream_slice: the stream slice that was just processed (it can be the same as original_stream_slice or already reduced)
- original_stream_slice: the original stream slice before any reduction

Returns
The reduced stream slice
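The same-slice guard can be observed directly: without a cursor, the method returns previous_stream_slice unchanged, so a second reset with an identical slice trips the attempt limit. A minimal sketch, assuming the StreamSlice keyword arguments shown:

```python
from airbyte_cdk.sources.declarative.retrievers.pagination_tracker import PaginationTracker
from airbyte_cdk.sources.declarative.types import StreamSlice
from airbyte_cdk.utils.traced_exception import AirbyteTracedException

tracker = PaginationTracker()  # no cursor and no record limit
s = StreamSlice(partition={}, cursor_slice={})  # kwargs assumed for illustration

# First reset: the slice cannot be reduced without a cursor, so the same slice
# comes back and the attempt counter moves to 1 (still within the allowance of 2).
same = tracker.reduce_slice_range_if_possible(s, s)

# Second reset with the identical slice: the counter reaches 2 and the tracker
# raises a system error instead of looping over the same pages forever.
try:
    tracker.reduce_slice_range_if_possible(same, s)
except AirbyteTracedException as exc:
    print(exc.internal_message)
```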