airbyte_cdk.sources.declarative.requesters.paginators

 1#
 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 3#
 4
 5from airbyte_cdk.sources.declarative.requesters.paginators.default_paginator import (
 6    DefaultPaginator,
 7    PaginatorTestReadDecorator,
 8)
 9from airbyte_cdk.sources.declarative.requesters.paginators.no_pagination import NoPagination
10from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator
11from airbyte_cdk.sources.declarative.requesters.paginators.strategies.pagination_strategy import (
12    PaginationStrategy,
13)
14
15__all__ = [
16    "DefaultPaginator",
17    "NoPagination",
18    "PaginationStrategy",
19    "Paginator",
20    "PaginatorTestReadDecorator",
21]
@dataclass
class DefaultPaginator(airbyte_cdk.sources.declarative.requesters.paginators.Paginator):
 33@dataclass
 34class DefaultPaginator(Paginator):
 35    """
 36    Default paginator to request pages of results with a fixed size until the pagination strategy no longer returns a next_page_token
 37
 38    Examples:
 39        1.
 40        * fetches up to 10 records at a time by setting the "limit" request param to 10
 41        * updates the request path with  "{{ response._metadata.next }}"
 42        ```
 43          paginator:
 44            type: "DefaultPaginator"
 45            page_size_option:
 46              type: RequestOption
 47              inject_into: request_parameter
 48              field_name: limit
 49            page_token_option:
 50              type: RequestPath
 51              path: "location"
 52            pagination_strategy:
 53              type: "CursorPagination"
 54              cursor_value: "{{ response._metadata.next }}"
 55              page_size: 10
 56        ```
 57
 58        2.
 59        * fetches up to 5 records at a time by setting the "page_size" header to 5
 60        * increments a record counter and set the request parameter "offset" to the value of the counter
 61        ```
 62          paginator:
 63            type: "DefaultPaginator"
 64            page_size_option:
 65              type: RequestOption
 66              inject_into: header
 67              field_name: page_size
 68            pagination_strategy:
 69              type: "OffsetIncrement"
 70              page_size: 5
 71            page_token_option:
 72              option_type: "request_parameter"
 73              field_name: "offset"
 74        ```
 75
 76        3.
 77        * fetches up to 5 records at a time by setting the "page_size" request param to 5
 78        * increments a page counter and set the request parameter "page" to the value of the counter
 79        ```
 80          paginator:
 81            type: "DefaultPaginator"
 82            page_size_option:
 83              type: RequestOption
 84              inject_into: request_parameter
 85              field_name: page_size
 86            pagination_strategy:
 87              type: "PageIncrement"
 88              page_size: 5
 89            page_token_option:
 90              type: RequestOption
 91              option_type: "request_parameter"
 92              field_name: "page"
 93        ```
 94    Attributes:
 95        page_size_option (Optional[RequestOption]): the request option to set the page size. Cannot be injected in the path.
 96        page_token_option (Optional[RequestPath, RequestOption]): the request option to set the page token
 97        pagination_strategy (PaginationStrategy): Strategy defining how to get the next page token
 98        config (Config): connection config
 99        url_base (Union[InterpolatedString, str]): endpoint's base url
100        decoder (Decoder): decoder to decode the response
101    """
102
103    pagination_strategy: PaginationStrategy
104    config: Config
105    url_base: Union[InterpolatedString, str]
106    parameters: InitVar[Mapping[str, Any]]
107    decoder: Decoder = field(
108        default_factory=lambda: PaginationDecoderDecorator(decoder=JsonDecoder(parameters={}))
109    )
110    page_size_option: Optional[RequestOption] = None
111    page_token_option: Optional[Union[RequestPath, RequestOption]] = None
112
113    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
114        if self.page_size_option and not self.pagination_strategy.get_page_size():
115            raise ValueError(
116                "page_size_option cannot be set if the pagination strategy does not have a page_size"
117            )
118        if isinstance(self.url_base, str):
119            self.url_base = InterpolatedString(string=self.url_base, parameters=parameters)
120
121        if self.page_token_option and not isinstance(self.page_token_option, RequestPath):
122            _validate_component_request_option_paths(
123                self.config,
124                self.page_size_option,
125                self.page_token_option,
126            )
127
128    def get_initial_token(self) -> Optional[Any]:
129        """
130        Return the page token that should be used for the first request of a stream
131
132        WARNING: get_initial_token() should not be used by streams that use RFR that perform checkpointing
133        of state using page numbers. Because paginators are stateless
134        """
135        return self.pagination_strategy.initial_token
136
137    def next_page_token(
138        self,
139        response: requests.Response,
140        last_page_size: int,
141        last_record: Optional[Record],
142        last_page_token_value: Optional[Any] = None,
143    ) -> Optional[Mapping[str, Any]]:
144        next_page_token = self.pagination_strategy.next_page_token(
145            response=response,
146            last_page_size=last_page_size,
147            last_record=last_record,
148            last_page_token_value=last_page_token_value,
149        )
150        if next_page_token:
151            return {"next_page_token": next_page_token}
152        else:
153            return None
154
155    def path(
156        self,
157        next_page_token: Optional[Mapping[str, Any]],
158        stream_state: Optional[Mapping[str, Any]] = None,
159        stream_slice: Optional[StreamSlice] = None,
160    ) -> Optional[str]:
161        token = next_page_token.get("next_page_token") if next_page_token else None
162        if token and self.page_token_option and isinstance(self.page_token_option, RequestPath):
163            # make additional interpolation context
164            interpolation_context = get_interpolation_context(
165                stream_state=stream_state,
166                stream_slice=stream_slice,
167                next_page_token=next_page_token,
168            )
169            # Replace url base to only return the path
170            return str(token).replace(self.url_base.eval(self.config, **interpolation_context), "")  # type: ignore # url_base is casted to a InterpolatedString in __post_init__
171        else:
172            return None
173
174    def get_request_params(
175        self,
176        *,
177        stream_state: Optional[StreamState] = None,
178        stream_slice: Optional[StreamSlice] = None,
179        next_page_token: Optional[Mapping[str, Any]] = None,
180    ) -> MutableMapping[str, Any]:
181        return self._get_request_options(RequestOptionType.request_parameter, next_page_token)
182
183    def get_request_headers(
184        self,
185        *,
186        stream_state: Optional[StreamState] = None,
187        stream_slice: Optional[StreamSlice] = None,
188        next_page_token: Optional[Mapping[str, Any]] = None,
189    ) -> Mapping[str, str]:
190        return self._get_request_options(RequestOptionType.header, next_page_token)
191
192    def get_request_body_data(
193        self,
194        *,
195        stream_state: Optional[StreamState] = None,
196        stream_slice: Optional[StreamSlice] = None,
197        next_page_token: Optional[Mapping[str, Any]] = None,
198    ) -> Mapping[str, Any]:
199        return self._get_request_options(RequestOptionType.body_data, next_page_token)
200
201    def get_request_body_json(
202        self,
203        *,
204        stream_state: Optional[StreamState] = None,
205        stream_slice: Optional[StreamSlice] = None,
206        next_page_token: Optional[Mapping[str, Any]] = None,
207    ) -> Mapping[str, Any]:
208        return self._get_request_options(RequestOptionType.body_json, next_page_token)
209
210    def _get_request_options(
211        self, option_type: RequestOptionType, next_page_token: Optional[Mapping[str, Any]]
212    ) -> MutableMapping[str, Any]:
213        options: MutableMapping[str, Any] = {}
214
215        token = next_page_token.get("next_page_token") if next_page_token else None
216        if (
217            self.page_token_option
218            and token is not None
219            and isinstance(self.page_token_option, RequestOption)
220            and self.page_token_option.inject_into == option_type
221        ):
222            self.page_token_option.inject_into_request(options, token, self.config)
223
224        if (
225            self.page_size_option
226            and self.pagination_strategy.get_page_size()
227            and self.page_size_option.inject_into == option_type
228        ):
229            page_size = self.pagination_strategy.get_page_size()
230            self.page_size_option.inject_into_request(options, page_size, self.config)
231
232        return options

