airbyte_cdk.sources.declarative.requesters.paginators

 1#
 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 3#
 4
 5from airbyte_cdk.sources.declarative.requesters.paginators.default_paginator import (
 6    DefaultPaginator,
 7    PaginatorTestReadDecorator,
 8)
 9from airbyte_cdk.sources.declarative.requesters.paginators.no_pagination import NoPagination
10from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator
11from airbyte_cdk.sources.declarative.requesters.paginators.strategies.pagination_strategy import (
12    PaginationStrategy,
13)
14
15__all__ = [  # public names re-exported by the paginators package
16    "DefaultPaginator",
17    "NoPagination",
18    "PaginationStrategy",
19    "Paginator",
20    "PaginatorTestReadDecorator",
21]
@dataclass
class DefaultPaginator(airbyte_cdk.sources.declarative.requesters.paginators.Paginator):
 33@dataclass
 34class DefaultPaginator(Paginator):
 35    """
 36    Default paginator to request pages of results with a fixed size until the pagination strategy no longer returns a next_page_token.
 37
 38    Examples:
 39        1.
 40        * fetches up to 10 records at a time by setting the "limit" request param to 10
 41        * updates the request path with "{{ response._metadata.next }}"
 42        ```
 43          paginator:
 44            type: "DefaultPaginator"
 45            page_size_option:
 46              type: RequestOption
 47              inject_into: request_parameter
 48              field_name: limit
 49            page_token_option:
 50              type: RequestPath
 51              path: "location"
 52            pagination_strategy:
 53              type: "CursorPagination"
 54              cursor_value: "{{ response._metadata.next }}"
 55              page_size: 10
 56        ```
 57
 58        2.
 59        * fetches up to 5 records at a time by setting the "page_size" header to 5
 60        * increments a record counter and sets the request parameter "offset" to the value of the counter
 61        ```
 62          paginator:
 63            type: "DefaultPaginator"
 64            page_size_option:
 65              type: RequestOption
 66              inject_into: header
 67              field_name: page_size
 68            pagination_strategy:
 69              type: "OffsetIncrement"
 70              page_size: 5
 71            page_token_option:
 72              inject_into: "request_parameter"
 73              field_name: "offset"
 74        ```
 75
 76        3.
 77        * fetches up to 5 records at a time by setting the "page_size" request param to 5
 78        * increments a page counter and sets the request parameter "page" to the value of the counter
 79        ```
 80          paginator:
 81            type: "DefaultPaginator"
 82            page_size_option:
 83              type: RequestOption
 84              inject_into: request_parameter
 85              field_name: page_size
 86            pagination_strategy:
 87              type: "PageIncrement"
 88              page_size: 5
 89            page_token_option:
 90              type: RequestOption
 91              inject_into: "request_parameter"
 92              field_name: "page"
 93        ```
 94    Attributes:
 95        page_size_option (Optional[RequestOption]): the request option used to set the page size. Cannot be injected in the path.
 96        page_token_option (Optional[Union[RequestPath, RequestOption]]): the request option used to set the page token
 97        pagination_strategy (PaginationStrategy): Strategy defining how to get the next page token
 98        config (Config): connection config
 99        url_base (Union[InterpolatedString, str]): endpoint's base url
100        decoder (Decoder): decoder used to decode the response
101    """
102
103    pagination_strategy: PaginationStrategy
104    config: Config
105    url_base: Union[InterpolatedString, str]
106    parameters: InitVar[Mapping[str, Any]]
107    decoder: Decoder = field(
108        default_factory=lambda: PaginationDecoderDecorator(decoder=JsonDecoder(parameters={}))
109    )
110    page_size_option: Optional[RequestOption] = None
111    page_token_option: Optional[Union[RequestPath, RequestOption]] = None
112
113    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
114        if self.page_size_option and not self.pagination_strategy.get_page_size():  # a page size option needs a page size to inject
115            raise ValueError(
116                "page_size_option cannot be set if the pagination strategy does not have a page_size"
117            )
118        if isinstance(self.url_base, str):  # normalize a plain string into an InterpolatedString
119            self.url_base = InterpolatedString(string=self.url_base, parameters=parameters)
120
121        if self.page_token_option and not isinstance(self.page_token_option, RequestPath):  # RequestPath tokens are returned by path(), not injected as request options
122            _validate_component_request_option_paths(
123                self.config,
124                self.page_size_option,
125                self.page_token_option,
126            )
127
128    def get_initial_token(self) -> Optional[Any]:
129        """
130        Return the page token that should be used for the first request of a stream
131
132        WARNING: get_initial_token() should not be used by streams that use RFR and checkpoint
133        their state using page numbers, because paginators are stateless.
134        """
135        return self.pagination_strategy.initial_token
136
137    def next_page_token(
138        self,
139        response: requests.Response,
140        last_page_size: int,
141        last_record: Optional[Record],
142        last_page_token_value: Optional[Any] = None,
143    ) -> Optional[Mapping[str, Any]]:
144        next_page_token = self.pagination_strategy.next_page_token(  # raw token from the strategy, wrapped below
145            response=response,
146            last_page_size=last_page_size,
147            last_record=last_record,
148            last_page_token_value=last_page_token_value,
149        )
150        if next_page_token:  # NOTE(review): falsy tokens (0, "") also end pagination here, unlike the `is not None` check in _get_request_options
151            return {"next_page_token": next_page_token}
152        else:
153            return None
154
155    def path(
156        self,
157        next_page_token: Optional[Mapping[str, Any]],
158        stream_state: Optional[Mapping[str, Any]] = None,
159        stream_slice: Optional[StreamSlice] = None,
160    ) -> Optional[str]:
161        token = next_page_token.get("next_page_token") if next_page_token else None
162        if token and self.page_token_option and isinstance(self.page_token_option, RequestPath):  # only a RequestPath token replaces the request path
163            return str(token)
164        else:
165            return None
166
167    def get_request_params(
168        self,
169        *,
170        stream_state: Optional[StreamState] = None,
171        stream_slice: Optional[StreamSlice] = None,
172        next_page_token: Optional[Mapping[str, Any]] = None,
173    ) -> MutableMapping[str, Any]:
174        return self._get_request_options(RequestOptionType.request_parameter, next_page_token)
175
176    def get_request_headers(
177        self,
178        *,
179        stream_state: Optional[StreamState] = None,
180        stream_slice: Optional[StreamSlice] = None,
181        next_page_token: Optional[Mapping[str, Any]] = None,
182    ) -> Mapping[str, str]:
183        return self._get_request_options(RequestOptionType.header, next_page_token)  # NOTE(review): typed Mapping[str, str], but an injected page size comes from get_page_size() -> Optional[int]; confirm values are stringified downstream
184
185    def get_request_body_data(
186        self,
187        *,
188        stream_state: Optional[StreamState] = None,
189        stream_slice: Optional[StreamSlice] = None,
190        next_page_token: Optional[Mapping[str, Any]] = None,
191    ) -> Mapping[str, Any]:
192        return self._get_request_options(RequestOptionType.body_data, next_page_token)
193
194    def get_request_body_json(
195        self,
196        *,
197        stream_state: Optional[StreamState] = None,
198        stream_slice: Optional[StreamSlice] = None,
199        next_page_token: Optional[Mapping[str, Any]] = None,
200    ) -> Mapping[str, Any]:
201        return self._get_request_options(RequestOptionType.body_json, next_page_token)
202
203    def _get_request_options(
204        self, option_type: RequestOptionType, next_page_token: Optional[Mapping[str, Any]]
205    ) -> MutableMapping[str, Any]:
206        options: MutableMapping[str, Any] = {}  # collects only options whose inject_into equals option_type
207
208        token = next_page_token.get("next_page_token") if next_page_token else None
209        if (
210            self.page_token_option
211            and token is not None
212            and isinstance(self.page_token_option, RequestOption)  # RequestPath tokens are handled by path()
213            and self.page_token_option.inject_into == option_type
214        ):
215            self.page_token_option.inject_into_request(options, token, self.config)
216
217        if (
218            self.page_size_option
219            and self.pagination_strategy.get_page_size()
220            and self.page_size_option.inject_into == option_type
221        ):
222            page_size = self.pagination_strategy.get_page_size()
223            self.page_size_option.inject_into_request(options, page_size, self.config)
224
225        return options

