airbyte_cdk.sources.declarative.retrievers.pagination_tracker
from typing import Optional

from airbyte_cdk.sources.declarative.models import FailureType
from airbyte_cdk.sources.declarative.types import Record, StreamSlice
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor
from airbyte_cdk.utils.traced_exception import AirbyteTracedException


class PaginationTracker:
    """Tracks how many records the current pagination pass has produced and
    guards against re-processing the same stream slice indefinitely."""

    _record_count: int
    _number_of_attempt_with_same_slice: int

    def __init__(
        self, cursor: Optional[ConcurrentCursor] = None, max_number_of_records: Optional[int] = None
    ) -> None:
        """
        Ideally we would accept the `Cursor` interface here instead of `ConcurrentCursor`, but not every
        `Cursor` implementation can support this use case. For example, when the `ConcurrentPerPartitionCursor`
        switches to global state, state is no longer tracked per partition, so an accurate view for a specific
        stream_slice is unavailable. This feature is therefore scoped to `ConcurrentCursor`, the only "leaf"
        cursor that actually emits stream slices with `cursor_partition`.
        """
        self._cursor = cursor
        self._limit = max_number_of_records
        self._reset()

        # With a cursor, the same slice should never come back twice, so only one
        # attempt is allowed. Without a cursor, a pagination reset is assumed to be
        # a retry, so a single retry (two attempts) is permitted.
        self._allowed_number_of_attempt_with_same_slice = 1 if self._cursor else 2
        self._number_of_attempt_with_same_slice = 0

    def observe(self, record: Record) -> None:
        """Count the record and forward it to the cursor when one is configured."""
        self._record_count += 1
        if self._cursor:
            self._cursor.observe(record)

    def has_reached_limit(self) -> bool:
        """Return True once at least `max_number_of_records` records were observed (False when unlimited)."""
        if self._limit is None:
            return False
        return self._record_count >= self._limit

    def _reset(self) -> None:
        # Only the record count restarts; the same-slice attempt counter must
        # survive so repeated identical slices can still be detected.
        self._record_count = 0

    def reduce_slice_range_if_possible(self, stream_slice: StreamSlice) -> StreamSlice:
        """Ask the cursor for a narrower slice and reset the record count.

        Raises:
            AirbyteTracedException: when the slice failed to shrink more times
                than allowed, which would otherwise loop forever.
        """
        if self._cursor:
            reduced_slice = self._cursor.reduce_slice_range(stream_slice)
        else:
            reduced_slice = stream_slice

        if reduced_slice == stream_slice:
            self._number_of_attempt_with_same_slice += 1
            if (
                self._number_of_attempt_with_same_slice
                >= self._allowed_number_of_attempt_with_same_slice
            ):
                raise AirbyteTracedException(
                    internal_message=f"There were {self._number_of_attempt_with_same_slice} attempts with the same slice already while the max allowed is {self._allowed_number_of_attempt_with_same_slice}",
                    failure_type=FailureType.system_error,
                )
        else:
            self._number_of_attempt_with_same_slice = 0

        self._reset()
        return reduced_slice
