airbyte_cdk.sources.declarative.requesters.paginators.strategies

 1#
 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 3#
 4
 5from airbyte_cdk.sources.declarative.requesters.paginators.strategies.cursor_pagination_strategy import (
 6    CursorPaginationStrategy,
 7)
 8from airbyte_cdk.sources.declarative.requesters.paginators.strategies.offset_increment import (
 9    OffsetIncrement,
10)
11from airbyte_cdk.sources.declarative.requesters.paginators.strategies.page_increment import (
12    PageIncrement,
13)
14from airbyte_cdk.sources.declarative.requesters.paginators.strategies.stop_condition import (
15    CursorStopCondition,
16    StopConditionPaginationStrategyDecorator,
17)
18
19__all__ = [
20    "CursorPaginationStrategy",
21    "CursorStopCondition",
22    "OffsetIncrement",
23    "PageIncrement",
24    "StopConditionPaginationStrategyDecorator",
25]
@dataclass
class CursorPaginationStrategy(PaginationStrategy):
    """
    Pagination strategy that evaluates an interpolated string to define the next page token

    Attributes:
        cursor_value (Union[InterpolatedString, str]): template string evaluating to the cursor value
        config (Config): connection config
        parameters (Mapping[str, Any]): init-only values handed to the interpolated templates
        page_size (Optional[int]): the number of records to request
        stop_condition (Optional[Union[InterpolatedBoolean, str]]): template string evaluating when to stop paginating
        decoder (Decoder): decoder to decode the response
    """

    cursor_value: Union[InterpolatedString, str]
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    page_size: Optional[int] = None
    stop_condition: Optional[Union[InterpolatedBoolean, str]] = None
    decoder: Decoder = field(
        default_factory=lambda: PaginationDecoderDecorator(decoder=JsonDecoder(parameters={}))
    )

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """Wrap plain-string templates into their interpolated counterparts; pass other values through."""
        if isinstance(self.cursor_value, str):
            self._cursor_value = InterpolatedString.create(self.cursor_value, parameters=parameters)
        else:
            self._cursor_value = self.cursor_value

        if isinstance(self.stop_condition, str):
            self._stop_condition: Optional[InterpolatedBoolean] = InterpolatedBoolean(
                condition=self.stop_condition, parameters=parameters
            )
        else:
            self._stop_condition = self.stop_condition

    @property
    def initial_token(self) -> Optional[Any]:
        """
        CursorPaginationStrategy does not have an initial value because the next cursor is typically included
        in the response of the first request. For Resumable Full Refresh streams that checkpoint the page
        cursor, the next cursor should be read from the state or stream slice object.
        """
        return None

    def next_page_token(
        self,
        response: requests.Response,
        last_page_size: int,
        last_record: Optional[Record],
        last_page_token_value: Optional[Any] = None,
    ) -> Optional[Any]:
        """
        Evaluate the cursor template against the latest response to obtain the next page token.

        :param response: response to process
        :param last_page_size: the number of records read from the response
        :param last_record: the last record extracted from the response
        :param last_page_token_value: the page token used on the last request (not read by this strategy)
        :return: the next page token. Returns None if the stop condition evaluates truthy or the cursor
            template evaluates to a falsy value
        """
        # A decoder that yields nothing would make a bare next() raise StopIteration, which becomes a
        # RuntimeError if it escapes through a generator frame (PEP 479). Fall back to an empty payload
        # so the templates are still evaluated.
        decoded_response = next(self.decoder.decode(response), {})

        # The default way that link is presented in requests.Response is a string of various links
        # (last, next, etc). This is not indexable or useful for parsing the cursor, so we replace it
        # with the link dictionary from response.links
        headers: Dict[str, Any] = dict(response.headers)
        headers["link"] = response.links

        if self._stop_condition:
            should_stop = self._stop_condition.eval(
                self.config,
                response=decoded_response,
                headers=headers,
                last_record=last_record,
                last_page_size=last_page_size,
            )
            if should_stop:
                return None

        token = self._cursor_value.eval(
            config=self.config,
            response=decoded_response,
            headers=headers,
            last_record=last_record,
            last_page_size=last_page_size,
        )
        # Falsy tokens (empty string, None, ...) mean there is no further page.
        return token if token else None

    def get_page_size(self) -> Optional[int]:
        """
        :return: page size: The number of records to fetch in a page. Returns None if unspecified
        """
        return self.page_size