Default paginator to request pages of results with a fixed size until the pagination strategy no longer returns a next_page_token

Examples:

1.

  • fetches up to 10 records at a time by setting the "limit" request param to 10
  • updates the request path with "{{ response._metadata.next }}"
  paginator:
    type: "DefaultPaginator"
    page_size_option:
      type: RequestOption
      inject_into: request_parameter
      field_name: limit
    page_token_option:
      type: RequestPath
      path: "location"
    pagination_strategy:
      type: "CursorPagination"
      cursor_value: "{{ response._metadata.next }}"
      page_size: 10

2.

  • fetches up to 5 records at a time by setting the "page_size" header to 5
  • increments a record counter and sets the request parameter "offset" to the value of the counter
  paginator:
    type: "DefaultPaginator"
    page_size_option:
      type: RequestOption
      inject_into: header
      field_name: page_size
    pagination_strategy:
      type: "OffsetIncrement"
      page_size: 5
    page_token_option:
      inject_into: "request_parameter"
      field_name: "offset"

3.

  • fetches up to 5 records at a time by setting the "page_size" request param to 5
  • increments a page counter and sets the request parameter "page" to the value of the counter
  paginator:
    type: "DefaultPaginator"
    page_size_option:
      type: RequestOption
      inject_into: request_parameter
      field_name: page_size
    pagination_strategy:
      type: "PageIncrement"
      page_size: 5
    page_token_option:
      type: RequestOption
      inject_into: "request_parameter"
      field_name: "page"
