airbyte_cdk.sources.declarative.requesters.paginators.strategies.stop_condition
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from abc import ABC, abstractmethod
from typing import Any, Optional

import requests

from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor
from airbyte_cdk.sources.declarative.requesters.paginators.strategies.pagination_strategy import (
    PaginationStrategy,
)
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor
from airbyte_cdk.sources.types import Record


class PaginationStopCondition(ABC):
    """Abstract predicate evaluated against a record to decide whether pagination stops."""

    @abstractmethod
    def is_met(self, record: Record) -> bool:
        """
        Tell whether the stop condition holds for ``record``.

        When the condition is met, the pagination will stop.

        :param record: a record used to evaluate the condition
        :return: True when pagination should stop
        """
        raise NotImplementedError


class CursorStopCondition(PaginationStopCondition):
    """Stop condition that is met when the cursor says a record should not be synced."""

    def __init__(
        self,
        # Accepts both the legacy declarative cursor and the concurrent one.
        cursor: DeclarativeCursor | ConcurrentCursor,
    ):
        self._cursor = cursor

    def is_met(self, record: Record) -> bool:
        """
        Return the negation of the cursor's ``should_be_synced`` answer for ``record``.

        :param record: a record used to evaluate the condition
        """
        keep_syncing = self._cursor.should_be_synced(record)
        return not keep_syncing


class StopConditionPaginationStrategyDecorator(PaginationStrategy):
    """
    Wrap a pagination strategy and end pagination once a stop condition is met.

    Every call is forwarded to the wrapped strategy, except that
    ``next_page_token`` returns None when the stop condition is met for the
    last record of the page.
    """

    def __init__(self, _delegate: PaginationStrategy, stop_condition: PaginationStopCondition):
        self._stop_condition = stop_condition
        self._delegate = _delegate

    def next_page_token(
        self,
        response: requests.Response,
        last_page_size: int,
        last_record: Optional[Record],
        last_page_token_value: Optional[Any] = None,
    ) -> Optional[Any]:
        """
        Return the delegate's next page token unless the stop condition is met.

        :param response: response to process
        :param last_page_size: the number of records read from the response
        :param last_record: the last record extracted from the response
        :param last_page_token_value: the current value of the page token made on the last request
        :return: next page token, or None if there are no more pages to fetch
        """
        # Only the last record of the page is inspected.
        # NOTE(review): presumably this relies on data-feed style APIs returning
        # records in descending order, so the last record is the oldest - confirm.
        # A missing or falsy last_record never triggers the stop condition.
        if not last_record or not self._stop_condition.is_met(last_record):
            return self._delegate.next_page_token(
                response, last_page_size, last_record, last_page_token_value
            )
        return None

    def get_page_size(self) -> Optional[int]:
        """Return the page size reported by the wrapped strategy."""
        page_size = self._delegate.get_page_size()
        return page_size

    @property
    def initial_token(self) -> Optional[Any]:
        """Initial token of the wrapped strategy."""
        token = self._delegate.initial_token
        return token
class
PaginationStopCondition(abc.ABC):
class PaginationStopCondition(ABC):
    """Abstract predicate evaluated against a record to decide whether pagination stops."""

    @abstractmethod
    def is_met(self, record: Record) -> bool:
        """
        Tell whether the stop condition holds for ``record``.

        When the condition is met, the pagination will stop.

        :param record: a record used to evaluate the condition
        :return: True when pagination should stop
        """
        raise NotImplementedError
Abstract base class (ABC) for conditions that decide when pagination must stop; subclasses implement `is_met`.
@abstractmethod
def is_met(self, record: Record) -> bool:
    """
    Tell whether the stop condition holds for ``record``.

    When the condition is met, the pagination will stop.

    :param record: a record used to evaluate the condition
    :return: True when pagination should stop
    """
    raise NotImplementedError
