airbyte_cdk.sources.declarative.requesters.paginators.strategies.stop_condition

 1#
 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 3#
 4
 5from abc import ABC, abstractmethod
 6from typing import Any, Optional
 7
 8import requests
 9
10from airbyte_cdk.sources.declarative.requesters.paginators.strategies.pagination_strategy import (
11    PaginationStrategy,
12)
13from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
14from airbyte_cdk.sources.types import Record
15
16
class PaginationStopCondition(ABC):
    """Interface for a condition that, once met for a record, makes the pagination stop."""

    @abstractmethod
    def is_met(self, record: Record) -> bool:
        """
        Tell whether the stop condition holds for the given record.

        When the condition is met, the pagination will stop.

        :param record: a record used to evaluate the condition
        :return: True when the condition is met, False otherwise
        """
        raise NotImplementedError
26
27
class CursorStopCondition(PaginationStopCondition):
    """Stop condition that is met when the wrapped cursor reports a record should not be synced."""

    def __init__(self, cursor: Cursor):
        """
        :param cursor: the cursor asked, for each evaluated record, whether it should be synced
        """
        self._cursor = cursor

    def is_met(self, record: Record) -> bool:
        """
        Evaluate the condition by negating the cursor's ``should_be_synced`` answer.

        :param record: a record used to evaluate the condition
        :return: True when the cursor says the record should not be synced
        """
        keep_syncing = self._cursor.should_be_synced(record)
        return not keep_syncing
37
38
class StopConditionPaginationStrategyDecorator(PaginationStrategy):
    """
    Pagination strategy wrapper that ends pagination once a stop condition is met.

    Every call is forwarded to the wrapped strategy, except that ``next_page_token``
    returns None (no more pages) when the stop condition is met for the last record.
    """

    def __init__(self, _delegate: PaginationStrategy, stop_condition: PaginationStopCondition):
        """
        :param _delegate: the pagination strategy being wrapped
        :param stop_condition: the condition evaluated against the last record of each page
        """
        self._stop_condition = stop_condition
        self._delegate = _delegate

    def next_page_token(
        self,
        response: requests.Response,
        last_page_size: int,
        last_record: Optional[Record],
        last_page_token_value: Optional[Any] = None,
    ) -> Optional[Any]:
        """
        Return the next page token, or None when pagination must stop.

        :param response: response to process
        :param last_page_size: the number of records read from the response
        :param last_record: the last record extracted from the response
        :param last_page_token_value: the current value of the page token made on the last request
        :return: None if the stop condition is met for ``last_record``, otherwise whatever the
            wrapped strategy returns
        """
        # Only the last record of the page is checked against the stop condition.
        # NOTE(review): this is a truthiness test, so any falsy `last_record` (not only None)
        # bypasses the stop condition and falls through to the delegate -- confirm this is intended.
        if not last_record or not self._stop_condition.is_met(last_record):
            return self._delegate.next_page_token(
                response, last_page_size, last_record, last_page_token_value
            )
        return None

    def get_page_size(self) -> Optional[int]:
        """Return the page size reported by the wrapped strategy."""
        return self._delegate.get_page_size()

    @property
    def initial_token(self) -> Optional[Any]:
        """Return the initial token value of the wrapped strategy."""
        return self._delegate.initial_token
class PaginationStopCondition(abc.ABC):
18class PaginationStopCondition(ABC):
19    @abstractmethod
20    def is_met(self, record: Record) -> bool:
21        """
22        Given a condition is met, the pagination will stop
23
24        :param record: a record used to evaluate the condition
25        """
26        raise NotImplementedError()

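Abstract base class for pagination stop conditions; subclasses implement `is_met`. (Inherited `abc.ABC` description: Helper class that provides a standard way to create an ABC using inheritance.)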

@abstractmethod
def is_met(self, record: airbyte_cdk.Record) -> bool:
19    @abstractmethod
20    def is_met(self, record: Record) -> bool:
21        """
22        Given a condition is met, the pagination will stop
23
24        :param record: a record used to evaluate the condition
25        """
26        raise NotImplementedError()

When the condition is met, the pagination will stop.

Parameters
  • record: a record used to evaluate the condition
class CursorStopCondition(PaginationStopCondition):
29class CursorStopCondition(PaginationStopCondition):
30    def __init__(
31        self,
32        cursor: Cursor,
33    ):
34        self._cursor = cursor
35
36    def is_met(self, record: Record) -> bool:
37        return not self._cursor.should_be_synced(record)

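Stop condition that is met when the given cursor's `should_be_synced` returns a falsy value for the record. (Inherited `abc.ABC` description: Helper class that provides a standard way to create an ABC using inheritance.)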

CursorStopCondition(cursor: airbyte_cdk.Cursor)
30    def __init__(
31        self,
32        cursor: Cursor,
33    ):
34        self._cursor = cursor
def is_met(self, record: airbyte_cdk.Record) -> bool:
36    def is_met(self, record: Record) -> bool:
37        return not self._cursor.should_be_synced(record)

When the condition is met, the pagination will stop.

Parameters
  • record: a record used to evaluate the condition
40class StopConditionPaginationStrategyDecorator(PaginationStrategy):
41    def __init__(self, _delegate: PaginationStrategy, stop_condition: PaginationStopCondition):
42        self._delegate = _delegate
43        self._stop_condition = stop_condition
44
45    def next_page_token(
46        self,
47        response: requests.Response,
48        last_page_size: int,
49        last_record: Optional[Record],
50        last_page_token_value: Optional[Any] = None,
51    ) -> Optional[Any]:
52        # We evaluate in reverse order because the assumption is that most of the APIs using data feed structure
53        # will return records in descending order. In terms of performance/memory, we return the records lazily
54        if last_record and self._stop_condition.is_met(last_record):
55            return None
56        return self._delegate.next_page_token(
57            response, last_page_size, last_record, last_page_token_value
58        )
59
60    def get_page_size(self) -> Optional[int]:
61        return self._delegate.get_page_size()
62
63    @property
64    def initial_token(self) -> Optional[Any]:
65        return self._delegate.initial_token

Defines how to get the next page token

StopConditionPaginationStrategyDecorator( _delegate: airbyte_cdk.PaginationStrategy, stop_condition: PaginationStopCondition)
41    def __init__(self, _delegate: PaginationStrategy, stop_condition: PaginationStopCondition):
42        self._delegate = _delegate
43        self._stop_condition = stop_condition
def next_page_token( self, response: requests.models.Response, last_page_size: int, last_record: Optional[airbyte_cdk.Record], last_page_token_value: Optional[Any] = None) -> Optional[Any]:
45    def next_page_token(
46        self,
47        response: requests.Response,
48        last_page_size: int,
49        last_record: Optional[Record],
50        last_page_token_value: Optional[Any] = None,
51    ) -> Optional[Any]:
52        # We evaluate in reverse order because the assumption is that most of the APIs using data feed structure
53        # will return records in descending order. In terms of performance/memory, we return the records lazily
54        if last_record and self._stop_condition.is_met(last_record):
55            return None
56        return self._delegate.next_page_token(
57            response, last_page_size, last_record, last_page_token_value
58        )
Parameters
  • response: response to process
  • last_page_size: the number of records read from the response
  • last_record: the last record extracted from the response
  • last_page_token_value: The current value of the page token made on the last request
Returns

next page token. Returns None if there are no more pages to fetch

def get_page_size(self) -> Optional[int]:
60    def get_page_size(self) -> Optional[int]:
61        return self._delegate.get_page_size()
Returns

page size: The number of records to fetch in a page. Returns None if unspecified

initial_token: Optional[Any]
63    @property
64    def initial_token(self) -> Optional[Any]:
65        return self._delegate.initial_token

Return the initial value of the token