Default paginator to request pages of results with a fixed size until the pagination strategy no longer returns a next_page_token

Examples:

1.

  • fetches up to 10 records at a time by setting the "limit" request param to 10
  • updates the request path with "{{ response._metadata.next }}"
  paginator:
    type: "DefaultPaginator"
    page_size_option:
      type: RequestOption
      inject_into: request_parameter
      field_name: limit
    page_token_option:
      type: RequestPath
      path: "location"
    pagination_strategy:
      type: "CursorPagination"
      cursor_value: "{{ response._metadata.next }}"
      page_size: 10

2.

  • fetches up to 5 records at a time by setting the "page_size" header to 5
  • increments a record counter and sets the request parameter "offset" to the value of the counter
  paginator:
    type: "DefaultPaginator"
    page_size_option:
      type: RequestOption
      inject_into: header
      field_name: page_size
    pagination_strategy:
      type: "OffsetIncrement"
      page_size: 5
    page_token_option:
      type: RequestOption
      inject_into: request_parameter
      field_name: "offset"

3.

  • fetches up to 5 records at a time by setting the "page_size" request param to 5
  • increments a page counter and sets the request parameter "page" to the value of the counter
  paginator:
    type: "DefaultPaginator"
    page_size_option:
      type: RequestOption
      inject_into: request_parameter
      field_name: page_size
    pagination_strategy:
      type: "PageIncrement"
      page_size: 5
    page_token_option:
      type: RequestOption
      inject_into: request_parameter
      field_name: "page"