class PaginationTracker:
    """Tracks how many records the current pagination pass has produced and
    guards against re-processing the same stream slice indefinitely."""

    _record_count: int
    _number_of_attempt_with_same_slice: int

    def __init__(
        self, cursor: Optional[ConcurrentCursor] = None, max_number_of_records: Optional[int] = None
    ) -> None:
        """
        Ideally we would accept the `Cursor` interface here instead of `ConcurrentCursor`, but not every
        `Cursor` implementation can support this use case. For example, when the `ConcurrentPerPartitionCursor`
        switches to global state, state is no longer tracked per partition, so an accurate view for a specific
        stream_slice is unavailable. This feature is therefore scoped to `ConcurrentCursor`, the only "leaf"
        cursor that actually emits stream slices with `cursor_partition`.
        """
        self._cursor = cursor
        self._limit = max_number_of_records
        self._reset()

        # With a cursor, the same slice should never come back twice, so only one
        # attempt is allowed. Without a cursor, a pagination reset is assumed to be
        # a retry, so a single retry (two attempts) is permitted.
        self._allowed_number_of_attempt_with_same_slice = 1 if self._cursor else 2
        self._number_of_attempt_with_same_slice = 0

    def observe(self, record: Record) -> None:
        """Count the record and forward it to the cursor when one is configured."""
        self._record_count += 1
        if self._cursor:
            self._cursor.observe(record)

    def has_reached_limit(self) -> bool:
        """Return True once at least `max_number_of_records` records were observed (False when unlimited)."""
        if self._limit is None:
            return False
        return self._record_count >= self._limit

    def _reset(self) -> None:
        # Only the record count restarts; the same-slice attempt counter must
        # survive so repeated identical slices can still be detected.
        self._record_count = 0

    def reduce_slice_range_if_possible(self, stream_slice: StreamSlice) -> StreamSlice:
        """Ask the cursor for a narrower slice and reset the record count.

        Raises:
            AirbyteTracedException: when the slice failed to shrink more times
                than allowed, which would otherwise loop forever.
        """
        if self._cursor:
            reduced_slice = self._cursor.reduce_slice_range(stream_slice)
        else:
            reduced_slice = stream_slice

        if reduced_slice == stream_slice:
            self._number_of_attempt_with_same_slice += 1
            if (
                self._number_of_attempt_with_same_slice
                >= self._allowed_number_of_attempt_with_same_slice
            ):
                raise AirbyteTracedException(
                    internal_message=f"There were {self._number_of_attempt_with_same_slice} attempts with the same slice already while the max allowed is {self._allowed_number_of_attempt_with_same_slice}",
                    failure_type=FailureType.system_error,
                )
        else:
            self._number_of_attempt_with_same_slice = 0

        self._reset()
        return reduced_slice
PaginationTracker( cursor: Optional[airbyte_cdk.ConcurrentCursor] = None, max_number_of_records: Optional[int] = None)
14 def __init__( 15 self, cursor: Optional[ConcurrentCursor] = None, max_number_of_records: Optional[int] = None 16 ) -> None: 17 """ 18 Ideally, we would have passed the `Cursor` interface here instead of `ConcurrentCursor` but not all 19 implementations of `Cursor` can support this use case. For example, if the `ConcurrentPerPartitionCursor` 20 switch to global state, we stop keeping track of the state per partition and therefore can't get an accurate 21 view for a specific stream_slice. In order to solve that, we decided to scope this feature to use only 22 ConcurrentCursor which is the only "leaf" cursor that actually emits stream slices with `cursor_partition`. 23 """ 24 self._cursor = cursor 25 self._limit = max_number_of_records 26 self._reset() 27 28 """ 29 Given we have a cursor, we do not allow for the same slice to be processed twice because we assume we will 30 always process the same slice. 31 32 Given no cursor, we assume that the pagination reset is for retrying purposes and we allow to retry once. 33 """ 34 self._allowed_number_of_attempt_with_same_slice = 1 if self._cursor else 2 35 self._number_of_attempt_with_same_slice = 0
Ideally, we would have passed the `Cursor` interface here instead of `ConcurrentCursor`, but not all implementations of `Cursor` can support this use case. For example, if the `ConcurrentPerPartitionCursor` switches to global state, we stop keeping track of the state per partition and therefore can't get an accurate view for a specific stream_slice. In order to solve that, we decided to scope this feature to use only `ConcurrentCursor`, which is the only "leaf" cursor that actually emits stream slices with `cursor_partition`.
def
reduce_slice_range_if_possible( self, stream_slice: airbyte_cdk.StreamSlice) -> airbyte_cdk.StreamSlice:
48 def reduce_slice_range_if_possible(self, stream_slice: StreamSlice) -> StreamSlice: 49 new_slice = self._cursor.reduce_slice_range(stream_slice) if self._cursor else stream_slice 50 51 if new_slice == stream_slice: 52 self._number_of_attempt_with_same_slice += 1 53 if ( 54 self._number_of_attempt_with_same_slice 55 >= self._allowed_number_of_attempt_with_same_slice 56 ): 57 raise AirbyteTracedException( 58 internal_message=f"There were {self._number_of_attempt_with_same_slice} attempts with the same slice already while the max allowed is {self._allowed_number_of_attempt_with_same_slice}", 59 failure_type=FailureType.system_error, 60 ) 61 else: 62 self._number_of_attempt_with_same_slice = 0 63 64 self._reset() 65 return new_slice