Pagination strategy that evaluates an interpolated string to define the next page token

Attributes:
  • page_size (Optional[int]): the number of records to request
  • cursor_value (Union[InterpolatedString, str]): template string evaluating to the cursor value
  • config (Config): connection config
  • stop_condition (Optional[InterpolatedBoolean]): template string evaluating when to stop paginating
  • decoder (Decoder): decoder to decode the response
CursorPaginationStrategy( cursor_value: Union[airbyte_cdk.InterpolatedString, str], config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], page_size: Optional[int] = None, stop_condition: Union[airbyte_cdk.InterpolatedBoolean, str, NoneType] = None, decoder: airbyte_cdk.Decoder = <factory>)
cursor_value: Union[airbyte_cdk.InterpolatedString, str]
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
page_size: Optional[int] = None
stop_condition: Union[airbyte_cdk.InterpolatedBoolean, str, NoneType] = None
initial_token: Optional[Any]
58    @property
59    def initial_token(self) -> Optional[Any]:
60        """
61        CursorPaginationStrategy does not have an initial value because the next cursor is typically included
62        in the response of the first request. For Resumable Full Refresh streams that checkpoint the page
63        cursor, the next cursor should be read from the state or stream slice object.
64        """
65        return None

CursorPaginationStrategy does not have an initial value because the next cursor is typically included in the response of the first request. For Resumable Full Refresh streams that checkpoint the page cursor, the next cursor should be read from the state or stream slice object.

def next_page_token( self, response: requests.models.Response, last_page_size: int, last_record: Optional[airbyte_cdk.Record], last_page_token_value: Optional[Any] = None) -> Optional[Any]:
67    def next_page_token(
68        self,
69        response: requests.Response,
70        last_page_size: int,
71        last_record: Optional[Record],
72        last_page_token_value: Optional[Any] = None,
73    ) -> Optional[Any]:
74        decoded_response = next(self.decoder.decode(response))
75        # The default way that link is presented in requests.Response is a string of various links (last, next, etc). This
76        # is not indexable or useful for parsing the cursor, so we replace it with the link dictionary from response.links
77        headers: Dict[str, Any] = dict(response.headers)
78        headers["link"] = response.links
79        if self._stop_condition:
80            should_stop = self._stop_condition.eval(
81                self.config,
82                response=decoded_response,
83                headers=headers,
84                last_record=last_record,
85                last_page_size=last_page_size,
86            )
87            if should_stop:
88                return None
89        token = self._cursor_value.eval(
90            config=self.config,
91            response=decoded_response,
92            headers=headers,
93            last_record=last_record,
94            last_page_size=last_page_size,
95        )
96        return token if token else None
Parameters
  • response: response to process
  • last_page_size: the number of records read from the response
  • last_record: the last record extracted from the response
  • last_page_token_value: The current value of the page token made on the last request
Returns

next page token. Returns None if there are no more pages to fetch

def get_page_size(self) -> Optional[int]:
98    def get_page_size(self) -> Optional[int]:
99        return self.page_size
Returns

page size: The number of records to fetch in a page. Returns None if unspecified

class CursorStopCondition(PaginationStopCondition):
    """Stop condition that is met once the wrapped cursor reports that a record should not be synced."""

    def __init__(
        self,
        cursor: DeclarativeCursor
        | ConcurrentCursor,  # migrate to use both old and concurrent versions
    ):
        """
        :param cursor: cursor whose ``should_be_synced`` decides whether pagination goes on
        """
        self._cursor = cursor

    def is_met(self, record: Record) -> bool:
        """
        :param record: a record used to evaluate the condition
        :return: True when the cursor says the record should NOT be synced
        """
        keep_syncing = self._cursor.should_be_synced(record)
        return not keep_syncing