Attributes:
  • page_size_option (Optional[RequestOption]): the request option to set the page size. Cannot be injected in the path.
  • page_token_option (Optional[Union[RequestPath, RequestOption]]): the request option to set the page token
  • pagination_strategy (PaginationStrategy): Strategy defining how to get the next page token
  • config (Config): connection config
  • url_base (Union[InterpolatedString, str]): endpoint's base url
  • decoder (Decoder): decoder to decode the response
DefaultPaginator( pagination_strategy: PaginationStrategy, config: Mapping[str, Any], url_base: Union[airbyte_cdk.InterpolatedString, str], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], decoder: airbyte_cdk.Decoder = <factory>, page_size_option: Optional[airbyte_cdk.RequestOption] = None, page_token_option: Union[airbyte_cdk.sources.declarative.requesters.request_path.RequestPath, airbyte_cdk.RequestOption, NoneType] = None)
pagination_strategy: PaginationStrategy
config: Mapping[str, Any]
url_base: Union[airbyte_cdk.InterpolatedString, str]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
page_size_option: Optional[airbyte_cdk.RequestOption] = None
def get_initial_token(self) -> Optional[Any]:
def get_initial_token(self) -> Optional[Any]:
    """
    Return the page token that should be used for the first request of a stream.

    WARNING: get_initial_token() should not be used by streams that use RFR and checkpoint
    state using page numbers, because paginators are stateless: this always returns the
    pagination strategy's initial token, never a previously checkpointed page.
    """
    return self.pagination_strategy.initial_token

Return the page token that should be used for the first request of a stream

WARNING: get_initial_token() should not be used by streams that use RFR and checkpoint state using page numbers, because paginators are stateless: it always returns the pagination strategy's initial token, never a previously checkpointed page.

def next_page_token( self, response: requests.models.Response, last_page_size: int, last_record: Optional[airbyte_cdk.Record], last_page_token_value: Optional[Any] = None) -> Optional[Mapping[str, Any]]:
def next_page_token(
    self,
    response: requests.Response,
    last_page_size: int,
    last_record: Optional[Record],
    last_page_token_value: Optional[Any] = None,
) -> Optional[Mapping[str, Any]]:
    """
    Ask the pagination strategy for the next token and wrap it for the requester.

    :return: {"next_page_token": <token>} when the strategy returns a truthy token;
        None otherwise, which means there are no more pages
    """
    strategy_token = self.pagination_strategy.next_page_token(
        response=response,
        last_page_size=last_page_size,
        last_record=last_record,
        last_page_token_value=last_page_token_value,
    )
    if not strategy_token:
        # Any falsy token (None, "", 0, ...) ends pagination.
        return None
    return {"next_page_token": strategy_token}

Returns the next_page_token to use to fetch the next page of records.

Parameters
  • response: the response to process
  • last_page_size: the number of records read from the response
  • last_record: the last record extracted from the response
  • last_page_token_value: The current value of the page token made on the last request
Returns

A mapping {"next_page_token": <token>} for the next page from the input response object. Returning None means there are no more pages to read in this response.

