airbyte_cdk.sources.declarative.requesters.paginators.strategies.stop_condition
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from abc import ABC, abstractmethod
from typing import Any, Optional

import requests

from airbyte_cdk.sources.declarative.requesters.paginators.strategies.pagination_strategy import (
    PaginationStrategy,
)
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
from airbyte_cdk.sources.types import Record


class PaginationStopCondition(ABC):
    """Interface for a per-record predicate that tells pagination to stop."""

    @abstractmethod
    def is_met(self, record: Record) -> bool:
        """
        Tell whether pagination should stop, judging from the given record.

        :param record: a record used to evaluate the condition
        :return: True when the condition is met and pagination should stop
        """
        raise NotImplementedError


class CursorStopCondition(PaginationStopCondition):
    """Stop condition that is met when the cursor says a record should not be synced."""

    def __init__(self, cursor: Cursor):
        """
        :param cursor: cursor whose `should_be_synced` decides the outcome of `is_met`
        """
        self._cursor = cursor

    def is_met(self, record: Record) -> bool:
        """
        :param record: a record used to evaluate the condition
        :return: the negation of `cursor.should_be_synced(record)`
        """
        keep_record = self._cursor.should_be_synced(record)
        return not keep_record


class StopConditionPaginationStrategyDecorator(PaginationStrategy):
    """
    Wraps a pagination strategy and ends pagination once a stop condition is met.

    Everything other than the stop check is forwarded to the wrapped strategy.
    """

    def __init__(self, _delegate: PaginationStrategy, stop_condition: PaginationStopCondition):
        """
        :param _delegate: the pagination strategy being wrapped
        :param stop_condition: condition evaluated against the last record of each page
        """
        self._stop_condition = stop_condition
        self._delegate = _delegate

    def next_page_token(
        self,
        response: requests.Response,
        last_page_size: int,
        last_record: Optional[Record],
        last_page_token_value: Optional[Any] = None,
    ) -> Optional[Any]:
        """
        :param response: response to process
        :param last_page_size: the number of records read from the response
        :param last_record: the last record extracted from the response
        :param last_page_token_value: the current value of the page token made on the last request
        :return: None when the stop condition is met for `last_record`, otherwise whatever the
            wrapped strategy returns
        """
        # Only the last record of the page is checked. The assumption is that most APIs using a
        # data feed structure return records in descending order.
        # NOTE(review): presumably the last record is therefore the oldest on the page - confirm.
        # A falsy `last_record` (e.g. None) skips the stop check altogether.
        if not last_record or not self._stop_condition.is_met(last_record):
            return self._delegate.next_page_token(
                response, last_page_size, last_record, last_page_token_value
            )
        return None

    def get_page_size(self) -> Optional[int]:
        """Return the page size reported by the wrapped strategy."""
        wrapped = self._delegate
        return wrapped.get_page_size()

    @property
    def initial_token(self) -> Optional[Any]:
        """Return the initial token of the wrapped strategy."""
        wrapped = self._delegate
        return wrapped.initial_token
class
PaginationStopCondition(abc.ABC):
class PaginationStopCondition(ABC):
    """Interface for a per-record predicate that tells pagination to stop."""

    @abstractmethod
    def is_met(self, record: Record) -> bool:
        """
        Tell whether pagination should stop, judging from the given record.

        :param record: a record used to evaluate the condition
        :return: True when the condition is met and pagination should stop
        """
        raise NotImplementedError
Abstract base class for conditions that decide when pagination should stop.
@abstractmethod
def is_met(self, record: Record) -> bool:
    """
    Tell whether pagination should stop, judging from the given record.

    :param record: a record used to evaluate the condition
    :return: True when the condition is met and pagination should stop
    """
    raise NotImplementedError
When the condition is met, pagination stops.
Parameters
- record: a record used to evaluate the condition
class CursorStopCondition(PaginationStopCondition):
    """Stop condition that is met when the cursor says a record should not be synced."""

    def __init__(self, cursor: Cursor):
        """
        :param cursor: cursor whose `should_be_synced` decides the outcome of `is_met`
        """
        self._cursor = cursor

    def is_met(self, record: Record) -> bool:
        """
        :param record: a record used to evaluate the condition
        :return: the negation of `cursor.should_be_synced(record)`
        """
        keep_record = self._cursor.should_be_synced(record)
        return not keep_record
Stop condition that is met when the wrapped cursor reports that a record should not be synced.
CursorStopCondition(cursor: airbyte_cdk.Cursor)
class
StopConditionPaginationStrategyDecorator(airbyte_cdk.sources.declarative.requesters.paginators.strategies.pagination_strategy.PaginationStrategy):
class StopConditionPaginationStrategyDecorator(PaginationStrategy):
    """
    Wraps a pagination strategy and ends pagination once a stop condition is met.

    Everything other than the stop check is forwarded to the wrapped strategy.
    """

    def __init__(self, _delegate: PaginationStrategy, stop_condition: PaginationStopCondition):
        """
        :param _delegate: the pagination strategy being wrapped
        :param stop_condition: condition evaluated against the last record of each page
        """
        self._stop_condition = stop_condition
        self._delegate = _delegate

    def next_page_token(
        self,
        response: requests.Response,
        last_page_size: int,
        last_record: Optional[Record],
        last_page_token_value: Optional[Any] = None,
    ) -> Optional[Any]:
        """
        :param response: response to process
        :param last_page_size: the number of records read from the response
        :param last_record: the last record extracted from the response
        :param last_page_token_value: the current value of the page token made on the last request
        :return: None when the stop condition is met for `last_record`, otherwise whatever the
            wrapped strategy returns
        """
        # Only the last record of the page is checked. The assumption is that most APIs using a
        # data feed structure return records in descending order.
        # NOTE(review): presumably the last record is therefore the oldest on the page - confirm.
        # A falsy `last_record` (e.g. None) skips the stop check altogether.
        if not last_record or not self._stop_condition.is_met(last_record):
            return self._delegate.next_page_token(
                response, last_page_size, last_record, last_page_token_value
            )
        return None

    def get_page_size(self) -> Optional[int]:
        """Return the page size reported by the wrapped strategy."""
        wrapped = self._delegate
        return wrapped.get_page_size()

    @property
    def initial_token(self) -> Optional[Any]:
        """Return the initial token of the wrapped strategy."""
        wrapped = self._delegate
        return wrapped.initial_token
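Defines how to get the next page token: returns no token once the stop condition is met for the last record of a page, and otherwise defers to the wrapped pagination strategy.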
StopConditionPaginationStrategyDecorator( _delegate: airbyte_cdk.PaginationStrategy, stop_condition: PaginationStopCondition)
def
next_page_token( self, response: requests.models.Response, last_page_size: int, last_record: Optional[airbyte_cdk.Record], last_page_token_value: Optional[Any] = None) -> Optional[Any]:
def next_page_token(
    self,
    response: requests.Response,
    last_page_size: int,
    last_record: Optional[Record],
    last_page_token_value: Optional[Any] = None,
) -> Optional[Any]:
    """
    :param response: response to process
    :param last_page_size: the number of records read from the response
    :param last_record: the last record extracted from the response
    :param last_page_token_value: the current value of the page token made on the last request
    :return: None when the stop condition is met for `last_record`, otherwise whatever the
        wrapped strategy returns
    """
    # Only the last record of the page is checked. The assumption is that most APIs using a
    # data feed structure return records in descending order.
    # NOTE(review): presumably the last record is therefore the oldest on the page - confirm.
    # A falsy `last_record` (e.g. None) skips the stop check altogether.
    if not last_record or not self._stop_condition.is_met(last_record):
        return self._delegate.next_page_token(
            response, last_page_size, last_record, last_page_token_value
        )
    return None
Parameters
- response: response to process
- last_page_size: the number of records read from the response
- last_record: the last record extracted from the response
- last_page_token_value: the page token value that was used for the last request
Returns
next page token. Returns None if there are no more pages to fetch