Stop condition that is met when the cursor reports that the record should not be synced.

31    def __init__(
32        self,
33        cursor: DeclarativeCursor
34        | ConcurrentCursor,  # migrate to use both old and concurrent versions
35    ):
36        self._cursor = cursor
def is_met(self, record: airbyte_cdk.Record) -> bool:
38    def is_met(self, record: Record) -> bool:
39        return not self._cursor.should_be_synced(record)

Given a condition is met, the pagination will stop

Parameters
  • record: a record used to evaluate the condition
 24@dataclass
 25class OffsetIncrement(PaginationStrategy):
 26    """
 27    Pagination strategy that returns the number of records reads so far and returns it as the next page token
 28    Examples:
 29        # page_size to be a constant integer value
 30        pagination_strategy:
 31          type: OffsetIncrement
 32          page_size: 2
 33
 34        # page_size to be a constant string value
 35        pagination_strategy:
 36          type: OffsetIncrement
 37          page_size: "2"
 38
 39        # page_size to be an interpolated string value
 40        pagination_strategy:
 41          type: OffsetIncrement
 42          page_size: "{{ parameters['items_per_page'] }}"
 43
 44    Attributes:
 45        page_size (InterpolatedString): the number of records to request
 46    """
 47
 48    config: Config
 49    page_size: Optional[Union[str, int]]
 50    parameters: InitVar[Mapping[str, Any]]
 51    extractor: Optional[RecordExtractor]
 52    decoder: Decoder = field(
 53        default_factory=lambda: PaginationDecoderDecorator(decoder=JsonDecoder(parameters={}))
 54    )
 55    inject_on_first_request: bool = False
 56
 57    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 58        page_size = str(self.page_size) if isinstance(self.page_size, int) else self.page_size
 59        if page_size:
 60            self._page_size: Optional[InterpolatedString] = InterpolatedString(
 61                page_size, parameters=parameters
 62            )
 63        else:
 64            self._page_size = None
 65
 66    @property
 67    def initial_token(self) -> Optional[Any]:
 68        if self.inject_on_first_request:
 69            return 0
 70        return None
 71
 72    def next_page_token(
 73        self,
 74        response: requests.Response,
 75        last_page_size: int,
 76        last_record: Optional[Record],
 77        last_page_token_value: Optional[Any] = None,
 78    ) -> Optional[Any]:
 79        decoded_response = next(self.decoder.decode(response))
 80
 81        if self.extractor:
 82            page_size_from_response = len(list(self.extractor.extract_records(response=response)))
 83            # The extractor could return 0 records which is valid, but evaluates to False. Our fallback in other
 84            # cases as the best effort option is to use the incoming last_page_size
 85            last_page_size = (
 86                page_size_from_response if page_size_from_response is not None else last_page_size
 87            )
 88
 89        # Stop paginating when there are fewer records than the page size or the current page has no records
 90        if (
 91            self._page_size
 92            and last_page_size < self._page_size.eval(self.config, response=decoded_response)
 93        ) or last_page_size == 0:
 94            return None
 95        elif last_page_token_value is None:
 96            # If the OffsetIncrement strategy does not inject on the first request, the incoming last_page_token_value
 97            # will be None. For this case, we assume that None was the first page and progress to the next offset
 98            return 0 + last_page_size
 99        elif not isinstance(last_page_token_value, int):
100            raise ValueError(
101                f"Last page token value {last_page_token_value} for OffsetIncrement pagination strategy was not an integer"
102            )
103        else:
104            return last_page_token_value + last_page_size
105
106    def get_page_size(self) -> Optional[int]:
107        if self._page_size:
108            page_size = self._page_size.eval(self.config)
109            if not isinstance(page_size, int):
110                raise Exception(f"{page_size} is of type {type(page_size)}. Expected {int}")
111            return page_size
112        else:
113            return None