def path( self, next_page_token: Optional[Mapping[str, Any]], stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None) -> Optional[str]:
def path(
    self,
    next_page_token: Optional[Mapping[str, Any]],
    stream_state: Optional[Mapping[str, Any]] = None,
    stream_slice: Optional[StreamSlice] = None,
) -> Optional[str]:
    """
    Return the path of the next page when the page token is a RequestPath.

    If the token starts with the evaluated url_base, that leading prefix is removed so only
    the path part remains. Returns None when there is no token or the token is not injected
    into the path.
    """
    token = next_page_token.get("next_page_token") if next_page_token else None
    if not (token and isinstance(self.page_token_option, RequestPath)):
        return None

    # Make additional interpolation context.
    interpolation_context = get_interpolation_context(
        stream_state=stream_state,
        stream_slice=stream_slice,
        next_page_token=next_page_token,
    )
    base = self.url_base.eval(self.config, **interpolation_context)  # type: ignore # url_base is cast to an InterpolatedString in __post_init__
    token_str = str(token)
    # Strip the base URL only as a leading prefix. str.replace would also remove it from
    # anywhere else in the token, e.g. when the base URL appears inside a query value.
    if token_str.startswith(base):
        return token_str[len(base):]
    return token_str

Returns the URL path to hit to fetch the next page of records

e.g: if you wanted to hit https://myapi.com/v1/some_entity then this will return "some_entity"

Returns

path to hit to fetch the next request. Returning None means the path is not defined by the next_page_token