Returns whether the condition is met for the given record; when it is met, the pagination will stop.
Parameters
- record: a record used to evaluate the condition
class CursorStopCondition(PaginationStopCondition):
    """Stop condition that is met when the cursor says a record should not be synced."""

    def __init__(
        self,
        # Accepts both the legacy declarative cursor and the concurrent one.
        cursor: DeclarativeCursor | ConcurrentCursor,
    ):
        self._cursor = cursor

    def is_met(self, record: Record) -> bool:
        """
        Return the negation of the cursor's ``should_be_synced`` answer for ``record``.

        :param record: a record used to evaluate the condition
        """
        keep_syncing = self._cursor.should_be_synced(record)
        return not keep_syncing
Stop condition backed by a cursor: it is met when the cursor reports that a record should not be synced. Inherits from the `PaginationStopCondition` abstract base class.
CursorStopCondition( cursor: airbyte_cdk.sources.declarative.incremental.DeclarativeCursor | airbyte_cdk.ConcurrentCursor)
class
StopConditionPaginationStrategyDecorator(airbyte_cdk.sources.declarative.requesters.paginators.strategies.pagination_strategy.PaginationStrategy):
class StopConditionPaginationStrategyDecorator(PaginationStrategy):
    """
    Wrap a pagination strategy and end pagination once a stop condition is met.

    Every call is forwarded to the wrapped strategy, except that
    ``next_page_token`` returns None when the stop condition is met for the
    last record of the page.
    """

    def __init__(self, _delegate: PaginationStrategy, stop_condition: PaginationStopCondition):
        self._stop_condition = stop_condition
        self._delegate = _delegate

    def next_page_token(
        self,
        response: requests.Response,
        last_page_size: int,
        last_record: Optional[Record],
        last_page_token_value: Optional[Any] = None,
    ) -> Optional[Any]:
        """
        Return the delegate's next page token unless the stop condition is met.

        :param response: response to process
        :param last_page_size: the number of records read from the response
        :param last_record: the last record extracted from the response
        :param last_page_token_value: the current value of the page token made on the last request
        :return: next page token, or None if there are no more pages to fetch
        """
        # Only the last record of the page is inspected.
        # NOTE(review): presumably this relies on data-feed style APIs returning
        # records in descending order, so the last record is the oldest - confirm.
        # A missing or falsy last_record never triggers the stop condition.
        if not last_record or not self._stop_condition.is_met(last_record):
            return self._delegate.next_page_token(
                response, last_page_size, last_record, last_page_token_value
            )
        return None

    def get_page_size(self) -> Optional[int]:
        """Return the page size reported by the wrapped strategy."""
        page_size = self._delegate.get_page_size()
        return page_size

    @property
    def initial_token(self) -> Optional[Any]:
        """Initial token of the wrapped strategy."""
        token = self._delegate.initial_token
        return token
Defines how to get the next page token
StopConditionPaginationStrategyDecorator( _delegate: airbyte_cdk.PaginationStrategy, stop_condition: PaginationStopCondition)
def
next_page_token( self, response: requests.models.Response, last_page_size: int, last_record: Optional[airbyte_cdk.Record], last_page_token_value: Optional[Any] = None) -> Optional[Any]:
def next_page_token(
    self,
    response: requests.Response,
    last_page_size: int,
    last_record: Optional[Record],
    last_page_token_value: Optional[Any] = None,
) -> Optional[Any]:
    """
    Return the delegate's next page token unless the stop condition is met.

    :param response: response to process
    :param last_page_size: the number of records read from the response
    :param last_record: the last record extracted from the response
    :param last_page_token_value: the current value of the page token made on the last request
    :return: next page token, or None if there are no more pages to fetch
    """
    # Only the last record of the page is inspected.
    # NOTE(review): presumably this relies on data-feed style APIs returning
    # records in descending order, so the last record is the oldest - confirm.
    # A missing or falsy last_record never triggers the stop condition.
    if not last_record or not self._stop_condition.is_met(last_record):
        return self._delegate.next_page_token(
            response, last_page_size, last_record, last_page_token_value
        )
    return None
Parameters
- response: response to process
- last_page_size: the number of records read from the response
- last_record: the last record extracted from the response
- last_page_token_value: The current value of the page token made on the last request
Returns
next page token. Returns None if there are no more pages to fetch