airbyte_cdk.sources.declarative.requesters.paginators
1# 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 3# 4 5from airbyte_cdk.sources.declarative.requesters.paginators.default_paginator import ( 6 DefaultPaginator, 7 PaginatorTestReadDecorator, 8) 9from airbyte_cdk.sources.declarative.requesters.paginators.no_pagination import NoPagination 10from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator 11from airbyte_cdk.sources.declarative.requesters.paginators.strategies.pagination_strategy import ( 12 PaginationStrategy, 13) 14 15__all__ = [ 16 "DefaultPaginator", 17 "NoPagination", 18 "PaginationStrategy", 19 "Paginator", 20 "PaginatorTestReadDecorator", 21]
33@dataclass 34class DefaultPaginator(Paginator): 35 """ 36 Default paginator to request pages of results with a fixed size until the pagination strategy no longer returns a next_page_token 37 38 Examples: 39 1. 40 * fetches up to 10 records at a time by setting the "limit" request param to 10 41 * updates the request path with "{{ response._metadata.next }}" 42 ``` 43 paginator: 44 type: "DefaultPaginator" 45 page_size_option: 46 type: RequestOption 47 inject_into: request_parameter 48 field_name: limit 49 page_token_option: 50 type: RequestPath 51 path: "location" 52 pagination_strategy: 53 type: "CursorPagination" 54 cursor_value: "{{ response._metadata.next }}" 55 page_size: 10 56 ``` 57 58 2. 59 * fetches up to 5 records at a time by setting the "page_size" header to 5 60 * increments a record counter and set the request parameter "offset" to the value of the counter 61 ``` 62 paginator: 63 type: "DefaultPaginator" 64 page_size_option: 65 type: RequestOption 66 inject_into: header 67 field_name: page_size 68 pagination_strategy: 69 type: "OffsetIncrement" 70 page_size: 5 71 page_token_option: 72 option_type: "request_parameter" 73 field_name: "offset" 74 ``` 75 76 3. 77 * fetches up to 5 records at a time by setting the "page_size" request param to 5 78 * increments a page counter and set the request parameter "page" to the value of the counter 79 ``` 80 paginator: 81 type: "DefaultPaginator" 82 page_size_option: 83 type: RequestOption 84 inject_into: request_parameter 85 field_name: page_size 86 pagination_strategy: 87 type: "PageIncrement" 88 page_size: 5 89 page_token_option: 90 type: RequestOption 91 option_type: "request_parameter" 92 field_name: "page" 93 ``` 94 Attributes: 95 page_size_option (Optional[RequestOption]): the request option to set the page size. Cannot be injected in the path. 96 page_token_option (Optional[RequestPath, RequestOption]): the request option to set the page token 97 pagination_strategy (PaginationStrategy): Strategy defining how to get the next page token 98 config (Config): connection config 99 url_base (Union[InterpolatedString, str]): endpoint's base url 100 decoder (Decoder): decoder to decode the response 101 """ 102 103 pagination_strategy: PaginationStrategy 104 config: Config 105 url_base: Union[InterpolatedString, str] 106 parameters: InitVar[Mapping[str, Any]] 107 decoder: Decoder = field( 108 default_factory=lambda: PaginationDecoderDecorator(decoder=JsonDecoder(parameters={})) 109 ) 110 page_size_option: Optional[RequestOption] = None 111 page_token_option: Optional[Union[RequestPath, RequestOption]] = None 112 113 def __post_init__(self, parameters: Mapping[str, Any]) -> None: 114 if self.page_size_option and not self.pagination_strategy.get_page_size(): 115 raise ValueError( 116 "page_size_option cannot be set if the pagination strategy does not have a page_size" 117 ) 118 if isinstance(self.url_base, str): 119 self.url_base = InterpolatedString(string=self.url_base, parameters=parameters) 120 121 if self.page_token_option and not isinstance(self.page_token_option, RequestPath): 122 _validate_component_request_option_paths( 123 self.config, 124 self.page_size_option, 125 self.page_token_option, 126 ) 127 128 def get_initial_token(self) -> Optional[Any]: 129 """ 130 Return the page token that should be used for the first request of a stream 131 132 WARNING: get_initial_token() should not be used by streams that use RFR that perform checkpointing 133 of state using page numbers. Because paginators are stateless 134 """ 135 return self.pagination_strategy.initial_token 136 137 def next_page_token( 138 self, 139 response: requests.Response, 140 last_page_size: int, 141 last_record: Optional[Record], 142 last_page_token_value: Optional[Any] = None, 143 ) -> Optional[Mapping[str, Any]]: 144 next_page_token = self.pagination_strategy.next_page_token( 145 response=response, 146 last_page_size=last_page_size, 147 last_record=last_record, 148 last_page_token_value=last_page_token_value, 149 ) 150 if next_page_token: 151 return {"next_page_token": next_page_token} 152 else: 153 return None 154 155 def path( 156 self, 157 next_page_token: Optional[Mapping[str, Any]], 158 stream_state: Optional[Mapping[str, Any]] = None, 159 stream_slice: Optional[StreamSlice] = None, 160 ) -> Optional[str]: 161 token = next_page_token.get("next_page_token") if next_page_token else None 162 if token and self.page_token_option and isinstance(self.page_token_option, RequestPath): 163 return str(token) 164 else: 165 return None 166 167 def get_request_params( 168 self, 169 *, 170 stream_state: Optional[StreamState] = None, 171 stream_slice: Optional[StreamSlice] = None, 172 next_page_token: Optional[Mapping[str, Any]] = None, 173 ) -> MutableMapping[str, Any]: 174 return self._get_request_options(RequestOptionType.request_parameter, next_page_token) 175 176 def get_request_headers( 177 self, 178 *, 179 stream_state: Optional[StreamState] = None, 180 stream_slice: Optional[StreamSlice] = None, 181 next_page_token: Optional[Mapping[str, Any]] = None, 182 ) -> Mapping[str, str]: 183 return self._get_request_options(RequestOptionType.header, next_page_token) 184 185 def get_request_body_data( 186 self, 187 *, 188 stream_state: Optional[StreamState] = None, 189 stream_slice: Optional[StreamSlice] = None, 190 next_page_token: Optional[Mapping[str, Any]] = None, 191 ) -> Mapping[str, Any]: 192 return self._get_request_options(RequestOptionType.body_data, next_page_token) 193 194 def get_request_body_json( 195 self, 196 *, 197 stream_state: Optional[StreamState] = None, 198 stream_slice: Optional[StreamSlice] = None, 199 next_page_token: Optional[Mapping[str, Any]] = None, 200 ) -> Mapping[str, Any]: 201 return self._get_request_options(RequestOptionType.body_json, next_page_token) 202 203 def _get_request_options( 204 self, option_type: RequestOptionType, next_page_token: Optional[Mapping[str, Any]] 205 ) -> MutableMapping[str, Any]: 206 options: MutableMapping[str, Any] = {} 207 208 token = next_page_token.get("next_page_token") if next_page_token else None 209 if ( 210 self.page_token_option 211 and token is not None 212 and isinstance(self.page_token_option, RequestOption) 213 and self.page_token_option.inject_into == option_type 214 ): 215 self.page_token_option.inject_into_request(options, token, self.config) 216 217 if ( 218 self.page_size_option 219 and self.pagination_strategy.get_page_size() 220 and self.page_size_option.inject_into == option_type 221 ): 222 page_size = self.pagination_strategy.get_page_size() 223 self.page_size_option.inject_into_request(options, page_size, self.config) 224 225 return options
Default paginator to request pages of results with a fixed size until the pagination strategy no longer returns a next_page_token
Examples:
1.
- fetches up to 10 records at a time by setting the "limit" request param to 10
- updates the request path with "{{ response._metadata.next }}"
paginator: type: "DefaultPaginator" page_size_option: type: RequestOption inject_into: request_parameter field_name: limit page_token_option: type: RequestPath path: "location" pagination_strategy: type: "CursorPagination" cursor_value: "{{ response._metadata.next }}" page_size: 10
2.
- fetches up to 5 records at a time by setting the "page_size" header to 5
- increments a record counter and set the request parameter "offset" to the value of the counter
paginator: type: "DefaultPaginator" page_size_option: type: RequestOption inject_into: header field_name: page_size pagination_strategy: type: "OffsetIncrement" page_size: 5 page_token_option: option_type: "request_parameter" field_name: "offset"
3.
- fetches up to 5 records at a time by setting the "page_size" request param to 5
- increments a page counter and set the request parameter "page" to the value of the counter
paginator: type: "DefaultPaginator" page_size_option: type: RequestOption inject_into: request_parameter field_name: page_size pagination_strategy: type: "PageIncrement" page_size: 5 page_token_option: type: RequestOption option_type: "request_parameter" field_name: "page"
Attributes:
- page_size_option (Optional[RequestOption]): the request option to set the page size. Cannot be injected in the path.
- page_token_option (Optional[RequestPath, RequestOption]): the request option to set the page token
- pagination_strategy (PaginationStrategy): Strategy defining how to get the next page token
- config (Config): connection config
- url_base (Union[InterpolatedString, str]): endpoint's base url
- decoder (Decoder): decoder to decode the response
128 def get_initial_token(self) -> Optional[Any]: 129 """ 130 Return the page token that should be used for the first request of a stream 131 132 WARNING: get_initial_token() should not be used by streams that use RFR that perform checkpointing 133 of state using page numbers. Because paginators are stateless 134 """ 135 return self.pagination_strategy.initial_token
Return the page token that should be used for the first request of a stream
WARNING: get_initial_token() should not be used by streams that use RFR that perform checkpointing of state using page numbers. Because paginators are stateless
137 def next_page_token( 138 self, 139 response: requests.Response, 140 last_page_size: int, 141 last_record: Optional[Record], 142 last_page_token_value: Optional[Any] = None, 143 ) -> Optional[Mapping[str, Any]]: 144 next_page_token = self.pagination_strategy.next_page_token( 145 response=response, 146 last_page_size=last_page_size, 147 last_record=last_record, 148 last_page_token_value=last_page_token_value, 149 ) 150 if next_page_token: 151 return {"next_page_token": next_page_token} 152 else: 153 return None
Returns the next_page_token to use to fetch the next page of records.
Parameters
- response: the response to process
- last_page_size: the number of records read from the response
- last_record: the last record extracted from the response
- last_page_token_value: The current value of the page token made on the last request
Returns
A mapping {"next_page_token":
} for the next page from the input response object. Returning None means there are no more pages to read in this response.
155 def path( 156 self, 157 next_page_token: Optional[Mapping[str, Any]], 158 stream_state: Optional[Mapping[str, Any]] = None, 159 stream_slice: Optional[StreamSlice] = None, 160 ) -> Optional[str]: 161 token = next_page_token.get("next_page_token") if next_page_token else None 162 if token and self.page_token_option and isinstance(self.page_token_option, RequestPath): 163 return str(token) 164 else: 165 return None
Returns the URL path to hit to fetch the next page of records
e.g: if you wanted to hit https://myapi.com/v1/some_entity then this will return "some_entity"
Returns
path to hit to fetch the next request. Returning None means the path is not defined by the next_page_token
167 def get_request_params( 168 self, 169 *, 170 stream_state: Optional[StreamState] = None, 171 stream_slice: Optional[StreamSlice] = None, 172 next_page_token: Optional[Mapping[str, Any]] = None, 173 ) -> MutableMapping[str, Any]: 174 return self._get_request_options(RequestOptionType.request_parameter, next_page_token)
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g: you might want to define query parameters for paging if next_page_token is not None.
176 def get_request_headers( 177 self, 178 *, 179 stream_state: Optional[StreamState] = None, 180 stream_slice: Optional[StreamSlice] = None, 181 next_page_token: Optional[Mapping[str, Any]] = None, 182 ) -> Mapping[str, str]: 183 return self._get_request_options(RequestOptionType.header, next_page_token)
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
185 def get_request_body_data( 186 self, 187 *, 188 stream_state: Optional[StreamState] = None, 189 stream_slice: Optional[StreamSlice] = None, 190 next_page_token: Optional[Mapping[str, Any]] = None, 191 ) -> Mapping[str, Any]: 192 return self._get_request_options(RequestOptionType.body_data, next_page_token)
Specifies how to populate the body of the request with a non-JSON payload.
If returns a ready text that it will be sent as is. If returns a dict that it will be converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
194 def get_request_body_json( 195 self, 196 *, 197 stream_state: Optional[StreamState] = None, 198 stream_slice: Optional[StreamSlice] = None, 199 next_page_token: Optional[Mapping[str, Any]] = None, 200 ) -> Mapping[str, Any]: 201 return self._get_request_options(RequestOptionType.body_json, next_page_token)
Specifies how to populate the body of the request with a JSON payload.
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
15@dataclass 16class NoPagination(Paginator): 17 """ 18 Pagination implementation that never returns a next page. 19 """ 20 21 parameters: InitVar[Mapping[str, Any]] 22 23 def path( 24 self, 25 next_page_token: Optional[Mapping[str, Any]], 26 stream_state: Optional[Mapping[str, Any]] = None, 27 stream_slice: Optional[StreamSlice] = None, 28 ) -> Optional[str]: 29 return None 30 31 def get_request_params( 32 self, 33 *, 34 stream_state: Optional[StreamState] = None, 35 stream_slice: Optional[StreamSlice] = None, 36 next_page_token: Optional[Mapping[str, Any]] = None, 37 ) -> MutableMapping[str, Any]: 38 return {} 39 40 def get_request_headers( 41 self, 42 *, 43 stream_state: Optional[StreamState] = None, 44 stream_slice: Optional[StreamSlice] = None, 45 next_page_token: Optional[Mapping[str, Any]] = None, 46 ) -> Mapping[str, str]: 47 return {} 48 49 def get_request_body_data( 50 self, 51 *, 52 stream_state: Optional[StreamState] = None, 53 stream_slice: Optional[StreamSlice] = None, 54 next_page_token: Optional[Mapping[str, Any]] = None, 55 ) -> Union[Mapping[str, Any], str]: 56 return {} 57 58 def get_request_body_json( 59 self, 60 *, 61 stream_state: Optional[StreamState] = None, 62 stream_slice: Optional[StreamSlice] = None, 63 next_page_token: Optional[Mapping[str, Any]] = None, 64 ) -> Mapping[str, Any]: 65 return {} 66 67 def get_initial_token(self) -> Optional[Any]: 68 return None 69 70 def next_page_token( 71 self, 72 response: requests.Response, 73 last_page_size: int, 74 last_record: Optional[Record], 75 last_page_token_value: Optional[Any], 76 ) -> Optional[Mapping[str, Any]]: 77 return {}
Pagination implementation that never returns a next page.
23 def path( 24 self, 25 next_page_token: Optional[Mapping[str, Any]], 26 stream_state: Optional[Mapping[str, Any]] = None, 27 stream_slice: Optional[StreamSlice] = None, 28 ) -> Optional[str]: 29 return None
Returns the URL path to hit to fetch the next page of records
e.g: if you wanted to hit https://myapi.com/v1/some_entity then this will return "some_entity"
Returns
path to hit to fetch the next request. Returning None means the path is not defined by the next_page_token
31 def get_request_params( 32 self, 33 *, 34 stream_state: Optional[StreamState] = None, 35 stream_slice: Optional[StreamSlice] = None, 36 next_page_token: Optional[Mapping[str, Any]] = None, 37 ) -> MutableMapping[str, Any]: 38 return {}
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g: you might want to define query parameters for paging if next_page_token is not None.
40 def get_request_headers( 41 self, 42 *, 43 stream_state: Optional[StreamState] = None, 44 stream_slice: Optional[StreamSlice] = None, 45 next_page_token: Optional[Mapping[str, Any]] = None, 46 ) -> Mapping[str, str]: 47 return {}
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
49 def get_request_body_data( 50 self, 51 *, 52 stream_state: Optional[StreamState] = None, 53 stream_slice: Optional[StreamSlice] = None, 54 next_page_token: Optional[Mapping[str, Any]] = None, 55 ) -> Union[Mapping[str, Any], str]: 56 return {}
Specifies how to populate the body of the request with a non-JSON payload.
If returns a ready text that it will be sent as is. If returns a dict that it will be converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
58 def get_request_body_json( 59 self, 60 *, 61 stream_state: Optional[StreamState] = None, 62 stream_slice: Optional[StreamSlice] = None, 63 next_page_token: Optional[Mapping[str, Any]] = None, 64 ) -> Mapping[str, Any]: 65 return {}
Specifies how to populate the body of the request with a JSON payload.
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
Get the page token that should be included in the request to get the first page of records
70 def next_page_token( 71 self, 72 response: requests.Response, 73 last_page_size: int, 74 last_record: Optional[Record], 75 last_page_token_value: Optional[Any], 76 ) -> Optional[Mapping[str, Any]]: 77 return {}
Returns the next_page_token to use to fetch the next page of records.
Parameters
- response: the response to process
- last_page_size: the number of records read from the response
- last_record: the last record extracted from the response
- last_page_token_value: The current value of the page token made on the last request
Returns
A mapping {"next_page_token":
} for the next page from the input response object. Returning None means there are no more pages to read in this response.
15@dataclass 16class PaginationStrategy: 17 """ 18 Defines how to get the next page token 19 """ 20 21 @property 22 @abstractmethod 23 def initial_token(self) -> Optional[Any]: 24 """ 25 Return the initial value of the token 26 """ 27 28 @abstractmethod 29 def next_page_token( 30 self, 31 response: requests.Response, 32 last_page_size: int, 33 last_record: Optional[Record], 34 last_page_token_value: Optional[Any], 35 ) -> Optional[Any]: 36 """ 37 :param response: response to process 38 :param last_page_size: the number of records read from the response 39 :param last_record: the last record extracted from the response 40 :param last_page_token_value: The current value of the page token made on the last request 41 :return: next page token. Returns None if there are no more pages to fetch 42 """ 43 pass 44 45 @abstractmethod 46 def get_page_size(self) -> Optional[int]: 47 """ 48 :return: page size: The number of records to fetch in a page. Returns None if unspecified 49 """
Defines how to get the next page token
21 @property 22 @abstractmethod 23 def initial_token(self) -> Optional[Any]: 24 """ 25 Return the initial value of the token 26 """
Return the initial value of the token
28 @abstractmethod 29 def next_page_token( 30 self, 31 response: requests.Response, 32 last_page_size: int, 33 last_record: Optional[Record], 34 last_page_token_value: Optional[Any], 35 ) -> Optional[Any]: 36 """ 37 :param response: response to process 38 :param last_page_size: the number of records read from the response 39 :param last_record: the last record extracted from the response 40 :param last_page_token_value: The current value of the page token made on the last request 41 :return: next page token. Returns None if there are no more pages to fetch 42 """ 43 pass
Parameters
- response: response to process
- last_page_size: the number of records read from the response
- last_record: the last record extracted from the response
- last_page_token_value: The current value of the page token made on the last request
Returns
next page token. Returns None if there are no more pages to fetch
45 @abstractmethod 46 def get_page_size(self) -> Optional[int]: 47 """ 48 :return: page size: The number of records to fetch in a page. Returns None if unspecified 49 """
Returns
page size: The number of records to fetch in a page. Returns None if unspecified
18@dataclass 19class Paginator(ABC, RequestOptionsProvider): 20 """ 21 Defines the token to use to fetch the next page of records from the API. 22 23 If needed, the Paginator will set request options to be set on the HTTP request to fetch the next page of records. 24 If the next_page_token is the path to the next page of records, then it should be accessed through the `path` method 25 """ 26 27 @abstractmethod 28 def get_initial_token(self) -> Optional[Any]: 29 """ 30 Get the page token that should be included in the request to get the first page of records 31 """ 32 33 @abstractmethod 34 def next_page_token( 35 self, 36 response: requests.Response, 37 last_page_size: int, 38 last_record: Optional[Record], 39 last_page_token_value: Optional[Any], 40 ) -> Optional[Mapping[str, Any]]: 41 """ 42 Returns the next_page_token to use to fetch the next page of records. 43 44 :param response: the response to process 45 :param last_page_size: the number of records read from the response 46 :param last_record: the last record extracted from the response 47 :param last_page_token_value: The current value of the page token made on the last request 48 :return: A mapping {"next_page_token": <token>} for the next page from the input response object. Returning None means there are no more pages to read in this response. 49 """ 50 pass 51 52 @abstractmethod 53 def path( 54 self, 55 next_page_token: Optional[Mapping[str, Any]], 56 stream_state: Optional[Mapping[str, Any]] = None, 57 stream_slice: Optional[StreamSlice] = None, 58 ) -> Optional[str]: 59 """ 60 Returns the URL path to hit to fetch the next page of records 61 62 e.g: if you wanted to hit https://myapi.com/v1/some_entity then this will return "some_entity" 63 64 :return: path to hit to fetch the next request. Returning None means the path is not defined by the next_page_token 65 """ 66 pass
Defines the token to use to fetch the next page of records from the API.
If needed, the Paginator will set request options to be set on the HTTP request to fetch the next page of records.
If the next_page_token is the path to the next page of records, then it should be accessed through the path
method
27 @abstractmethod 28 def get_initial_token(self) -> Optional[Any]: 29 """ 30 Get the page token that should be included in the request to get the first page of records 31 """
Get the page token that should be included in the request to get the first page of records
33 @abstractmethod 34 def next_page_token( 35 self, 36 response: requests.Response, 37 last_page_size: int, 38 last_record: Optional[Record], 39 last_page_token_value: Optional[Any], 40 ) -> Optional[Mapping[str, Any]]: 41 """ 42 Returns the next_page_token to use to fetch the next page of records. 43 44 :param response: the response to process 45 :param last_page_size: the number of records read from the response 46 :param last_record: the last record extracted from the response 47 :param last_page_token_value: The current value of the page token made on the last request 48 :return: A mapping {"next_page_token": <token>} for the next page from the input response object. Returning None means there are no more pages to read in this response. 49 """ 50 pass
Returns the next_page_token to use to fetch the next page of records.
Parameters
- response: the response to process
- last_page_size: the number of records read from the response
- last_record: the last record extracted from the response
- last_page_token_value: The current value of the page token made on the last request
Returns
A mapping {"next_page_token":
} for the next page from the input response object. Returning None means there are no more pages to read in this response.
52 @abstractmethod 53 def path( 54 self, 55 next_page_token: Optional[Mapping[str, Any]], 56 stream_state: Optional[Mapping[str, Any]] = None, 57 stream_slice: Optional[StreamSlice] = None, 58 ) -> Optional[str]: 59 """ 60 Returns the URL path to hit to fetch the next page of records 61 62 e.g: if you wanted to hit https://myapi.com/v1/some_entity then this will return "some_entity" 63 64 :return: path to hit to fetch the next request. Returning None means the path is not defined by the next_page_token 65 """ 66 pass
Returns the URL path to hit to fetch the next page of records
e.g: if you wanted to hit https://myapi.com/v1/some_entity then this will return "some_entity"
Returns
path to hit to fetch the next request. Returning None means the path is not defined by the next_page_token
228class PaginatorTestReadDecorator(Paginator): 229 """ 230 In some cases, we want to limit the number of requests that are made to the backend source. This class allows for limiting the number of 231 pages that are queried throughout a read command. 232 233 WARNING: This decorator is not currently thread-safe like the rest of the low-code framework because it has 234 an internal state to track the current number of pages counted so that it can exit early during a test read 235 """ 236 237 _PAGE_COUNT_BEFORE_FIRST_NEXT_CALL = 1 238 239 def __init__(self, decorated: Paginator, maximum_number_of_pages: int = 5) -> None: 240 if maximum_number_of_pages and maximum_number_of_pages < 1: 241 raise ValueError( 242 f"The maximum number of pages on a test read needs to be strictly positive. Got {maximum_number_of_pages}" 243 ) 244 self._maximum_number_of_pages = maximum_number_of_pages 245 self._decorated = decorated 246 self._page_count = self._PAGE_COUNT_BEFORE_FIRST_NEXT_CALL 247 248 def get_initial_token(self) -> Optional[Any]: 249 self._page_count = self._PAGE_COUNT_BEFORE_FIRST_NEXT_CALL 250 return self._decorated.get_initial_token() 251 252 def next_page_token( 253 self, 254 response: requests.Response, 255 last_page_size: int, 256 last_record: Optional[Record], 257 last_page_token_value: Optional[Any] = None, 258 ) -> Optional[Mapping[str, Any]]: 259 if self._page_count >= self._maximum_number_of_pages: 260 return None 261 262 self._page_count += 1 263 return self._decorated.next_page_token( 264 response, last_page_size, last_record, last_page_token_value 265 ) 266 267 def path( 268 self, 269 next_page_token: Optional[Mapping[str, Any]], 270 stream_state: Optional[Mapping[str, Any]] = None, 271 stream_slice: Optional[StreamSlice] = None, 272 ) -> Optional[str]: 273 return self._decorated.path( 274 next_page_token=next_page_token, 275 stream_state=stream_state, 276 stream_slice=stream_slice, 277 ) 278 279 def get_request_params( 280 self, 281 *, 282 stream_state: Optional[StreamState] = None, 283 stream_slice: Optional[StreamSlice] = None, 284 next_page_token: Optional[Mapping[str, Any]] = None, 285 ) -> Mapping[str, Any]: 286 return self._decorated.get_request_params( 287 stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token 288 ) 289 290 def get_request_headers( 291 self, 292 *, 293 stream_state: Optional[StreamState] = None, 294 stream_slice: Optional[StreamSlice] = None, 295 next_page_token: Optional[Mapping[str, Any]] = None, 296 ) -> Mapping[str, str]: 297 return self._decorated.get_request_headers( 298 stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token 299 ) 300 301 def get_request_body_data( 302 self, 303 *, 304 stream_state: Optional[StreamState] = None, 305 stream_slice: Optional[StreamSlice] = None, 306 next_page_token: Optional[Mapping[str, Any]] = None, 307 ) -> Union[Mapping[str, Any], str]: 308 return self._decorated.get_request_body_data( 309 stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token 310 ) 311 312 def get_request_body_json( 313 self, 314 *, 315 stream_state: Optional[StreamState] = None, 316 stream_slice: Optional[StreamSlice] = None, 317 next_page_token: Optional[Mapping[str, Any]] = None, 318 ) -> Mapping[str, Any]: 319 return self._decorated.get_request_body_json( 320 stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token 321 )
In some cases, we want to limit the number of requests that are made to the backend source. This class allows for limiting the number of pages that are queried throughout a read command.
WARNING: This decorator is not currently thread-safe like the rest of the low-code framework because it has an internal state to track the current number of pages counted so that it can exit early during a test read
239 def __init__(self, decorated: Paginator, maximum_number_of_pages: int = 5) -> None: 240 if maximum_number_of_pages and maximum_number_of_pages < 1: 241 raise ValueError( 242 f"The maximum number of pages on a test read needs to be strictly positive. Got {maximum_number_of_pages}" 243 ) 244 self._maximum_number_of_pages = maximum_number_of_pages 245 self._decorated = decorated 246 self._page_count = self._PAGE_COUNT_BEFORE_FIRST_NEXT_CALL
248 def get_initial_token(self) -> Optional[Any]: 249 self._page_count = self._PAGE_COUNT_BEFORE_FIRST_NEXT_CALL 250 return self._decorated.get_initial_token()
Get the page token that should be included in the request to get the first page of records
252 def next_page_token( 253 self, 254 response: requests.Response, 255 last_page_size: int, 256 last_record: Optional[Record], 257 last_page_token_value: Optional[Any] = None, 258 ) -> Optional[Mapping[str, Any]]: 259 if self._page_count >= self._maximum_number_of_pages: 260 return None 261 262 self._page_count += 1 263 return self._decorated.next_page_token( 264 response, last_page_size, last_record, last_page_token_value 265 )
Returns the next_page_token to use to fetch the next page of records.
Parameters
- response: the response to process
- last_page_size: the number of records read from the response
- last_record: the last record extracted from the response
- last_page_token_value: The current value of the page token made on the last request
Returns
A mapping {"next_page_token":
} for the next page from the input response object. Returning None means there are no more pages to read in this response.
267 def path( 268 self, 269 next_page_token: Optional[Mapping[str, Any]], 270 stream_state: Optional[Mapping[str, Any]] = None, 271 stream_slice: Optional[StreamSlice] = None, 272 ) -> Optional[str]: 273 return self._decorated.path( 274 next_page_token=next_page_token, 275 stream_state=stream_state, 276 stream_slice=stream_slice, 277 )
Returns the URL path to hit to fetch the next page of records
e.g: if you wanted to hit https://myapi.com/v1/some_entity then this will return "some_entity"
Returns
path to hit to fetch the next request. Returning None means the path is not defined by the next_page_token
279 def get_request_params( 280 self, 281 *, 282 stream_state: Optional[StreamState] = None, 283 stream_slice: Optional[StreamSlice] = None, 284 next_page_token: Optional[Mapping[str, Any]] = None, 285 ) -> Mapping[str, Any]: 286 return self._decorated.get_request_params( 287 stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token 288 )
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g: you might want to define query parameters for paging if next_page_token is not None.
290 def get_request_headers( 291 self, 292 *, 293 stream_state: Optional[StreamState] = None, 294 stream_slice: Optional[StreamSlice] = None, 295 next_page_token: Optional[Mapping[str, Any]] = None, 296 ) -> Mapping[str, str]: 297 return self._decorated.get_request_headers( 298 stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token 299 )
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
301 def get_request_body_data( 302 self, 303 *, 304 stream_state: Optional[StreamState] = None, 305 stream_slice: Optional[StreamSlice] = None, 306 next_page_token: Optional[Mapping[str, Any]] = None, 307 ) -> Union[Mapping[str, Any], str]: 308 return self._decorated.get_request_body_data( 309 stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token 310 )
Specifies how to populate the body of the request with a non-JSON payload.
If returns a ready text that it will be sent as is. If returns a dict that it will be converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
312 def get_request_body_json( 313 self, 314 *, 315 stream_state: Optional[StreamState] = None, 316 stream_slice: Optional[StreamSlice] = None, 317 next_page_token: Optional[Mapping[str, Any]] = None, 318 ) -> Mapping[str, Any]: 319 return self._decorated.get_request_body_json( 320 stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token 321 )
Specifies how to populate the body of the request with a JSON payload.
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.