def get_request_params( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> MutableMapping[str, Any]:
def get_request_params(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
    """
    Return the pagination options (page token and/or page size) that target query parameters.

    stream_state and stream_slice are not used.
    """
    return self._get_request_options(
        option_type=RequestOptionType.request_parameter,
        next_page_token=next_page_token,
    )

Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

E.g: you might want to define query parameters for paging if next_page_token is not None.

def get_request_headers( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, str]:
def get_request_headers(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, str]:
    """
    Return the pagination options (page token and/or page size) that target HTTP headers.

    stream_state and stream_slice are not used.
    """
    return self._get_request_options(
        option_type=RequestOptionType.header,
        next_page_token=next_page_token,
    )

Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.

def get_request_body_data( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
def get_request_body_data(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """
    Return the pagination options (page token and/or page size) that target non-JSON body data.

    stream_state and stream_slice are not used.
    """
    return self._get_request_options(
        option_type=RequestOptionType.body_data,
        next_page_token=next_page_token,
    )

Specifies how to populate the body of the request with a non-JSON payload.

If it returns a string, the string is sent as is. If it returns a dict, the dict is converted to a URL-encoded form, e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2".

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

def get_request_body_json( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
def get_request_body_json(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """
    Return the pagination options (page token and/or page size) that target the JSON body.

    stream_state and stream_slice are not used.
    """
    return self._get_request_options(
        option_type=RequestOptionType.body_json,
        next_page_token=next_page_token,
    )

Specifies how to populate the body of the request with a JSON payload.

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

@dataclass
class NoPagination(airbyte_cdk.sources.declarative.requesters.paginators.Paginator):
@dataclass
class NoPagination(Paginator):
    """
    Pagination implementation that never returns a next page.

    It never overrides the request path, contributes no request parameters, headers or body
    content, and has no initial page token.
    """

    # Init-only field; this class defines no __post_init__ that consumes it.
    parameters: InitVar[Mapping[str, Any]]

    def path(
        self,
        next_page_token: Optional[Mapping[str, Any]],
        stream_state: Optional[Mapping[str, Any]] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> Optional[str]:
        """The request path is never derived from a page token."""
        return None

    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> MutableMapping[str, Any]:
        """No pagination query parameters; returns a fresh empty dict."""
        return dict()

    def get_request_headers(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, str]:
        """No pagination headers; returns a fresh empty dict."""
        return dict()

    def get_request_body_data(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Union[Mapping[str, Any], str]:
        """No pagination body data; returns a fresh empty dict."""
        return dict()

    def get_request_body_json(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """No pagination JSON body; returns a fresh empty dict."""
        return dict()

    def get_initial_token(self) -> Optional[Any]:
        """There is no first-page token."""
        return None

    def next_page_token(
        self,
        response: requests.Response,
        last_page_size: int,
        last_record: Optional[Record],
        last_page_token_value: Optional[Any],
    ) -> Optional[Mapping[str, Any]]:
        """Never yields a next page."""
        # NOTE(review): this returns an empty mapping, not None, although the Paginator contract
        # documents None as "no more pages". Both are falsy; confirm callers test truthiness.
        return dict()

Pagination implementation that never returns a next page.

NoPagination(parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]])
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
def path( self, next_page_token: Optional[Mapping[str, Any]], stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None) -> Optional[str]:
def path(
    self,
    next_page_token: Optional[Mapping[str, Any]],
    stream_state: Optional[Mapping[str, Any]] = None,
    stream_slice: Optional[StreamSlice] = None,
) -> Optional[str]:
    """The request path is never derived from a page token, so this always returns None."""
    path_override: Optional[str] = None
    return path_override

Returns the URL path to hit to fetch the next page of records

e.g: if you wanted to hit https://myapi.com/v1/some_entity then this will return "some_entity"

Returns

path to hit to fetch the next request. Returning None means the path is not defined by the next_page_token

def get_request_params( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> MutableMapping[str, Any]:
def get_request_params(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
    """No pagination query parameters; returns a fresh empty dict."""
    return dict()

Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

E.g: you might want to define query parameters for paging if next_page_token is not None.

def get_request_headers( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, str]:
def get_request_headers(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, str]:
    """No pagination headers; returns a fresh empty dict."""
    return dict()

Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.

def get_request_body_data( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Union[Mapping[str, Any], str]:
def get_request_body_data(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Union[Mapping[str, Any], str]:
    """No pagination body data; returns a fresh empty dict."""
    return dict()

Specifies how to populate the body of the request with a non-JSON payload.

If it returns a string, the string is sent as is. If it returns a dict, the dict is converted to a URL-encoded form, e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2".

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

def get_request_body_json( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
def get_request_body_json(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """No pagination JSON body; returns a fresh empty dict."""
    return dict()

Specifies how to populate the body of the request with a JSON payload.

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

def get_initial_token(self) -> Optional[Any]:
def get_initial_token(self) -> Optional[Any]:
    """There is no first-page token, so this always returns None."""
    initial: Optional[Any] = None
    return initial

Get the page token that should be included in the request to get the first page of records

def next_page_token( self, response: requests.models.Response, last_page_size: int, last_record: Optional[airbyte_cdk.Record], last_page_token_value: Optional[Any]) -> Optional[Mapping[str, Any]]:
def next_page_token(
    self,
    response: requests.Response,
    last_page_size: int,
    last_record: Optional[Record],
    last_page_token_value: Optional[Any],
) -> Optional[Mapping[str, Any]]:
    """Never yields a next page."""
    # NOTE(review): this returns an empty mapping, not None, although the Paginator contract
    # documents None as "no more pages". Both are falsy; confirm callers test truthiness.
    return dict()

Returns the next_page_token to use to fetch the next page of records.

Parameters
  • response: the response to process
  • last_page_size: the number of records read from the response
  • last_record: the last record extracted from the response
  • last_page_token_value: The current value of the page token made on the last request
Returns

A mapping {"next_page_token": <token>} for the next page from the input response object. Returning None means there are no more pages to read in this response.

@dataclass
class PaginationStrategy:
@dataclass
class PaginationStrategy:
    """
    Defines how to get the next page token.

    NOTE(review): this class does not derive from ABC, so the @abstractmethod markers below are
    not enforced when a subclass is instantiated. Adding ABC would make incomplete subclasses
    fail to instantiate, so confirm all subclasses implement every method before changing it.
    """

    @property
    @abstractmethod
    def initial_token(self) -> Optional[Any]:
        """Return the initial value of the token."""

    @abstractmethod
    def next_page_token(
        self,
        response: requests.Response,
        last_page_size: int,
        last_record: Optional[Record],
        last_page_token_value: Optional[Any],
    ) -> Optional[Any]:
        """
        Compute the token for the page after ``response``.

        :param response: response to process
        :param last_page_size: the number of records read from the response
        :param last_record: the last record extracted from the response
        :param last_page_token_value: the value of the page token used on the last request
        :return: next page token, or None if there are no more pages to fetch
        """
        ...

    @abstractmethod
    def get_page_size(self) -> Optional[int]:
        """
        :return: page size: the number of records to fetch in a page, or None if unspecified
        """

Defines how to get the next page token

initial_token: Optional[Any]
@property
@abstractmethod
def initial_token(self) -> Optional[Any]:
    """Return the initial value of the token (abstract; subclasses provide it)."""

Return the initial value of the token

@abstractmethod
def next_page_token( self, response: requests.models.Response, last_page_size: int, last_record: Optional[airbyte_cdk.Record], last_page_token_value: Optional[Any]) -> Optional[Any]:
@abstractmethod
def next_page_token(
    self,
    response: requests.Response,
    last_page_size: int,
    last_record: Optional[Record],
    last_page_token_value: Optional[Any],
) -> Optional[Any]:
    """
    Compute the token for the page after ``response``.

    :param response: response to process
    :param last_page_size: the number of records read from the response
    :param last_record: the last record extracted from the response
    :param last_page_token_value: the value of the page token used on the last request
    :return: next page token, or None if there are no more pages to fetch
    """
    ...
Parameters
  • response: response to process
  • last_page_size: the number of records read from the response
  • last_record: the last record extracted from the response
  • last_page_token_value: The current value of the page token made on the last request
Returns

next page token. Returns None if there are no more pages to fetch

@abstractmethod
def get_page_size(self) -> Optional[int]:
@abstractmethod
def get_page_size(self) -> Optional[int]:
    """
    :return: page size: the number of records to fetch in a page, or None if unspecified
    """
Returns

page size: The number of records to fetch in a page. Returns None if unspecified

@dataclass
class Paginator(ABC, RequestOptionsProvider):
    """
    Defines the token to use to fetch the next page of records from the API.

    When needed, a Paginator also sets request options on the HTTP request that fetches the next
    page. When the next_page_token is itself the path of the next page, read it through the
    `path` method.
    """

    @abstractmethod
    def get_initial_token(self) -> Optional[Any]:
        """Get the page token to include in the request for the first page of records."""

    @abstractmethod
    def next_page_token(
        self,
        response: requests.Response,
        last_page_size: int,
        last_record: Optional[Record],
        last_page_token_value: Optional[Any],
    ) -> Optional[Mapping[str, Any]]:
        """
        Return the next_page_token used to fetch the next page of records.

        :param response: the response to process
        :param last_page_size: the number of records read from the response
        :param last_record: the last record extracted from the response
        :param last_page_token_value: the value of the page token used on the last request
        :return: a mapping {"next_page_token": <token>} for the next page, or None when there are
            no more pages to read
        """
        ...

    @abstractmethod
    def path(
        self,
        next_page_token: Optional[Mapping[str, Any]],
        stream_state: Optional[Mapping[str, Any]] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> Optional[str]:
        """
        Return the URL path to hit to fetch the next page of records.

        e.g. to hit https://myapi.com/v1/some_entity this returns "some_entity".

        :return: the path for the next request, or None when the path is not defined by the
            next_page_token
        """
        ...

Defines the token to use to fetch the next page of records from the API.

If needed, the Paginator will set request options to be set on the HTTP request to fetch the next page of records. If the next_page_token is the path to the next page of records, then it should be accessed through the path method

@abstractmethod
def get_initial_token(self) -> Optional[Any]:
@abstractmethod
def get_initial_token(self) -> Optional[Any]:
    """Get the page token to include in the request for the first page of records."""

Get the page token that should be included in the request to get the first page of records

@abstractmethod
def next_page_token( self, response: requests.models.Response, last_page_size: int, last_record: Optional[airbyte_cdk.Record], last_page_token_value: Optional[Any]) -> Optional[Mapping[str, Any]]:
33    @abstractmethod
34    def next_page_token(
35        self,
36        response: requests.Response,
37        last_page_size: int,
38        last_record: Optional[Record],
39        last_page_token_value: Optional[Any],
40    ) -> Optional[Mapping[str, Any]]:
41        """
42        Returns the next_page_token to use to fetch the next page of records.
43
44        :param response: the response to process
45        :param last_page_size: the number of records read from the response
46        :param last_record: the last record extracted from the response
47        :param last_page_token_value: The current value of the page token made on the last request
48        :return: A mapping {"next_page_token": <token>} for the next page from the input response object. Returning None means there are no more pages to read in this response.
49        """
50        pass

Returns the next_page_token to use to fetch the next page of records.

Parameters
  • response: the response to process
  • last_page_size: the number of records read from the response
  • last_record: the last record extracted from the response
  • last_page_token_value: The current value of the page token made on the last request
Returns

A mapping {"next_page_token": <token>} for the next page, derived from the input response object. Returning None means there are no more pages to read in this response.

@abstractmethod
def path( self, next_page_token: Optional[Mapping[str, Any]], stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None) -> Optional[str]:
52    @abstractmethod
53    def path(
54        self,
55        next_page_token: Optional[Mapping[str, Any]],
56        stream_state: Optional[Mapping[str, Any]] = None,
57        stream_slice: Optional[StreamSlice] = None,
58    ) -> Optional[str]:
59        """
60        Returns the URL path to hit to fetch the next page of records
61
62        e.g: if you wanted to hit https://myapi.com/v1/some_entity then this will return "some_entity"
63
64        :return: path to hit to fetch the next request. Returning None means the path is not defined by the next_page_token
65        """
66        pass

Returns the URL path to hit to fetch the next page of records

E.g., if you wanted to hit https://myapi.com/v1/some_entity, this method would return "some_entity".

Returns

The path to hit to fetch the next page. Returning None means the path is not defined by the next_page_token.

class PaginatorTestReadDecorator(airbyte_cdk.sources.declarative.requesters.paginators.Paginator):
235class PaginatorTestReadDecorator(Paginator):
236    """
237    In some cases, we want to limit the number of requests that are made to the backend source. This class allows for limiting the number of
238    pages that are queried throughout a read command.
239
240    WARNING: This decorator is not currently thread-safe like the rest of the low-code framework because it has
241    an internal state to track the current number of pages counted so that it can exit early during a test read
242    """
243
244    _PAGE_COUNT_BEFORE_FIRST_NEXT_CALL = 1
245
246    def __init__(self, decorated: Paginator, maximum_number_of_pages: int = 5) -> None:
247        if maximum_number_of_pages and maximum_number_of_pages < 1:
248            raise ValueError(
249                f"The maximum number of pages on a test read needs to be strictly positive. Got {maximum_number_of_pages}"
250            )
251        self._maximum_number_of_pages = maximum_number_of_pages
252        self._decorated = decorated
253        self._page_count = self._PAGE_COUNT_BEFORE_FIRST_NEXT_CALL
254
255    def get_initial_token(self) -> Optional[Any]:
256        self._page_count = self._PAGE_COUNT_BEFORE_FIRST_NEXT_CALL
257        return self._decorated.get_initial_token()
258
259    def next_page_token(
260        self,
261        response: requests.Response,
262        last_page_size: int,
263        last_record: Optional[Record],
264        last_page_token_value: Optional[Any] = None,
265    ) -> Optional[Mapping[str, Any]]:
266        if self._page_count >= self._maximum_number_of_pages:
267            return None
268
269        self._page_count += 1
270        return self._decorated.next_page_token(
271            response, last_page_size, last_record, last_page_token_value
272        )
273
274    def path(
275        self,
276        next_page_token: Optional[Mapping[str, Any]],
277        stream_state: Optional[Mapping[str, Any]] = None,
278        stream_slice: Optional[StreamSlice] = None,
279    ) -> Optional[str]:
280        return self._decorated.path(
281            next_page_token=next_page_token,
282            stream_state=stream_state,
283            stream_slice=stream_slice,
284        )
285
286    def get_request_params(
287        self,
288        *,
289        stream_state: Optional[StreamState] = None,
290        stream_slice: Optional[StreamSlice] = None,
291        next_page_token: Optional[Mapping[str, Any]] = None,
292    ) -> Mapping[str, Any]:
293        return self._decorated.get_request_params(
294            stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
295        )
296
297    def get_request_headers(
298        self,
299        *,
300        stream_state: Optional[StreamState] = None,
301        stream_slice: Optional[StreamSlice] = None,
302        next_page_token: Optional[Mapping[str, Any]] = None,
303    ) -> Mapping[str, str]:
304        return self._decorated.get_request_headers(
305            stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
306        )
307
308    def get_request_body_data(
309        self,
310        *,
311        stream_state: Optional[StreamState] = None,
312        stream_slice: Optional[StreamSlice] = None,
313        next_page_token: Optional[Mapping[str, Any]] = None,
314    ) -> Union[Mapping[str, Any], str]:
315        return self._decorated.get_request_body_data(
316            stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
317        )
318
319    def get_request_body_json(
320        self,
321        *,
322        stream_state: Optional[StreamState] = None,
323        stream_slice: Optional[StreamSlice] = None,
324        next_page_token: Optional[Mapping[str, Any]] = None,
325    ) -> Mapping[str, Any]:
326        return self._decorated.get_request_body_json(
327            stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
328        )

In some cases, we want to limit the number of requests that are made to the backend source. This class allows for limiting the number of pages that are queried throughout a read command.

WARNING: Unlike the rest of the low-code framework, this decorator is not currently thread-safe. It keeps internal state tracking the number of pages counted so far, so that it can exit early during a test read.

PaginatorTestReadDecorator( decorated: Paginator, maximum_number_of_pages: int = 5)
246    def __init__(self, decorated: Paginator, maximum_number_of_pages: int = 5) -> None:
247        if maximum_number_of_pages and maximum_number_of_pages < 1:
248            raise ValueError(
249                f"The maximum number of pages on a test read needs to be strictly positive. Got {maximum_number_of_pages}"
250            )
251        self._maximum_number_of_pages = maximum_number_of_pages
252        self._decorated = decorated
253        self._page_count = self._PAGE_COUNT_BEFORE_FIRST_NEXT_CALL
def get_initial_token(self) -> Optional[Any]:
255    def get_initial_token(self) -> Optional[Any]:
256        self._page_count = self._PAGE_COUNT_BEFORE_FIRST_NEXT_CALL
257        return self._decorated.get_initial_token()

Get the page token that should be included in the request to get the first page of records

def next_page_token( self, response: requests.models.Response, last_page_size: int, last_record: Optional[airbyte_cdk.Record], last_page_token_value: Optional[Any] = None) -> Optional[Mapping[str, Any]]:
259    def next_page_token(
260        self,
261        response: requests.Response,
262        last_page_size: int,
263        last_record: Optional[Record],
264        last_page_token_value: Optional[Any] = None,
265    ) -> Optional[Mapping[str, Any]]:
266        if self._page_count >= self._maximum_number_of_pages:
267            return None
268
269        self._page_count += 1
270        return self._decorated.next_page_token(
271            response, last_page_size, last_record, last_page_token_value
272        )

Returns the next_page_token to use to fetch the next page of records.

Parameters
  • response: the response to process
  • last_page_size: the number of records read from the response
  • last_record: the last record extracted from the response
  • last_page_token_value: The current value of the page token made on the last request
Returns

A mapping {"next_page_token": <token>} for the next page, derived from the input response object. Returning None means there are no more pages to read in this response.

def path( self, next_page_token: Optional[Mapping[str, Any]], stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None) -> Optional[str]:
274    def path(
275        self,
276        next_page_token: Optional[Mapping[str, Any]],
277        stream_state: Optional[Mapping[str, Any]] = None,
278        stream_slice: Optional[StreamSlice] = None,
279    ) -> Optional[str]:
280        return self._decorated.path(
281            next_page_token=next_page_token,
282            stream_state=stream_state,
283            stream_slice=stream_slice,
284        )

Returns the URL path to hit to fetch the next page of records

E.g., if you wanted to hit https://myapi.com/v1/some_entity, this method would return "some_entity".

Returns

The path to hit to fetch the next page. Returning None means the path is not defined by the next_page_token.

def get_request_params( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
286    def get_request_params(
287        self,
288        *,
289        stream_state: Optional[StreamState] = None,
290        stream_slice: Optional[StreamSlice] = None,
291        next_page_token: Optional[Mapping[str, Any]] = None,
292    ) -> Mapping[str, Any]:
293        return self._decorated.get_request_params(
294            stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
295        )

Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

E.g., you might want to define query parameters for paging when next_page_token is not None.

def get_request_headers( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, str]:
297    def get_request_headers(
298        self,
299        *,
300        stream_state: Optional[StreamState] = None,
301        stream_slice: Optional[StreamSlice] = None,
302        next_page_token: Optional[Mapping[str, Any]] = None,
303    ) -> Mapping[str, str]:
304        return self._decorated.get_request_headers(
305            stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
306        )

Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.

def get_request_body_data( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Union[Mapping[str, Any], str]:
308    def get_request_body_data(
309        self,
310        *,
311        stream_state: Optional[StreamState] = None,
312        stream_slice: Optional[StreamSlice] = None,
313        next_page_token: Optional[Mapping[str, Any]] = None,
314    ) -> Union[Mapping[str, Any], str]:
315        return self._decorated.get_request_body_data(
316            stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
317        )

Specifies how to populate the body of the request with a non-JSON payload.

If it returns a ready-made string, that string is sent as-is. If it returns a dict, the dict is converted to a URL-encoded form, e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2".

Only one of the 'request_body_data' and 'request_body_json' functions can be overridden at the same time.

def get_request_body_json( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
319    def get_request_body_json(
320        self,
321        *,
322        stream_state: Optional[StreamState] = None,
323        stream_slice: Optional[StreamSlice] = None,
324        next_page_token: Optional[Mapping[str, Any]] = None,
325    ) -> Mapping[str, Any]:
326        return self._decorated.get_request_body_json(
327            stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
328        )

Specifies how to populate the body of the request with a JSON payload.

Only one of the 'request_body_data' and 'request_body_json' functions can be overridden at the same time.