Pagination strategy that returns the number of records read so far as the next page token

Examples:

page_size to be a constant integer value:

pagination_strategy:
  type: OffsetIncrement
  page_size: 2

page_size to be a constant string value:

pagination_strategy:
  type: OffsetIncrement
  page_size: "2"

page_size to be an interpolated string value:

pagination_strategy:
  type: OffsetIncrement
  page_size: "{{ parameters['items_per_page'] }}"

Attributes:
  • page_size (InterpolatedString): the number of records to request
OffsetIncrement( config: Mapping[str, Any], page_size: Union[str, int, NoneType], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], extractor: Optional[airbyte_cdk.RecordExtractor], decoder: airbyte_cdk.Decoder = <factory>, inject_on_first_request: bool = False)
config: Mapping[str, Any]
page_size: Union[str, int, NoneType]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
extractor: Optional[airbyte_cdk.RecordExtractor]
inject_on_first_request: bool = False
initial_token: Optional[Any]
66    @property
67    def initial_token(self) -> Optional[Any]:
68        if self.inject_on_first_request:
69            return 0
70        return None

Return the initial value of the token

def next_page_token( self, response: requests.models.Response, last_page_size: int, last_record: Optional[airbyte_cdk.Record], last_page_token_value: Optional[Any] = None) -> Optional[Any]:
 72    def next_page_token(
 73        self,
 74        response: requests.Response,
 75        last_page_size: int,
 76        last_record: Optional[Record],
 77        last_page_token_value: Optional[Any] = None,
 78    ) -> Optional[Any]:
 79        decoded_response = next(self.decoder.decode(response))
 80
 81        if self.extractor:
 82            page_size_from_response = len(list(self.extractor.extract_records(response=response)))
 83            # The extractor could return 0 records which is valid, but evaluates to False. Our fallback in other
 84            # cases as the best effort option is to use the incoming last_page_size
 85            last_page_size = (
 86                page_size_from_response if page_size_from_response is not None else last_page_size
 87            )
 88
 89        # Stop paginating when there are fewer records than the page size or the current page has no records
 90        if (
 91            self._page_size
 92            and last_page_size < self._page_size.eval(self.config, response=decoded_response)
 93        ) or last_page_size == 0:
 94            return None
 95        elif last_page_token_value is None:
 96            # If the OffsetIncrement strategy does not inject on the first request, the incoming last_page_token_value
 97            # will be None. For this case, we assume that None was the first page and progress to the next offset
 98            return 0 + last_page_size
 99        elif not isinstance(last_page_token_value, int):
100            raise ValueError(
101                f"Last page token value {last_page_token_value} for OffsetIncrement pagination strategy was not an integer"
102            )
103        else:
104            return last_page_token_value + last_page_size
Parameters
  • response: response to process
  • last_page_size: the number of records read from the response
  • last_record: the last record extracted from the response
  • last_page_token_value: The current value of the page token made on the last request
Returns

next page token. Returns None if there are no more pages to fetch

def get_page_size(self) -> Optional[int]:
106    def get_page_size(self) -> Optional[int]:
107        if self._page_size:
108            page_size = self._page_size.eval(self.config)
109            if not isinstance(page_size, int):
110                raise Exception(f"{page_size} is of type {type(page_size)}. Expected {int}")
111            return page_size
112        else:
113            return None
Returns

page size: The number of records to fetch in a page. Returns None if unspecified