Attributes:
  • page_size_option (Optional[RequestOption]): the request option to set the page size. Cannot be injected in the path.
  • page_token_option (Optional[Union[RequestPath, RequestOption]]): the request option used to set the page token
  • pagination_strategy (PaginationStrategy): Strategy defining how to get the next page token
  • config (Config): connection config
  • url_base (Union[InterpolatedString, str]): endpoint's base url
  • decoder (Decoder): decoder to decode the response
DefaultPaginator( pagination_strategy: PaginationStrategy, config: Mapping[str, Any], url_base: Union[airbyte_cdk.InterpolatedString, str], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], decoder: airbyte_cdk.Decoder = <factory>, page_size_option: Optional[airbyte_cdk.RequestOption] = None, page_token_option: Union[airbyte_cdk.sources.declarative.requesters.request_path.RequestPath, airbyte_cdk.RequestOption, NoneType] = None)
pagination_strategy: PaginationStrategy
config: Mapping[str, Any]
url_base: Union[airbyte_cdk.InterpolatedString, str]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
page_size_option: Optional[airbyte_cdk.RequestOption] = None
def get_initial_token(self) -> Optional[Any]:
128    def get_initial_token(self) -> Optional[Any]:
129        """
130        Return the page token that should be used for the first request of a stream
131
132        WARNING: get_initial_token() should not be used by streams that use RFR and checkpoint
133        their state using page numbers, because paginators are stateless.
134        """
135        return self.pagination_strategy.initial_token

Return the page token that should be used for the first request of a stream

WARNING: get_initial_token() should not be used by streams that use RFR and checkpoint their state using page numbers, because paginators are stateless.

def next_page_token( self, response: requests.models.Response, last_page_size: int, last_record: Optional[airbyte_cdk.Record], last_page_token_value: Optional[Any] = None) -> Optional[Mapping[str, Any]]:
137    def next_page_token(
138        self,
139        response: requests.Response,
140        last_page_size: int,
141        last_record: Optional[Record],
142        last_page_token_value: Optional[Any] = None,
143    ) -> Optional[Mapping[str, Any]]:
144        next_page_token = self.pagination_strategy.next_page_token(  # raw token from the strategy, wrapped below
145            response=response,
146            last_page_size=last_page_size,
147            last_record=last_record,
148            last_page_token_value=last_page_token_value,
149        )
150        if next_page_token:  # NOTE(review): falsy tokens (0, "") also end pagination here, unlike the `is not None` check in _get_request_options
151            return {"next_page_token": next_page_token}
152        else:
153            return None

Returns the next_page_token to use to fetch the next page of records.

Parameters
  • response: the response to process
  • last_page_size: the number of records read from the response
  • last_record: the last record extracted from the response
  • last_page_token_value: The current value of the page token made on the last request
Returns

A mapping {"next_page_token": <token>} for the next page from the input response object. Returning None means there are no more pages to read in this response.

def path( self, next_page_token: Optional[Mapping[str, Any]], stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None) -> Optional[str]:
155    def path(
156        self,
157        next_page_token: Optional[Mapping[str, Any]],
158        stream_state: Optional[Mapping[str, Any]] = None,
159        stream_slice: Optional[StreamSlice] = None,
160    ) -> Optional[str]:
161        token = next_page_token.get("next_page_token") if next_page_token else None
162        if token and self.page_token_option and isinstance(self.page_token_option, RequestPath):  # only a RequestPath token replaces the request path
163            return str(token)
164        else:
165            return None

Returns the URL path to hit to fetch the next page of records

e.g., if you want to hit https://myapi.com/v1/some_entity, this returns "some_entity"

Returns

path to hit to fetch the next request. Returning None means the path is not defined by the next_page_token

