airbyte_cdk.sources.declarative.requesters.paginators.strategies.stop_condition

 1#
 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 3#
 4
 5from abc import ABC, abstractmethod
 6from typing import Any, Optional
 7
 8import requests
 9
10from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor
11from airbyte_cdk.sources.declarative.requesters.paginators.strategies.pagination_strategy import (
12    PaginationStrategy,
13)
14from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor
15from airbyte_cdk.sources.types import Record
16
17
class PaginationStopCondition(ABC):
    """Interface for a condition that, once a record satisfies it, ends pagination."""

    @abstractmethod
    def is_met(self, record: Record) -> bool:
        """
        Report whether the condition holds for the given record; when it does, pagination stops.

        :param record: the record the condition is evaluated against
        """
        raise NotImplementedError
27
28
class CursorStopCondition(PaginationStopCondition):
    """Stop condition backed by a cursor: it is met as soon as the cursor rejects a record for syncing."""

    def __init__(
        self,
        cursor: DeclarativeCursor | ConcurrentCursor,  # migrate to use both old and concurrent versions
    ):
        self._cursor = cursor

    def is_met(self, record: Record) -> bool:
        # A record the cursor would still sync keeps pagination going;
        # a record the cursor would not sync means the condition is met.
        if self._cursor.should_be_synced(record):
            return False
        return True
39
40
class StopConditionPaginationStrategyDecorator(PaginationStrategy):
    """
    Wrap a pagination strategy so that pagination ends early once a stop condition is met.

    Every call is forwarded to the wrapped strategy, except that `next_page_token` returns None
    (no more pages) when the last record of the page satisfies the stop condition.
    """

    def __init__(self, _delegate: PaginationStrategy, stop_condition: PaginationStopCondition):
        """
        :param _delegate: the wrapped strategy that computes page tokens, page size and the initial token
        :param stop_condition: the condition evaluated against the last record of each page
        """
        self._delegate = _delegate
        self._stop_condition = stop_condition

    def next_page_token(
        self,
        response: requests.Response,
        last_page_size: int,
        last_record: Optional[Record],
        last_page_token_value: Optional[Any] = None,
    ) -> Optional[Any]:
        """
        :param response: response to process
        :param last_page_size: the number of records read from the response
        :param last_record: the last record extracted from the response, or None if there was none
        :param last_page_token_value: the value of the page token used for the last request
        :return: None if the stop condition is met by last_record; otherwise the wrapped strategy's next page token
        """
        # Only the last record of the page is checked: records are returned lazily, so the whole page is not
        # kept around. This relies on the assumption that most APIs using a data-feed structure return records
        # in descending order — presumably so that once the last record meets the condition, later pages would too.
        # Compare against None explicitly rather than using truthiness: if Record defines __len__ (e.g. as a
        # Mapping), an empty record would be falsy and would otherwise skip the stop-condition check.
        if last_record is not None and self._stop_condition.is_met(last_record):
            return None
        return self._delegate.next_page_token(
            response, last_page_size, last_record, last_page_token_value
        )

    def get_page_size(self) -> Optional[int]:
        """
        :return: the wrapped strategy's page size (None if unspecified)
        """
        return self._delegate.get_page_size()

    @property
    def initial_token(self) -> Optional[Any]:
        """The wrapped strategy's initial token."""
        return self._delegate.initial_token
class PaginationStopCondition(abc.ABC):
19class PaginationStopCondition(ABC):
20    @abstractmethod
21    def is_met(self, record: Record) -> bool:
22        """
23        Given a condition is met, the pagination will stop
24
25        :param record: a record used to evaluate the condition
26        """
27        raise NotImplementedError()

Abstract base class (built on `abc.ABC`) for conditions that stop pagination once a record meets them; subclasses implement `is_met`.

@abstractmethod
def is_met(self, record: airbyte_cdk.Record) -> bool:
20    @abstractmethod
21    def is_met(self, record: Record) -> bool:
22        """
23        Given a condition is met, the pagination will stop
24
25        :param record: a record used to evaluate the condition
26        """
27        raise NotImplementedError()

Return whether the stop condition is met for the given record; once it is, pagination stops.

Parameters
  • record: a record used to evaluate the condition
class CursorStopCondition(PaginationStopCondition):
30class CursorStopCondition(PaginationStopCondition):
31    def __init__(
32        self,
33        cursor: DeclarativeCursor
34        | ConcurrentCursor,  # migrate to use both old and concurrent versions
35    ):
36        self._cursor = cursor
37
38    def is_met(self, record: Record) -> bool:
39        return not self._cursor.should_be_synced(record)

Stop condition backed by a cursor: it is met once the cursor reports (via `should_be_synced`) that a record should not be synced.

31    def __init__(
32        self,
33        cursor: DeclarativeCursor
34        | ConcurrentCursor,  # migrate to use both old and concurrent versions
35    ):
36        self._cursor = cursor
def is_met(self, record: airbyte_cdk.Record) -> bool:
38    def is_met(self, record: Record) -> bool:
39        return not self._cursor.should_be_synced(record)

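Return whether the stop condition is met for the given record; once it is, pagination stops. Here the condition is met when the cursor reports that the record should not be synced.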

Parameters
  • record: a record used to evaluate the condition
42class StopConditionPaginationStrategyDecorator(PaginationStrategy):
43    def __init__(self, _delegate: PaginationStrategy, stop_condition: PaginationStopCondition):
44        self._delegate = _delegate
45        self._stop_condition = stop_condition
46
47    def next_page_token(
48        self,
49        response: requests.Response,
50        last_page_size: int,
51        last_record: Optional[Record],
52        last_page_token_value: Optional[Any] = None,
53    ) -> Optional[Any]:
54        # We evaluate in reverse order because the assumption is that most of the APIs using data feed structure
55        # will return records in descending order. In terms of performance/memory, we return the records lazily
56        if last_record and self._stop_condition.is_met(last_record):
57            return None
58        return self._delegate.next_page_token(
59            response, last_page_size, last_record, last_page_token_value
60        )
61
62    def get_page_size(self) -> Optional[int]:
63        return self._delegate.get_page_size()
64
65    @property
66    def initial_token(self) -> Optional[Any]:
67        return self._delegate.initial_token

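Decorates a pagination strategy: the next page token comes from the wrapped strategy, but pagination stops (the token is None) once the last record of a page meets the stop condition.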

StopConditionPaginationStrategyDecorator( _delegate: airbyte_cdk.PaginationStrategy, stop_condition: PaginationStopCondition)
43    def __init__(self, _delegate: PaginationStrategy, stop_condition: PaginationStopCondition):
44        self._delegate = _delegate
45        self._stop_condition = stop_condition
def next_page_token( self, response: requests.models.Response, last_page_size: int, last_record: Optional[airbyte_cdk.Record], last_page_token_value: Optional[Any] = None) -> Optional[Any]:
47    def next_page_token(
48        self,
49        response: requests.Response,
50        last_page_size: int,
51        last_record: Optional[Record],
52        last_page_token_value: Optional[Any] = None,
53    ) -> Optional[Any]:
54        # We evaluate in reverse order because the assumption is that most of the APIs using data feed structure
55        # will return records in descending order. In terms of performance/memory, we return the records lazily
56        if last_record and self._stop_condition.is_met(last_record):
57            return None
58        return self._delegate.next_page_token(
59            response, last_page_size, last_record, last_page_token_value
60        )
Parameters
  • response: response to process
  • last_page_size: the number of records read from the response
  • last_record: the last record extracted from the response
  • last_page_token_value: The current value of the page token made on the last request
Returns

The next page token, or None if there are no more pages to fetch. This decorator also returns None when the last record meets the stop condition.

def get_page_size(self) -> Optional[int]:
62    def get_page_size(self) -> Optional[int]:
63        return self._delegate.get_page_size()
Returns

The page size: the number of records to fetch per page, or None if unspecified.

initial_token: Optional[Any]
65    @property
66    def initial_token(self) -> Optional[Any]:
67        return self._delegate.initial_token

Return the initial value of the token