airbyte_cdk.sources.declarative.requesters.paginators.strategies

 1#
 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 3#
 4
 5from airbyte_cdk.sources.declarative.requesters.paginators.strategies.cursor_pagination_strategy import (
 6    CursorPaginationStrategy,
 7)
 8from airbyte_cdk.sources.declarative.requesters.paginators.strategies.offset_increment import (
 9    OffsetIncrement,
10)
11from airbyte_cdk.sources.declarative.requesters.paginators.strategies.page_increment import (
12    PageIncrement,
13)
14from airbyte_cdk.sources.declarative.requesters.paginators.strategies.stop_condition import (
15    CursorStopCondition,
16    StopConditionPaginationStrategyDecorator,
17)
18
19__all__ = [
20    "CursorPaginationStrategy",
21    "CursorStopCondition",
22    "OffsetIncrement",
23    "PageIncrement",
24    "StopConditionPaginationStrategyDecorator",
25]
@dataclass
class CursorPaginationStrategy(PaginationStrategy):
    """
    Pagination strategy whose next page token is produced by evaluating an interpolated string.

    Both the cursor template and the optional stop condition are evaluated against the decoded
    response body, the response headers (with ``link`` replaced by the parsed ``response.links``
    mapping), the last record of the page and the number of records on the page.

    Attributes:
        cursor_value (Union[InterpolatedString, str]): template evaluating to the next cursor value
        config (Config): connection config
        page_size (Optional[int]): the number of records to request
        stop_condition (Optional[Union[InterpolatedBoolean, str]]): template that stops pagination
            when it evaluates truthy
        decoder (Decoder): decoder used to decode the response
    """

    cursor_value: Union[InterpolatedString, str]
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    page_size: Optional[int] = None
    stop_condition: Optional[Union[InterpolatedBoolean, str]] = None
    decoder: Decoder = field(
        default_factory=lambda: PaginationDecoderDecorator(decoder=JsonDecoder(parameters={}))
    )

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Plain strings are compiled into interpolation objects; anything else is used as given.
        self._cursor_value = (
            InterpolatedString.create(self.cursor_value, parameters=parameters)
            if isinstance(self.cursor_value, str)
            else self.cursor_value
        )
        self._stop_condition: Optional[InterpolatedBoolean] = (
            InterpolatedBoolean(condition=self.stop_condition, parameters=parameters)
            if isinstance(self.stop_condition, str)
            else self.stop_condition
        )

    @property
    def initial_token(self) -> Optional[Any]:
        """
        Always None: the next cursor is normally included in the response to the first request.
        Resumable Full Refresh streams that checkpoint the page cursor should read it from the
        state or the stream slice object instead.
        """
        return None

    def next_page_token(
        self,
        response: requests.Response,
        last_page_size: int,
        last_record: Optional[Record],
        last_page_token_value: Optional[Any] = None,
    ) -> Optional[Any]:
        """
        Evaluate the cursor template against the latest response and return it as the next token.

        Returns None when the stop condition evaluates truthy, or when the cursor evaluates to a
        falsy value. ``last_page_token_value`` is not used by this strategy.
        """
        body = next(self.decoder.decode(response))
        # requests exposes the raw Link header as one opaque string, which is not indexable; swap in
        # the parsed ``response.links`` mapping so templates can look up individual links.
        headers: Dict[str, Any] = {**response.headers, "link": response.links}
        template_context = dict(
            response=body,
            headers=headers,
            last_record=last_record,
            last_page_size=last_page_size,
        )
        if self._stop_condition and self._stop_condition.eval(self.config, **template_context):
            return None
        token = self._cursor_value.eval(config=self.config, **template_context)
        return token or None

    def get_page_size(self) -> Optional[int]:
        """Return the configured page size, or None when none was configured."""
        configured_size = self.page_size
        return configured_size

Pagination strategy that evaluates an interpolated string to define the next page token

Attributes:
  • page_size (Optional[int]): the number of records to request
  • cursor_value (Union[InterpolatedString, str]): template string evaluating to the cursor value
  • config (Config): connection config
  • stop_condition (Optional[Union[InterpolatedBoolean, str]]): template string evaluating when to stop paginating
  • decoder (Decoder): decoder to decode the response
CursorPaginationStrategy( cursor_value: Union[airbyte_cdk.InterpolatedString, str], config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], page_size: Optional[int] = None, stop_condition: Union[airbyte_cdk.InterpolatedBoolean, str, NoneType] = None, decoder: airbyte_cdk.Decoder = <factory>)
cursor_value: Union[airbyte_cdk.InterpolatedString, str]
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
page_size: Optional[int] = None
stop_condition: Union[airbyte_cdk.InterpolatedBoolean, str, NoneType] = None
initial_token: Optional[Any]
@property
def initial_token(self) -> Optional[Any]:
    """
    No initial token: the next cursor normally comes back in the response to the first request.
    Resumable Full Refresh streams that checkpoint the page cursor should take it from the state
    or the stream slice object instead.
    """
    no_token: Optional[Any] = None
    return no_token

CursorPaginationStrategy does not have an initial value because the next cursor is typically included in the response of the first request. For Resumable Full Refresh streams that checkpoint the page cursor, the next cursor should be read from the state or stream slice object.

def next_page_token( self, response: requests.models.Response, last_page_size: int, last_record: Optional[airbyte_cdk.Record], last_page_token_value: Optional[Any] = None) -> Optional[Any]:
def next_page_token(
    self,
    response: requests.Response,
    last_page_size: int,
    last_record: Optional[Record],
    last_page_token_value: Optional[Any] = None,
) -> Optional[Any]:
    """
    Evaluate the cursor template against the latest response and return it as the next token.

    Returns None when the stop condition evaluates truthy, or when the cursor evaluates to a
    falsy value. ``last_page_token_value`` is not used by this strategy.
    """
    body = next(self.decoder.decode(response))
    # requests exposes the raw Link header as one opaque string, which is not indexable; swap in
    # the parsed ``response.links`` mapping so templates can look up individual links.
    headers: Dict[str, Any] = {**response.headers, "link": response.links}
    template_context = dict(
        response=body,
        headers=headers,
        last_record=last_record,
        last_page_size=last_page_size,
    )
    if self._stop_condition and self._stop_condition.eval(self.config, **template_context):
        return None
    token = self._cursor_value.eval(config=self.config, **template_context)
    return token or None
Parameters
  • response: response to process
  • last_page_size: the number of records read from the response
  • last_record: the last record extracted from the response
  • last_page_token_value: The current value of the page token made on the last request
Returns

next page token. Returns None if there are no more pages to fetch

def get_page_size(self) -> Optional[int]:
def get_page_size(self) -> Optional[int]:
    """Return the configured page size, or None when none was configured."""
    configured_size = self.page_size
    return configured_size
Returns

page size: The number of records to fetch in a page. Returns None if unspecified

class CursorStopCondition(PaginationStopCondition):
    """
    Stop condition backed by a cursor.

    The condition is met for a record as soon as the cursor reports that the record should not be
    synced (``cursor.should_be_synced(record)`` returns False).
    """

    def __init__(
        self,
        # Accepts both the legacy declarative cursor and the concurrent cursor while the migration
        # to the concurrent version is in progress.
        cursor: DeclarativeCursor | ConcurrentCursor,
    ):
        self._cursor = cursor

    def is_met(self, record: Record) -> bool:
        """Return True when ``record`` should no longer be synced according to the cursor."""
        record_is_wanted = self._cursor.should_be_synced(record)
        return not record_is_wanted

Stop condition backed by a cursor: it is met for a record as soon as the cursor reports that the record should not be synced (cursor.should_be_synced(record) returns False).

def __init__(
    self,
    # Accepts both the legacy declarative cursor and the concurrent cursor while the migration
    # to the concurrent version is in progress.
    cursor: DeclarativeCursor | ConcurrentCursor,
):
    self._cursor = cursor
def is_met(self, record: airbyte_cdk.Record) -> bool:
def is_met(self, record: Record) -> bool:
    """Return True when ``record`` should no longer be synced according to the cursor."""
    record_is_wanted = self._cursor.should_be_synced(record)
    return not record_is_wanted

Return True when the condition is met for the given record; pagination then stops.

Parameters
  • record: a record used to evaluate the condition
 23@dataclass
 24class OffsetIncrement(PaginationStrategy):
 25    """
 26    Pagination strategy that returns the number of records reads so far and returns it as the next page token
 27    Examples:
 28        # page_size to be a constant integer value
 29        pagination_strategy:
 30          type: OffsetIncrement
 31          page_size: 2
 32
 33        # page_size to be a constant string value
 34        pagination_strategy:
 35          type: OffsetIncrement
 36          page_size: "2"
 37
 38        # page_size to be an interpolated string value
 39        pagination_strategy:
 40          type: OffsetIncrement
 41          page_size: "{{ parameters['items_per_page'] }}"
 42
 43    Attributes:
 44        page_size (InterpolatedString): the number of records to request
 45    """
 46
 47    config: Config
 48    page_size: Optional[Union[str, int]]
 49    parameters: InitVar[Mapping[str, Any]]
 50    decoder: Decoder = field(
 51        default_factory=lambda: PaginationDecoderDecorator(decoder=JsonDecoder(parameters={}))
 52    )
 53    inject_on_first_request: bool = False
 54
 55    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 56        page_size = str(self.page_size) if isinstance(self.page_size, int) else self.page_size
 57        if page_size:
 58            self._page_size: Optional[InterpolatedString] = InterpolatedString(
 59                page_size, parameters=parameters
 60            )
 61        else:
 62            self._page_size = None
 63
 64    @property
 65    def initial_token(self) -> Optional[Any]:
 66        if self.inject_on_first_request:
 67            return 0
 68        return None
 69
 70    def next_page_token(
 71        self,
 72        response: requests.Response,
 73        last_page_size: int,
 74        last_record: Optional[Record],
 75        last_page_token_value: Optional[Any] = None,
 76    ) -> Optional[Any]:
 77        decoded_response = next(self.decoder.decode(response))
 78
 79        # Stop paginating when there are fewer records than the page size or the current page has no records
 80        if (
 81            self._page_size
 82            and last_page_size < self._page_size.eval(self.config, response=decoded_response)
 83        ) or last_page_size == 0:
 84            return None
 85        elif last_page_token_value is None:
 86            # If the OffsetIncrement strategy does not inject on the first request, the incoming last_page_token_value
 87            # will be None. For this case, we assume that None was the first page and progress to the next offset
 88            return 0 + last_page_size
 89        elif not isinstance(last_page_token_value, int):
 90            raise ValueError(
 91                f"Last page token value {last_page_token_value} for OffsetIncrement pagination strategy was not an integer"
 92            )
 93        else:
 94            return last_page_token_value + last_page_size
 95
 96    def get_page_size(self) -> Optional[int]:
 97        if self._page_size:
 98            page_size = self._page_size.eval(self.config)
 99            if not isinstance(page_size, int):
100                raise Exception(f"{page_size} is of type {type(page_size)}. Expected {int}")
101            return page_size
102        else:
103            return None

Pagination strategy that returns the number of records read so far and returns it as the next page token

Examples:

    # page_size to be a constant integer value
    pagination_strategy:
      type: OffsetIncrement
      page_size: 2

    # page_size to be a constant string value
    pagination_strategy:
      type: OffsetIncrement
      page_size: "2"

    # page_size to be an interpolated string value
    pagination_strategy:
      type: OffsetIncrement
      page_size: "{{ parameters['items_per_page'] }}"

Attributes:
  • page_size (Optional[Union[str, int]]): the number of records to request; may be an interpolated string
OffsetIncrement( config: Mapping[str, Any], page_size: Union[str, int, NoneType], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], decoder: airbyte_cdk.Decoder = <factory>, inject_on_first_request: bool = False)
config: Mapping[str, Any]
page_size: Union[str, int, NoneType]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
inject_on_first_request: bool = False
initial_token: Optional[Any]
@property
def initial_token(self) -> Optional[Any]:
    """Return offset 0 when the first request must carry a token, otherwise None."""
    return 0 if self.inject_on_first_request else None

Return the initial value of the token

def next_page_token( self, response: requests.models.Response, last_page_size: int, last_record: Optional[airbyte_cdk.Record], last_page_token_value: Optional[Any] = None) -> Optional[Any]:
def next_page_token(
    self,
    response: requests.Response,
    last_page_size: int,
    last_record: Optional[Record],
    last_page_token_value: Optional[Any] = None,
) -> Optional[Any]:
    """
    Return the offset for the next request: the previous offset plus the records just read.

    Returns None when the page was empty or held fewer records than the evaluated page size.

    Raises:
        ValueError: if ``last_page_token_value`` is neither None nor an int.
    """
    body = next(self.decoder.decode(response))

    # A short page (fewer records than page_size) or an empty page means nothing is left.
    page_is_short = bool(self._page_size) and last_page_size < self._page_size.eval(
        self.config, response=body
    )
    if page_is_short or last_page_size == 0:
        return None

    if last_page_token_value is None:
        # Without injection on the first request the first page had no offset token; treat it
        # as offset 0 and advance past the records it returned.
        return 0 + last_page_size

    if not isinstance(last_page_token_value, int):
        raise ValueError(
            f"Last page token value {last_page_token_value} for OffsetIncrement pagination strategy was not an integer"
        )
    return last_page_token_value + last_page_size
Parameters
  • response: response to process
  • last_page_size: the number of records read from the response
  • last_record: the last record extracted from the response
  • last_page_token_value: The current value of the page token made on the last request
Returns

next page token. Returns None if there are no more pages to fetch

def get_page_size(self) -> Optional[int]:
 96    def get_page_size(self) -> Optional[int]:
 97        if self._page_size:
 98            page_size = self._page_size.eval(self.config)
 99            if not isinstance(page_size, int):
100                raise Exception(f"{page_size} is of type {type(page_size)}. Expected {int}")
101            return page_size
102        else:
103            return None
Returns

page size: The number of records to fetch in a page. Returns None if unspecified

@dataclass
class PageIncrement(PaginationStrategy):
    """
    Pagination strategy that uses an incrementing page number as the next page token.

    Attributes:
        config (Config): connection config
        page_size (Optional[Union[str, int]]): the number of records to request; a string is
            interpolated against the config once, at construction time, and must evaluate to an int
        start_from_page (int): number of the initial page
        inject_on_first_request (bool): when True, ``initial_token`` is ``start_from_page``
            instead of None
    """

    config: Config
    page_size: Optional[Union[str, int]]
    parameters: InitVar[Mapping[str, Any]]
    start_from_page: int = 0
    inject_on_first_request: bool = False

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        if isinstance(self.page_size, int) or (self.page_size is None):
            self._page_size = self.page_size
        else:
            # Interpolated once here (config only, no response), so the page size is fixed for the
            # lifetime of the strategy.
            page_size = InterpolatedString(self.page_size, parameters=parameters).eval(self.config)
            if not isinstance(page_size, int):
                raise Exception(f"{page_size} is of type {type(page_size)}. Expected {int}")
            self._page_size = page_size

    @property
    def initial_token(self) -> Optional[Any]:
        """Return ``start_from_page`` when the first request must carry a token, otherwise None."""
        if self.inject_on_first_request:
            return self.start_from_page
        return None

    def next_page_token(
        self,
        response: requests.Response,
        last_page_size: int,
        last_record: Optional[Record],
        last_page_token_value: Optional[Any] = None,
    ) -> Optional[Any]:
        """
        Return the next page number, or None when there are no more pages.

        ``last_page_token_value`` defaults to None, like in the other pagination strategies, so
        callers may omit it; None means the ``start_from_page`` page was requested without a token.

        Raises:
            ValueError: if ``last_page_token_value`` is neither None nor an int.
        """
        # Stop paginating when there are fewer records than the page size or the current page has no records
        if (self._page_size and last_page_size < self._page_size) or last_page_size == 0:
            return None
        elif last_page_token_value is None:
            # If the PageIncrement strategy does not inject on the first request, the incoming last_page_token_value
            # may be None. When this is the case, we assume we've already requested the first page specified by
            # start_from_page and must now get the next page
            return self.start_from_page + 1
        elif not isinstance(last_page_token_value, int):
            raise ValueError(
                f"Last page token value {last_page_token_value} for PageIncrement pagination strategy was not an integer"
            )
        else:
            return last_page_token_value + 1

    def get_page_size(self) -> Optional[int]:
        """Return the page size resolved at construction time, or None when unspecified."""
        return self._page_size

Pagination strategy that returns the number of pages read so far and returns it as the next page token

Attributes:
  • page_size (Optional[Union[str, int]]): the number of records to request
  • start_from_page (int): number of the initial page
PageIncrement( config: Mapping[str, Any], page_size: Union[str, int, NoneType], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], start_from_page: int = 0, inject_on_first_request: bool = False)
config: Mapping[str, Any]
page_size: Union[str, int, NoneType]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
start_from_page: int = 0
inject_on_first_request: bool = False
initial_token: Optional[Any]
@property
def initial_token(self) -> Optional[Any]:
    """Return ``start_from_page`` when the first request must carry a token, otherwise None."""
    return self.start_from_page if self.inject_on_first_request else None

Return the initial value of the token

def next_page_token( self, response: requests.models.Response, last_page_size: int, last_record: Optional[airbyte_cdk.Record], last_page_token_value: Optional[Any]) -> Optional[Any]:
def next_page_token(
    self,
    response: requests.Response,
    last_page_size: int,
    last_record: Optional[Record],
    last_page_token_value: Optional[Any] = None,
) -> Optional[Any]:
    """
    Return the next page number, or None when there are no more pages.

    ``last_page_token_value`` defaults to None, like in the other pagination strategies, so
    callers may omit it; None means the ``start_from_page`` page was requested without a token.

    Raises:
        ValueError: if ``last_page_token_value`` is neither None nor an int.
    """
    # Stop paginating when there are fewer records than the page size or the current page has no records
    if (self._page_size and last_page_size < self._page_size) or last_page_size == 0:
        return None
    elif last_page_token_value is None:
        # If the PageIncrement strategy does not inject on the first request, the incoming last_page_token_value
        # may be None. When this is the case, we assume we've already requested the first page specified by
        # start_from_page and must now get the next page
        return self.start_from_page + 1
    elif not isinstance(last_page_token_value, int):
        raise ValueError(
            f"Last page token value {last_page_token_value} for PageIncrement pagination strategy was not an integer"
        )
    else:
        return last_page_token_value + 1
Parameters
  • response: response to process
  • last_page_size: the number of records read from the response
  • last_record: the last record extracted from the response
  • last_page_token_value: The current value of the page token made on the last request
Returns

next page token. Returns None if there are no more pages to fetch

def get_page_size(self) -> Optional[int]:
def get_page_size(self) -> Optional[int]:
    """Return the page size resolved at construction time, or None when unspecified."""
    resolved_size = self._page_size
    return resolved_size
Returns

page size: The number of records to fetch in a page. Returns None if unspecified

class StopConditionPaginationStrategyDecorator(PaginationStrategy):
    """
    Wraps another pagination strategy and stops pagination once the stop condition is met for the
    last record of the current page; otherwise every call is forwarded to the wrapped strategy.
    """

    def __init__(self, _delegate: PaginationStrategy, stop_condition: PaginationStopCondition):
        self._delegate = _delegate
        self._stop_condition = stop_condition

    def next_page_token(
        self,
        response: requests.Response,
        last_page_size: int,
        last_record: Optional[Record],
        last_page_token_value: Optional[Any] = None,
    ) -> Optional[Any]:
        """Return None if the last record meets the stop condition, else the delegate's token."""
        # Only the last record is checked. This relies on the assumption that most APIs exposing a
        # data-feed structure return records in descending order, so the last record is the oldest
        # one on the page. Records are handled lazily, so the page is not scanned as a whole.
        if not last_record or not self._stop_condition.is_met(last_record):
            return self._delegate.next_page_token(
                response, last_page_size, last_record, last_page_token_value
            )
        return None

    def get_page_size(self) -> Optional[int]:
        """Return the wrapped strategy's page size."""
        delegate = self._delegate
        return delegate.get_page_size()

    @property
    def initial_token(self) -> Optional[Any]:
        """Return the wrapped strategy's initial token."""
        delegate = self._delegate
        return delegate.initial_token

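Pagination strategy decorator that stops pagination when the stop condition is met for the last record of a page, and otherwise delegates to the wrapped strategy.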

def __init__(self, _delegate: PaginationStrategy, stop_condition: PaginationStopCondition):
    # Keep the wrapped strategy and the condition that can cut pagination short.
    self._delegate, self._stop_condition = _delegate, stop_condition
def next_page_token( self, response: requests.models.Response, last_page_size: int, last_record: Optional[airbyte_cdk.Record], last_page_token_value: Optional[Any] = None) -> Optional[Any]:
def next_page_token(
    self,
    response: requests.Response,
    last_page_size: int,
    last_record: Optional[Record],
    last_page_token_value: Optional[Any] = None,
) -> Optional[Any]:
    """Return None if the last record meets the stop condition, else the delegate's token."""
    # Only the last record is checked. This relies on the assumption that most APIs exposing a
    # data-feed structure return records in descending order, so the last record is the oldest
    # one on the page. Records are handled lazily, so the page is not scanned as a whole.
    if not last_record or not self._stop_condition.is_met(last_record):
        return self._delegate.next_page_token(
            response, last_page_size, last_record, last_page_token_value
        )
    return None
Parameters
  • response: response to process
  • last_page_size: the number of records read from the response
  • last_record: the last record extracted from the response
  • last_page_token_value: The current value of the page token made on the last request
Returns

next page token. Returns None if there are no more pages to fetch

def get_page_size(self) -> Optional[int]:
def get_page_size(self) -> Optional[int]:
    """Return the wrapped strategy's page size."""
    delegate = self._delegate
    return delegate.get_page_size()
Returns

page size: The number of records to fetch in a page. Returns None if unspecified

initial_token: Optional[Any]
@property
def initial_token(self) -> Optional[Any]:
    """Return the wrapped strategy's initial token."""
    delegate = self._delegate
    return delegate.initial_token

Return the initial value of the token