def get_request_params( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> MutableMapping[str, Any]:
167    def get_request_params(
168        self,
169        *,
170        stream_state: Optional[StreamState] = None,
171        stream_slice: Optional[StreamSlice] = None,
172        next_page_token: Optional[Mapping[str, Any]] = None,
173    ) -> MutableMapping[str, Any]:
174        return self._get_request_options(RequestOptionType.request_parameter, next_page_token)  # only options injected as request parameters

Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

E.g., you might want to define query parameters for paging if next_page_token is not None.

def get_request_headers( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, str]:
176    def get_request_headers(
177        self,
178        *,
179        stream_state: Optional[StreamState] = None,
180        stream_slice: Optional[StreamSlice] = None,
181        next_page_token: Optional[Mapping[str, Any]] = None,
182    ) -> Mapping[str, str]:
183        return self._get_request_options(RequestOptionType.header, next_page_token)  # NOTE(review): typed Mapping[str, str], but an injected page size comes from get_page_size() -> Optional[int]; confirm values are stringified downstream

Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.

def get_request_body_data( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
185    def get_request_body_data(
186        self,
187        *,
188        stream_state: Optional[StreamState] = None,
189        stream_slice: Optional[StreamSlice] = None,
190        next_page_token: Optional[Mapping[str, Any]] = None,
191    ) -> Mapping[str, Any]:
192        return self._get_request_options(RequestOptionType.body_data, next_page_token)  # only options injected as body data

Specifies how to populate the body of the request with a non-JSON payload.

If it returns text, the text is sent as is. If it returns a dict, the dict is converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

Only one of the 'request_body_data' and 'request_body_json' methods can be overridden at a time.

def get_request_body_json( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
194    def get_request_body_json(
195        self,
196        *,
197        stream_state: Optional[StreamState] = None,
198        stream_slice: Optional[StreamSlice] = None,
199        next_page_token: Optional[Mapping[str, Any]] = None,
200    ) -> Mapping[str, Any]:
201        return self._get_request_options(RequestOptionType.body_json, next_page_token)  # only options injected into the JSON body

Specifies how to populate the body of the request with a JSON payload.

Only one of the 'request_body_data' and 'request_body_json' methods can be overridden at a time.

@dataclass
class NoPagination(airbyte_cdk.sources.declarative.requesters.paginators.Paginator):
15@dataclass
16class NoPagination(Paginator):
17    """
18    Pagination implementation that never returns a next page and never adds request options.
19    """
20
21    parameters: InitVar[Mapping[str, Any]]
22
23    def path(
24        self,
25        next_page_token: Optional[Mapping[str, Any]],
26        stream_state: Optional[Mapping[str, Any]] = None,
27        stream_slice: Optional[StreamSlice] = None,
28    ) -> Optional[str]:
29        return None  # never overrides the request path
30
31    def get_request_params(
32        self,
33        *,
34        stream_state: Optional[StreamState] = None,
35        stream_slice: Optional[StreamSlice] = None,
36        next_page_token: Optional[Mapping[str, Any]] = None,
37    ) -> MutableMapping[str, Any]:
38        return {}  # no pagination query parameters
39
40    def get_request_headers(
41        self,
42        *,
43        stream_state: Optional[StreamState] = None,
44        stream_slice: Optional[StreamSlice] = None,
45        next_page_token: Optional[Mapping[str, Any]] = None,
46    ) -> Mapping[str, str]:
47        return {}  # no pagination headers
48
49    def get_request_body_data(
50        self,
51        *,
52        stream_state: Optional[StreamState] = None,
53        stream_slice: Optional[StreamSlice] = None,
54        next_page_token: Optional[Mapping[str, Any]] = None,
55    ) -> Union[Mapping[str, Any], str]:
56        return {}  # no pagination body data
57
58    def get_request_body_json(
59        self,
60        *,
61        stream_state: Optional[StreamState] = None,
62        stream_slice: Optional[StreamSlice] = None,
63        next_page_token: Optional[Mapping[str, Any]] = None,
64    ) -> Mapping[str, Any]:
65        return {}  # no pagination JSON body
66
67    def get_initial_token(self) -> Optional[Any]:
68        return None  # no initial page token
69
70    def next_page_token(
71        self,
72        response: requests.Response,
73        last_page_size: int,
74        last_record: Optional[Record],
75        last_page_token_value: Optional[Any],
76    ) -> Optional[Mapping[str, Any]]:
77        return {}  # NOTE(review): Paginator.next_page_token documents None as "no more pages"; {} is falsy but not None -- confirm callers only test truthiness

Pagination implementation that never returns a next page.

NoPagination(parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]])
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
def path( self, next_page_token: Optional[Mapping[str, Any]], stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None) -> Optional[str]:
23    def path(
24        self,
25        next_page_token: Optional[Mapping[str, Any]],
26        stream_state: Optional[Mapping[str, Any]] = None,
27        stream_slice: Optional[StreamSlice] = None,
28    ) -> Optional[str]:
29        return None  # never overrides the request path

Returns the URL path to hit to fetch the next page of records

e.g., if you want to hit https://myapi.com/v1/some_entity, this returns "some_entity"

Returns

path to hit to fetch the next request. Returning None means the path is not defined by the next_page_token

def get_request_params( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> MutableMapping[str, Any]:
31    def get_request_params(
32        self,
33        *,
34        stream_state: Optional[StreamState] = None,
35        stream_slice: Optional[StreamSlice] = None,
36        next_page_token: Optional[Mapping[str, Any]] = None,
37    ) -> MutableMapping[str, Any]:
38        return {}  # no pagination query parameters

Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

E.g., you might want to define query parameters for paging if next_page_token is not None.

def get_request_headers( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, str]:
40    def get_request_headers(
41        self,
42        *,
43        stream_state: Optional[StreamState] = None,
44        stream_slice: Optional[StreamSlice] = None,
45        next_page_token: Optional[Mapping[str, Any]] = None,
46    ) -> Mapping[str, str]:
47        return {}  # no pagination headers

Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.

def get_request_body_data( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Union[Mapping[str, Any], str]:
49    def get_request_body_data(
50        self,
51        *,
52        stream_state: Optional[StreamState] = None,
53        stream_slice: Optional[StreamSlice] = None,
54        next_page_token: Optional[Mapping[str, Any]] = None,
55    ) -> Union[Mapping[str, Any], str]:
56        return {}  # no pagination body data

Specifies how to populate the body of the request with a non-JSON payload.

If it returns text, the text is sent as is. If it returns a dict, the dict is converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

Only one of the 'request_body_data' and 'request_body_json' methods can be overridden at a time.

def get_request_body_json( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
58    def get_request_body_json(
59        self,
60        *,
61        stream_state: Optional[StreamState] = None,
62        stream_slice: Optional[StreamSlice] = None,
63        next_page_token: Optional[Mapping[str, Any]] = None,
64    ) -> Mapping[str, Any]:
65        return {}  # no pagination JSON body

Specifies how to populate the body of the request with a JSON payload.

Only one of the 'request_body_data' and 'request_body_json' methods can be overridden at a time.

def get_initial_token(self) -> Optional[Any]:
67    def get_initial_token(self) -> Optional[Any]:
68        return None  # no initial page token

Get the page token that should be included in the request to get the first page of records

def next_page_token( self, response: requests.models.Response, last_page_size: int, last_record: Optional[airbyte_cdk.Record], last_page_token_value: Optional[Any]) -> Optional[Mapping[str, Any]]:
70    def next_page_token(
71        self,
72        response: requests.Response,
73        last_page_size: int,
74        last_record: Optional[Record],
75        last_page_token_value: Optional[Any],
76    ) -> Optional[Mapping[str, Any]]:
77        return {}  # NOTE(review): Paginator.next_page_token documents None as "no more pages"; {} is falsy but not None -- confirm callers only test truthiness

Returns the next_page_token to use to fetch the next page of records.

Parameters
  • response: the response to process
  • last_page_size: the number of records read from the response
  • last_record: the last record extracted from the response
  • last_page_token_value: The current value of the page token made on the last request
Returns

A mapping {"next_page_token": <token>} for the next page from the input response object. Returning None means there are no more pages to read in this response.

@dataclass
class PaginationStrategy:
15@dataclass
16class PaginationStrategy:  # NOTE(review): no ABC/ABCMeta base, so the @abstractmethod markers below are not enforced at instantiation
17    """
18    Defines how to get the next page token.
19    """
20
21    @property
22    @abstractmethod
23    def initial_token(self) -> Optional[Any]:
24        """
25        Return the token to use for the first request (may be None)
26        """
27
28    @abstractmethod
29    def next_page_token(
30        self,
31        response: requests.Response,
32        last_page_size: int,
33        last_record: Optional[Record],
34        last_page_token_value: Optional[Any],
35    ) -> Optional[Any]:
36        """
37        :param response: response to process
38        :param last_page_size: the number of records read from the response
39        :param last_record: the last record extracted from the response
40        :param last_page_token_value: The value of the page token sent with the last request
41        :return: next page token. Returns None if there are no more pages to fetch
42        """
43        pass
44
45    @abstractmethod
46    def get_page_size(self) -> Optional[int]:
47        """
48        :return: The number of records to fetch in a page, or None if unspecified
49        """

Defines how to get the next page token

initial_token: Optional[Any]
21    @property
22    @abstractmethod
23    def initial_token(self) -> Optional[Any]:
24        """
25        Return the token to use for the first request (may be None)
26        """

Return the initial value of the token

@abstractmethod
def next_page_token( self, response: requests.models.Response, last_page_size: int, last_record: Optional[airbyte_cdk.Record], last_page_token_value: Optional[Any]) -> Optional[Any]:
28    @abstractmethod
29    def next_page_token(
30        self,
31        response: requests.Response,
32        last_page_size: int,
33        last_record: Optional[Record],
34        last_page_token_value: Optional[Any],
35    ) -> Optional[Any]:
36        """
37        :param response: response to process
38        :param last_page_size: the number of records read from the response
39        :param last_record: the last record extracted from the response
40        :param last_page_token_value: The value of the page token sent with the last request
41        :return: next page token. Returns None if there are no more pages to fetch
42        """
43        pass
Parameters
  • response: response to process
  • last_page_size: the number of records read from the response
  • last_record: the last record extracted from the response
  • last_page_token_value: The current value of the page token made on the last request
Returns

next page token. Returns None if there are no more pages to fetch

@abstractmethod
def get_page_size(self) -> Optional[int]:
45    @abstractmethod
46    def get_page_size(self) -> Optional[int]:
47        """
48        :return: The number of records to fetch in a page, or None if unspecified
49        """
Returns

page size: The number of records to fetch in a page. Returns None if unspecified

18@dataclass
19class Paginator(ABC, RequestOptionsProvider):
20    """
21    Defines the token to use to fetch the next page of records from the API.
22
23    If needed, the Paginator sets request options on the HTTP request used to fetch the next page of records.
24    If the next_page_token is the path to the next page of records, then it should be accessed through the `path` method.
25    """
26
27    @abstractmethod
28    def get_initial_token(self) -> Optional[Any]:
29        """
30        Get the page token to include in the request for the first page of records (may be None)
31        """
32
33    @abstractmethod
34    def next_page_token(
35        self,
36        response: requests.Response,
37        last_page_size: int,
38        last_record: Optional[Record],
39        last_page_token_value: Optional[Any],
40    ) -> Optional[Mapping[str, Any]]:
41        """
42        Returns the next_page_token to use to fetch the next page of records.
43
44        :param response: the response to process
45        :param last_page_size: the number of records read from the response
46        :param last_record: the last record extracted from the response
47        :param last_page_token_value: The value of the page token sent with the last request
48        :return: A mapping {"next_page_token": <token>} for the next page from the input response object. Returning None means there are no more pages to read in this response.
49        """
50        pass
51
52    @abstractmethod
53    def path(
54        self,
55        next_page_token: Optional[Mapping[str, Any]],
56        stream_state: Optional[Mapping[str, Any]] = None,
57        stream_slice: Optional[StreamSlice] = None,
58    ) -> Optional[str]:
59        """
60        Returns the URL path to hit to fetch the next page of records
61
62        e.g., if you want to hit https://myapi.com/v1/some_entity, this returns "some_entity"
63
64        :return: path to hit to fetch the next request. Returning None means the path is not defined by the next_page_token
65        """
66        pass

Defines the token to use to fetch the next page of records from the API.

If needed, the Paginator sets request options on the HTTP request used to fetch the next page of records. If the next_page_token is the path to the next page of records, then it should be accessed through the path method.

@abstractmethod
def get_initial_token(self) -> Optional[Any]:
27    @abstractmethod
28    def get_initial_token(self) -> Optional[Any]:
29        """
30        Get the page token to include in the request for the first page of records (may be None)
31        """

Get the page token that should be included in the request to get the first page of records

@abstractmethod
def next_page_token( self, response: requests.models.Response, last_page_size: int, last_record: Optional[airbyte_cdk.Record], last_page_token_value: Optional[Any]) -> Optional[Mapping[str, Any]]:
33    @abstractmethod
34    def next_page_token(
35        self,
36        response: requests.Response,
37        last_page_size: int,
38        last_record: Optional[Record],
39        last_page_token_value: Optional[Any],
40    ) -> Optional[Mapping[str, Any]]:
41        """
42        Returns the next_page_token to use to fetch the next page of records.
43
44        :param response: the response to process
45        :param last_page_size: the number of records read from the response
46        :param last_record: the last record extracted from the response
47        :param last_page_token_value: The value of the page token sent with the last request
48        :return: A mapping {"next_page_token": <token>} for the next page from the input response object. Returning None means there are no more pages to read in this response.
49        """
50        pass

Returns the next_page_token to use to fetch the next page of records.

Parameters
  • response: the response to process
  • last_page_size: the number of records read from the response
  • last_record: the last record extracted from the response
  • last_page_token_value: The current value of the page token made on the last request
Returns

A mapping {"next_page_token": <token>} for the next page from the input response object. Returning None means there are no more pages to read in this response.

@abstractmethod
def path( self, next_page_token: Optional[Mapping[str, Any]], stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None) -> Optional[str]:
52    @abstractmethod
53    def path(
54        self,
55        next_page_token: Optional[Mapping[str, Any]],
56        stream_state: Optional[Mapping[str, Any]] = None,
57        stream_slice: Optional[StreamSlice] = None,
58    ) -> Optional[str]:
59        """
60        Returns the URL path to hit to fetch the next page of records
61
62        e.g: if you wanted to hit https://myapi.com/v1/some_entity then this will return "some_entity"
63
64        :return: path to hit to fetch the next request. Returning None means the path is not defined by the next_page_token
65        """
66        pass

Returns the URL path to hit to fetch the next page of records

E.g. if you want to hit https://myapi.com/v1/some_entity, this will return "some_entity".

Returns

path to hit to fetch the next request. Returning None means the path is not defined by the next_page_token

class PaginatorTestReadDecorator(airbyte_cdk.sources.declarative.requesters.paginators.Paginator):
228class PaginatorTestReadDecorator(Paginator):
229    """
230    In some cases, we want to limit the number of requests that are made to the backend source. This class allows for limiting the number of
231    pages that are queried throughout a read command.
232
233    WARNING: This decorator is not currently thread-safe like the rest of the low-code framework because it has
234    an internal state to track the current number of pages counted so that it can exit early during a test read
235    """
236
237    _PAGE_COUNT_BEFORE_FIRST_NEXT_CALL = 1
238
239    def __init__(self, decorated: Paginator, maximum_number_of_pages: int = 5) -> None:
240        if maximum_number_of_pages and maximum_number_of_pages < 1:
241            raise ValueError(
242                f"The maximum number of pages on a test read needs to be strictly positive. Got {maximum_number_of_pages}"
243            )
244        self._maximum_number_of_pages = maximum_number_of_pages
245        self._decorated = decorated
246        self._page_count = self._PAGE_COUNT_BEFORE_FIRST_NEXT_CALL
247
248    def get_initial_token(self) -> Optional[Any]:
249        self._page_count = self._PAGE_COUNT_BEFORE_FIRST_NEXT_CALL
250        return self._decorated.get_initial_token()
251
252    def next_page_token(
253        self,
254        response: requests.Response,
255        last_page_size: int,
256        last_record: Optional[Record],
257        last_page_token_value: Optional[Any] = None,
258    ) -> Optional[Mapping[str, Any]]:
259        if self._page_count >= self._maximum_number_of_pages:
260            return None
261
262        self._page_count += 1
263        return self._decorated.next_page_token(
264            response, last_page_size, last_record, last_page_token_value
265        )
266
267    def path(
268        self,
269        next_page_token: Optional[Mapping[str, Any]],
270        stream_state: Optional[Mapping[str, Any]] = None,
271        stream_slice: Optional[StreamSlice] = None,
272    ) -> Optional[str]:
273        return self._decorated.path(
274            next_page_token=next_page_token,
275            stream_state=stream_state,
276            stream_slice=stream_slice,
277        )
278
279    def get_request_params(
280        self,
281        *,
282        stream_state: Optional[StreamState] = None,
283        stream_slice: Optional[StreamSlice] = None,
284        next_page_token: Optional[Mapping[str, Any]] = None,
285    ) -> Mapping[str, Any]:
286        return self._decorated.get_request_params(
287            stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
288        )
289
290    def get_request_headers(
291        self,
292        *,
293        stream_state: Optional[StreamState] = None,
294        stream_slice: Optional[StreamSlice] = None,
295        next_page_token: Optional[Mapping[str, Any]] = None,
296    ) -> Mapping[str, str]:
297        return self._decorated.get_request_headers(
298            stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
299        )
300
301    def get_request_body_data(
302        self,
303        *,
304        stream_state: Optional[StreamState] = None,
305        stream_slice: Optional[StreamSlice] = None,
306        next_page_token: Optional[Mapping[str, Any]] = None,
307    ) -> Union[Mapping[str, Any], str]:
308        return self._decorated.get_request_body_data(
309            stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
310        )
311
312    def get_request_body_json(
313        self,
314        *,
315        stream_state: Optional[StreamState] = None,
316        stream_slice: Optional[StreamSlice] = None,
317        next_page_token: Optional[Mapping[str, Any]] = None,
318    ) -> Mapping[str, Any]:
319        return self._decorated.get_request_body_json(
320            stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
321        )

In some cases, we want to limit the number of requests that are made to the backend source. This class allows for limiting the number of pages that are queried throughout a read command.

WARNING: Unlike the rest of the low-code framework, this decorator is not currently thread-safe. It keeps internal state that tracks the number of pages counted so far, so that it can exit early during a test read.

PaginatorTestReadDecorator( decorated: Paginator, maximum_number_of_pages: int = 5)
239    def __init__(self, decorated: Paginator, maximum_number_of_pages: int = 5) -> None:
240        if maximum_number_of_pages and maximum_number_of_pages < 1:
241            raise ValueError(
242                f"The maximum number of pages on a test read needs to be strictly positive. Got {maximum_number_of_pages}"
243            )
244        self._maximum_number_of_pages = maximum_number_of_pages
245        self._decorated = decorated
246        self._page_count = self._PAGE_COUNT_BEFORE_FIRST_NEXT_CALL
def get_initial_token(self) -> Optional[Any]:
248    def get_initial_token(self) -> Optional[Any]:
249        self._page_count = self._PAGE_COUNT_BEFORE_FIRST_NEXT_CALL
250        return self._decorated.get_initial_token()

Get the page token that should be included in the request to get the first page of records

def next_page_token( self, response: requests.models.Response, last_page_size: int, last_record: Optional[airbyte_cdk.Record], last_page_token_value: Optional[Any] = None) -> Optional[Mapping[str, Any]]:
252    def next_page_token(
253        self,
254        response: requests.Response,
255        last_page_size: int,
256        last_record: Optional[Record],
257        last_page_token_value: Optional[Any] = None,
258    ) -> Optional[Mapping[str, Any]]:
259        if self._page_count >= self._maximum_number_of_pages:
260            return None
261
262        self._page_count += 1
263        return self._decorated.next_page_token(
264            response, last_page_size, last_record, last_page_token_value
265        )

Returns the next_page_token to use to fetch the next page of records.

Parameters
  • response: the response to process
  • last_page_size: the number of records read from the response
  • last_record: the last record extracted from the response
  • last_page_token_value: The current value of the page token made on the last request
Returns

A mapping {"next_page_token": <token>} for the next page from the input response object. Returning None means there are no more pages to read in this response.

def path( self, next_page_token: Optional[Mapping[str, Any]], stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None) -> Optional[str]:
267    def path(
268        self,
269        next_page_token: Optional[Mapping[str, Any]],
270        stream_state: Optional[Mapping[str, Any]] = None,
271        stream_slice: Optional[StreamSlice] = None,
272    ) -> Optional[str]:
273        return self._decorated.path(
274            next_page_token=next_page_token,
275            stream_state=stream_state,
276            stream_slice=stream_slice,
277        )

Returns the URL path to hit to fetch the next page of records

E.g. if you want to hit https://myapi.com/v1/some_entity, this will return "some_entity".

Returns

path to hit to fetch the next request. Returning None means the path is not defined by the next_page_token

def get_request_params( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
279    def get_request_params(
280        self,
281        *,
282        stream_state: Optional[StreamState] = None,
283        stream_slice: Optional[StreamSlice] = None,
284        next_page_token: Optional[Mapping[str, Any]] = None,
285    ) -> Mapping[str, Any]:
286        return self._decorated.get_request_params(
287            stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
288        )

Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

E.g: you might want to define query parameters for paging if next_page_token is not None.

def get_request_headers( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, str]:
290    def get_request_headers(
291        self,
292        *,
293        stream_state: Optional[StreamState] = None,
294        stream_slice: Optional[StreamSlice] = None,
295        next_page_token: Optional[Mapping[str, Any]] = None,
296    ) -> Mapping[str, str]:
297        return self._decorated.get_request_headers(
298            stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
299        )

Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.

def get_request_body_data( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Union[Mapping[str, Any], str]:
301    def get_request_body_data(
302        self,
303        *,
304        stream_state: Optional[StreamState] = None,
305        stream_slice: Optional[StreamSlice] = None,
306        next_page_token: Optional[Mapping[str, Any]] = None,
307    ) -> Union[Mapping[str, Any], str]:
308        return self._decorated.get_request_body_data(
309            stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
310        )

Specifies how to populate the body of the request with a non-JSON payload.

If it returns a string, the string is sent as is. If it returns a dict, the dict is converted to a URL-encoded form, e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2".

Only one of the 'request_body_data' and 'request_body_json' functions can be overridden; they cannot both be overridden at the same time.

def get_request_body_json( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
312    def get_request_body_json(
313        self,
314        *,
315        stream_state: Optional[StreamState] = None,
316        stream_slice: Optional[StreamSlice] = None,
317        next_page_token: Optional[Mapping[str, Any]] = None,
318    ) -> Mapping[str, Any]:
319        return self._decorated.get_request_body_json(
320            stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
321        )

Specifies how to populate the body of the request with a JSON payload.

Only one of the 'request_body_data' and 'request_body_json' functions can be overridden; they cannot both be overridden at the same time.