@dataclass
class PageIncrement(PaginationStrategy):
    """
    Pagination strategy that returns the number of pages read so far as the next page token

    Attributes:
        config (Config): connection config
        page_size (Optional[Union[str, int]]): the number of records to request; strings are interpolated
        parameters (Mapping[str, Any]): init-only values handed to the page size template
        start_from_page (int): number of the initial page
        inject_on_first_request (bool): whether the first request carries ``start_from_page`` as its token
    """

    config: Config
    page_size: Optional[Union[str, int]]
    parameters: InitVar[Mapping[str, Any]]
    start_from_page: int = 0
    inject_on_first_request: bool = False

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """
        Resolve the page size once at construction time.

        :raises Exception: if a string page size does not interpolate to an int
        """
        if isinstance(self.page_size, int) or (self.page_size is None):
            self._page_size = self.page_size
        else:
            page_size = InterpolatedString(self.page_size, parameters=parameters).eval(self.config)
            if not isinstance(page_size, int):
                raise Exception(f"{page_size} is of type {type(page_size)}. Expected {int}")
            self._page_size = page_size

    @property
    def initial_token(self) -> Optional[Any]:
        """Return ``start_from_page`` when the page is injected on the first request, otherwise None."""
        if self.inject_on_first_request:
            return self.start_from_page
        return None

    def next_page_token(
        self,
        response: requests.Response,
        last_page_size: int,
        last_record: Optional[Record],
        # Defaulted to None for parity with the other strategies' signatures; existing callers that
        # pass the argument are unaffected.
        last_page_token_value: Optional[Any] = None,
    ) -> Optional[Any]:
        """
        Advance to the next page number.

        :param response: response to process (not read by this strategy)
        :param last_page_size: the number of records read from the response
        :param last_record: the last record extracted from the response (not read by this strategy)
        :param last_page_token_value: The current value of the page token made on the last request
        :return: next page token. Returns None if there are no more pages to fetch
        :raises ValueError: if ``last_page_token_value`` is neither None nor an int
        """
        # Stop paginating when there are fewer records than the page size or the current page has no records
        if (self._page_size and last_page_size < self._page_size) or last_page_size == 0:
            return None

        if last_page_token_value is None:
            # If the PageIncrement strategy does not inject on the first request, the incoming
            # last_page_token_value may be None. When this is the case, we assume we've already requested
            # the first page specified by start_from_page and must now get the next page
            return self.start_from_page + 1
        if not isinstance(last_page_token_value, int):
            raise ValueError(
                f"Last page token value {last_page_token_value} for PageIncrement pagination strategy was not an integer"
            )
        return last_page_token_value + 1

    def get_page_size(self) -> Optional[int]:
        """
        :return: page size: The number of records to fetch in a page. Returns None if unspecified
        """
        return self._page_size

Pagination strategy that returns the number of pages read so far as the next page token

Attributes:
  • page_size (int): the number of records to request
  • start_from_page (int): number of the initial page
PageIncrement( config: Mapping[str, Any], page_size: Union[str, int, NoneType], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], start_from_page: int = 0, inject_on_first_request: bool = False)
config: Mapping[str, Any]
page_size: Union[str, int, NoneType]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
start_from_page: int = 0
inject_on_first_request: bool = False
initial_token: Optional[Any]
43    @property
44    def initial_token(self) -> Optional[Any]:
45        if self.inject_on_first_request:
46            return self.start_from_page
47        return None

Return the initial value of the token

def next_page_token( self, response: requests.models.Response, last_page_size: int, last_record: Optional[airbyte_cdk.Record], last_page_token_value: Optional[Any]) -> Optional[Any]:
49    def next_page_token(
50        self,
51        response: requests.Response,
52        last_page_size: int,
53        last_record: Optional[Record],
54        last_page_token_value: Optional[Any],
55    ) -> Optional[Any]:
56        # Stop paginating when there are fewer records than the page size or the current page has no records
57        if (self._page_size and last_page_size < self._page_size) or last_page_size == 0:
58            return None
59        elif last_page_token_value is None:
60            # If the PageIncrement strategy does not inject on the first request, the incoming last_page_token_value
61            # may be None. When this is the case, we assume we've already requested the first page specified by
62            # start_from_page and must now get the next page
63            return self.start_from_page + 1
64        elif not isinstance(last_page_token_value, int):
65            raise ValueError(
66                f"Last page token value {last_page_token_value} for PageIncrement pagination strategy was not an integer"
67            )
68        else:
69            return last_page_token_value + 1
Parameters
  • response: response to process
  • last_page_size: the number of records read from the response
  • last_record: the last record extracted from the response
  • last_page_token_value: The current value of the page token made on the last request
Returns

next page token. Returns None if there are no more pages to fetch

def get_page_size(self) -> Optional[int]:
71    def get_page_size(self) -> Optional[int]:
72        return self._page_size
Returns

page size: The number of records to fetch in a page. Returns None if unspecified

class StopConditionPaginationStrategyDecorator(PaginationStrategy):
    """
    Wraps another pagination strategy and ends pagination early once a stop condition is met
    by the last record of a page. Everything else is forwarded to the wrapped strategy.
    """

    def __init__(self, _delegate: PaginationStrategy, stop_condition: PaginationStopCondition):
        """
        :param _delegate: the wrapped strategy that produces tokens and page sizes
        :param stop_condition: condition checked against the last record of each page
        """
        self._delegate = _delegate
        self._stop_condition = stop_condition

    @property
    def initial_token(self) -> Optional[Any]:
        """Return the wrapped strategy's initial token."""
        return self._delegate.initial_token

    def get_page_size(self) -> Optional[int]:
        """
        :return: the wrapped strategy's page size
        """
        return self._delegate.get_page_size()

    def next_page_token(
        self,
        response: requests.Response,
        last_page_size: int,
        last_record: Optional[Record],
        last_page_token_value: Optional[Any] = None,
    ) -> Optional[Any]:
        """
        :param response: response to process
        :param last_page_size: the number of records read from the response
        :param last_record: the last record extracted from the response
        :param last_page_token_value: The current value of the page token made on the last request
        :return: None when the stop condition is met by ``last_record``; otherwise whatever the
            wrapped strategy returns
        """
        # Only the page's last record is checked.
        # NOTE(review): this is a truthiness test, so a falsy last_record (None, and presumably an
        # empty record) skips the stop condition entirely — confirm that is intended.
        if not last_record or not self._stop_condition.is_met(last_record):
            return self._delegate.next_page_token(
                response, last_page_size, last_record, last_page_token_value
            )
        return None

Defines how to get the next page token

43    def __init__(self, _delegate: PaginationStrategy, stop_condition: PaginationStopCondition):
44        self._delegate = _delegate
45        self._stop_condition = stop_condition
def next_page_token( self, response: requests.models.Response, last_page_size: int, last_record: Optional[airbyte_cdk.Record], last_page_token_value: Optional[Any] = None) -> Optional[Any]:
47    def next_page_token(
48        self,
49        response: requests.Response,
50        last_page_size: int,
51        last_record: Optional[Record],
52        last_page_token_value: Optional[Any] = None,
53    ) -> Optional[Any]:
54        # We evaluate in reverse order because the assumption is that most of the APIs using data feed structure
55        # will return records in descending order. In terms of performance/memory, we return the records lazily
56        if last_record and self._stop_condition.is_met(last_record):
57            return None
58        return self._delegate.next_page_token(
59            response, last_page_size, last_record, last_page_token_value
60        )
Parameters
  • response: response to process
  • last_page_size: the number of records read from the response
  • last_record: the last record extracted from the response
  • last_page_token_value: The current value of the page token made on the last request
Returns

next page token. Returns None if there are no more pages to fetch

def get_page_size(self) -> Optional[int]:
62    def get_page_size(self) -> Optional[int]:
63        return self._delegate.get_page_size()
Returns

page size: The number of records to fetch in a page. Returns None if unspecified

initial_token: Optional[Any]
65    @property
66    def initial_token(self) -> Optional[Any]:
67        return self._delegate.initial_token

Return the initial value of the token