airbyte_cdk.sources.declarative.requesters.paginators
1# 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 3# 4 5from airbyte_cdk.sources.declarative.requesters.paginators.default_paginator import ( 6 DefaultPaginator, 7 PaginatorTestReadDecorator, 8) 9from airbyte_cdk.sources.declarative.requesters.paginators.no_pagination import NoPagination 10from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator 11from airbyte_cdk.sources.declarative.requesters.paginators.strategies.pagination_strategy import ( 12 PaginationStrategy, 13) 14 15__all__ = [ 16 "DefaultPaginator", 17 "NoPagination", 18 "PaginationStrategy", 19 "Paginator", 20 "PaginatorTestReadDecorator", 21]
@dataclass
class DefaultPaginator(Paginator):
    """
    Paginator that keeps requesting fixed-size pages until its pagination strategy stops producing a next_page_token.

    Examples:
        1.
        * fetches up to 10 records at a time by setting the "limit" request param to 10
        * updates the request path with "{{ response._metadata.next }}"
        ```
          paginator:
            type: "DefaultPaginator"
            page_size_option:
              type: RequestOption
              inject_into: request_parameter
              field_name: limit
            page_token_option:
              type: RequestPath
              path: "location"
            pagination_strategy:
              type: "CursorPagination"
              cursor_value: "{{ response._metadata.next }}"
              page_size: 10
        ```

        2.
        * fetches up to 5 records at a time by setting the "page_size" header to 5
        * increments a record counter and set the request parameter "offset" to the value of the counter
        ```
          paginator:
            type: "DefaultPaginator"
            page_size_option:
              type: RequestOption
              inject_into: header
              field_name: page_size
            pagination_strategy:
              type: "OffsetIncrement"
              page_size: 5
            page_token_option:
              option_type: "request_parameter"
              field_name: "offset"
        ```

        3.
        * fetches up to 5 records at a time by setting the "page_size" request param to 5
        * increments a page counter and set the request parameter "page" to the value of the counter
        ```
          paginator:
            type: "DefaultPaginator"
            page_size_option:
              type: RequestOption
              inject_into: request_parameter
              field_name: page_size
            pagination_strategy:
              type: "PageIncrement"
              page_size: 5
            page_token_option:
              type: RequestOption
              option_type: "request_parameter"
              field_name: "page"
        ```
    Attributes:
        page_size_option (Optional[RequestOption]): the request option to set the page size. Cannot be injected in the path.
        page_token_option (Optional[Union[RequestPath, RequestOption]]): the request option to set the page token
        pagination_strategy (PaginationStrategy): Strategy defining how to get the next page token
        config (Config): connection config
        url_base (Union[InterpolatedString, str]): endpoint's base url
        decoder (Decoder): decoder to decode the response
    """

    pagination_strategy: PaginationStrategy
    config: Config
    url_base: Union[InterpolatedString, str]
    parameters: InitVar[Mapping[str, Any]]
    decoder: Decoder = field(
        default_factory=lambda: PaginationDecoderDecorator(decoder=JsonDecoder(parameters={}))
    )
    page_size_option: Optional[RequestOption] = None
    page_token_option: Optional[Union[RequestPath, RequestOption]] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """
        Validate the configured options and normalize url_base to an InterpolatedString.

        :raises ValueError: when page_size_option is set but the strategy reports no page size
        """
        if self.page_size_option:
            # A page size can only be injected if the strategy actually defines one.
            if not self.pagination_strategy.get_page_size():
                raise ValueError(
                    "page_size_option cannot be set if the pagination strategy does not have a page_size"
                )

        if isinstance(self.url_base, str):
            self.url_base = InterpolatedString(string=self.url_base, parameters=parameters)

        token_option = self.page_token_option
        # Only RequestOption-style token options are validated; a RequestPath is skipped.
        if token_option and not isinstance(token_option, RequestPath):
            _validate_component_request_option_paths(
                self.config,
                self.page_size_option,
                token_option,
            )

    def get_initial_token(self) -> Optional[Any]:
        """
        Provide the page token for the first request of a stream, as supplied by the pagination strategy.

        WARNING: get_initial_token() should not be used by streams that use RFR and checkpoint
        state using page numbers, because paginators are stateless.
        """
        first_token = self.pagination_strategy.initial_token
        return first_token

    def next_page_token(
        self,
        response: requests.Response,
        last_page_size: int,
        last_record: Optional[Record],
        last_page_token_value: Optional[Any] = None,
    ) -> Optional[Mapping[str, Any]]:
        """
        Ask the pagination strategy for the next token and wrap it in a mapping.

        :param response: the response to process
        :param last_page_size: the number of records read from the response
        :param last_record: the last record extracted from the response
        :param last_page_token_value: the value of the page token used on the last request
        :return: {"next_page_token": <token>} when the strategy yields a truthy token, otherwise None
        """
        token = self.pagination_strategy.next_page_token(
            response=response,
            last_page_size=last_page_size,
            last_record=last_record,
            last_page_token_value=last_page_token_value,
        )
        if not token:
            # Any falsy token (None, "", 0, ...) ends pagination.
            return None
        return {"next_page_token": token}

    def path(
        self,
        next_page_token: Optional[Mapping[str, Any]],
        stream_state: Optional[Mapping[str, Any]] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> Optional[str]:
        """
        Derive the request path from the page token when the token option is a RequestPath.

        :return: the token with the evaluated url_base removed, or None when the token is
            missing/falsy or page_token_option is not a RequestPath
        """
        token = next_page_token.get("next_page_token") if next_page_token else None
        if not (token and self.page_token_option and isinstance(self.page_token_option, RequestPath)):
            return None

        token_text = str(token)
        # Extra interpolation context so url_base can reference state, slice and token.
        context = get_interpolation_context(
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
        )
        base_url = self.url_base.eval(self.config, **context)  # type: ignore # url_base is casted to a InterpolatedString in __post_init__
        # Strip the base url so that only the path remains.
        return token_text.replace(base_url, "")

    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> MutableMapping[str, Any]:
        """Return the pagination options that are injected as query parameters."""
        return self._get_request_options(RequestOptionType.request_parameter, next_page_token)

    def get_request_headers(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, str]:
        """Return the pagination options that are injected as request headers."""
        return self._get_request_options(RequestOptionType.header, next_page_token)

    def get_request_body_data(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return the pagination options that are injected into the non-JSON request body."""
        return self._get_request_options(RequestOptionType.body_data, next_page_token)

    def get_request_body_json(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return the pagination options that are injected into the JSON request body."""
        return self._get_request_options(RequestOptionType.body_json, next_page_token)

    def _get_request_options(
        self, option_type: RequestOptionType, next_page_token: Optional[Mapping[str, Any]]
    ) -> MutableMapping[str, Any]:
        """
        Collect the page token and page size options targeting the given injection point.

        :param option_type: where the options are being injected (query parameter, header, body...)
        :param next_page_token: mapping holding the token under the "next_page_token" key, if any
        :return: a fresh mapping containing only the options whose inject_into matches option_type
        """
        injected: MutableMapping[str, Any] = {}

        token = next_page_token.get("next_page_token") if next_page_token else None
        token_option = self.page_token_option
        # `is not None` (rather than truthiness) lets falsy tokens such as 0 be injected.
        if (
            token_option
            and token is not None
            and isinstance(token_option, RequestOption)
            and token_option.inject_into == option_type
        ):
            token_option.inject_into_request(injected, token, self.config)

        size_option = self.page_size_option
        if (
            size_option
            and self.pagination_strategy.get_page_size()
            and size_option.inject_into == option_type
        ):
            size_option.inject_into_request(
                injected, self.pagination_strategy.get_page_size(), self.config
            )

        return injected
Default paginator to request pages of results with a fixed size until the pagination strategy no longer returns a next_page_token
Examples:
1.
- fetches up to 10 records at a time by setting the "limit" request param to 10
- updates the request path with "{{ response._metadata.next }}"
paginator:
  type: "DefaultPaginator"
  page_size_option:
    type: RequestOption
    inject_into: request_parameter
    field_name: limit
  page_token_option:
    type: RequestPath
    path: "location"
  pagination_strategy:
    type: "CursorPagination"
    cursor_value: "{{ response._metadata.next }}"
    page_size: 10
2.
- fetches up to 5 records at a time by setting the "page_size" header to 5
- increments a record counter and set the request parameter "offset" to the value of the counter
paginator:
  type: "DefaultPaginator"
  page_size_option:
    type: RequestOption
    inject_into: header
    field_name: page_size
  pagination_strategy:
    type: "OffsetIncrement"
    page_size: 5
  page_token_option:
    option_type: "request_parameter"
    field_name: "offset"
3.
- fetches up to 5 records at a time by setting the "page_size" request param to 5
- increments a page counter and set the request parameter "page" to the value of the counter
paginator:
  type: "DefaultPaginator"
  page_size_option:
    type: RequestOption
    inject_into: request_parameter
    field_name: page_size
  pagination_strategy:
    type: "PageIncrement"
    page_size: 5
  page_token_option:
    type: RequestOption
    option_type: "request_parameter"
    field_name: "page"
Attributes:
- page_size_option (Optional[RequestOption]): the request option to set the page size. Cannot be injected in the path.
- page_token_option (Optional[Union[RequestPath, RequestOption]]): the request option to set the page token
- pagination_strategy (PaginationStrategy): Strategy defining how to get the next page token
- config (Config): connection config
- url_base (Union[InterpolatedString, str]): endpoint's base url
- decoder (Decoder): decoder to decode the response
def get_initial_token(self) -> Optional[Any]:
    """
    Provide the page token for the first request of a stream, as supplied by the pagination strategy.

    WARNING: get_initial_token() should not be used by streams that use RFR and checkpoint
    state using page numbers, because paginators are stateless.
    """
    first_token = self.pagination_strategy.initial_token
    return first_token
Return the page token that should be used for the first request of a stream
WARNING: get_initial_token() should not be used by streams that use RFR and checkpoint state using page numbers, because paginators are stateless.
def next_page_token(
    self,
    response: requests.Response,
    last_page_size: int,
    last_record: Optional[Record],
    last_page_token_value: Optional[Any] = None,
) -> Optional[Mapping[str, Any]]:
    """
    Ask the pagination strategy for the next token and wrap it in a mapping.

    :param response: the response to process
    :param last_page_size: the number of records read from the response
    :param last_record: the last record extracted from the response
    :param last_page_token_value: the value of the page token used on the last request
    :return: {"next_page_token": <token>} when the strategy yields a truthy token, otherwise None
    """
    token = self.pagination_strategy.next_page_token(
        response=response,
        last_page_size=last_page_size,
        last_record=last_record,
        last_page_token_value=last_page_token_value,
    )
    if not token:
        # Any falsy token (None, "", 0, ...) ends pagination.
        return None
    return {"next_page_token": token}
Returns the next_page_token to use to fetch the next page of records.
Parameters
- response: the response to process
- last_page_size: the number of records read from the response
- last_record: the last record extracted from the response
- last_page_token_value: The current value of the page token made on the last request
Returns
A mapping {"next_page_token": <token>} for the next page from the input response object. Returning None means there are no more pages to read in this response.
def path(
    self,
    next_page_token: Optional[Mapping[str, Any]],
    stream_state: Optional[Mapping[str, Any]] = None,
    stream_slice: Optional[StreamSlice] = None,
) -> Optional[str]:
    """
    Derive the request path from the page token when the token option is a RequestPath.

    :return: the token with the evaluated url_base removed, or None when the token is
        missing/falsy or page_token_option is not a RequestPath
    """
    token = next_page_token.get("next_page_token") if next_page_token else None
    if not (token and self.page_token_option and isinstance(self.page_token_option, RequestPath)):
        return None

    token_text = str(token)
    # Extra interpolation context so url_base can reference state, slice and token.
    context = get_interpolation_context(
        stream_state=stream_state,
        stream_slice=stream_slice,
        next_page_token=next_page_token,
    )
    base_url = self.url_base.eval(self.config, **context)  # type: ignore # url_base is casted to a InterpolatedString in __post_init__
    # Strip the base url so that only the path remains.
    return token_text.replace(base_url, "")
Returns the URL path to hit to fetch the next page of records
e.g: if you wanted to hit https://myapi.com/v1/some_entity then this will return "some_entity"
Returns
path to hit to fetch the next request. Returning None means the path is not defined by the next_page_token
def get_request_params(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
    """
    Return the pagination options that are injected as query parameters.

    stream_state and stream_slice are accepted for interface compatibility but are not used here.
    """
    target = RequestOptionType.request_parameter
    return self._get_request_options(target, next_page_token)
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g: you might want to define query parameters for paging if next_page_token is not None.
def get_request_headers(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, str]:
    """
    Return the pagination options that are injected as request headers.

    stream_state and stream_slice are accepted for interface compatibility but are not used here.
    """
    target = RequestOptionType.header
    return self._get_request_options(target, next_page_token)
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
def get_request_body_data(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """
    Return the pagination options that are injected into the non-JSON request body.

    stream_state and stream_slice are accepted for interface compatibility but are not used here.
    """
    target = RequestOptionType.body_data
    return self._get_request_options(target, next_page_token)
Specifies how to populate the body of the request with a non-JSON payload.
If it returns ready text, that text will be sent as is. If it returns a dict, the dict will be converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
def get_request_body_json(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """
    Return the pagination options that are injected into the JSON request body.

    stream_state and stream_slice are accepted for interface compatibility but are not used here.
    """
    target = RequestOptionType.body_json
    return self._get_request_options(target, next_page_token)
Specifies how to populate the body of the request with a JSON payload.
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
@dataclass
class NoPagination(Paginator):
    """
    Paginator for endpoints without pagination: it contributes no request options and never yields a next page.
    """

    parameters: InitVar[Mapping[str, Any]]

    def path(
        self,
        next_page_token: Optional[Mapping[str, Any]],
        stream_state: Optional[Mapping[str, Any]] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> Optional[str]:
        """Always None: the path is never defined by a page token."""
        return None

    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> MutableMapping[str, Any]:
        """Always an empty mapping: no query parameters are added."""
        no_params: MutableMapping[str, Any] = {}
        return no_params

    def get_request_headers(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, str]:
        """Always an empty mapping: no headers are added."""
        no_headers: Mapping[str, str] = {}
        return no_headers

    def get_request_body_data(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Union[Mapping[str, Any], str]:
        """Always an empty mapping: no body data is added."""
        no_body_data: Mapping[str, Any] = {}
        return no_body_data

    def get_request_body_json(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Always an empty mapping: no JSON body fields are added."""
        no_body_json: Mapping[str, Any] = {}
        return no_body_json

    def get_initial_token(self) -> Optional[Any]:
        """Always None: there is no token for the first request."""
        return None

    def next_page_token(
        self,
        response: requests.Response,
        last_page_size: int,
        last_record: Optional[Record],
        last_page_token_value: Optional[Any],
    ) -> Optional[Mapping[str, Any]]:
        """
        Signal that there is no next page.

        NOTE: this returns an empty mapping (falsy) rather than None, so callers must test
        the result for truthiness instead of comparing it with None.
        """
        no_token: Mapping[str, Any] = {}
        return no_token
Pagination implementation that never returns a next page.
def path(
    self,
    next_page_token: Optional[Mapping[str, Any]],
    stream_state: Optional[Mapping[str, Any]] = None,
    stream_slice: Optional[StreamSlice] = None,
) -> Optional[str]:
    """Always None: the path is never defined by a page token. All arguments are ignored."""
    return None
Returns the URL path to hit to fetch the next page of records
e.g: if you wanted to hit https://myapi.com/v1/some_entity then this will return "some_entity"
Returns
path to hit to fetch the next request. Returning None means the path is not defined by the next_page_token
def get_request_params(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
    """Always an empty mapping: no query parameters are added. All arguments are ignored."""
    no_params: MutableMapping[str, Any] = {}
    return no_params
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g: you might want to define query parameters for paging if next_page_token is not None.
def get_request_headers(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, str]:
    """Always an empty mapping: no headers are added. All arguments are ignored."""
    no_headers: Mapping[str, str] = {}
    return no_headers
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
def get_request_body_data(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Union[Mapping[str, Any], str]:
    """Always an empty mapping: no body data is added. All arguments are ignored."""
    no_body_data: Mapping[str, Any] = {}
    return no_body_data
Specifies how to populate the body of the request with a non-JSON payload.
If it returns ready text, that text will be sent as is. If it returns a dict, the dict will be converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
def get_request_body_json(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """Always an empty mapping: no JSON body fields are added. All arguments are ignored."""
    no_body_json: Mapping[str, Any] = {}
    return no_body_json
Specifies how to populate the body of the request with a JSON payload.
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
Get the page token that should be included in the request to get the first page of records
def next_page_token(
    self,
    response: requests.Response,
    last_page_size: int,
    last_record: Optional[Record],
    last_page_token_value: Optional[Any],
) -> Optional[Mapping[str, Any]]:
    """
    Signal that there is no next page. All arguments are ignored.

    NOTE: this returns an empty mapping (falsy) rather than None, so callers must test
    the result for truthiness instead of comparing it with None.
    """
    no_token: Mapping[str, Any] = {}
    return no_token
Returns the next_page_token to use to fetch the next page of records.
Parameters
- response: the response to process
- last_page_size: the number of records read from the response
- last_record: the last record extracted from the response
- last_page_token_value: The current value of the page token made on the last request
Returns
A mapping {"next_page_token": <token>} for the next page from the input response object. Returning None means there are no more pages to read in this response.
@dataclass
class PaginationStrategy:
    """
    Interface describing how the next page token is computed.

    NOTE: the class does not derive from ABC, so the @abstractmethod markers below are
    not enforced at instantiation time; subclasses are expected to override every member.
    """

    @property
    @abstractmethod
    def initial_token(self) -> Optional[Any]:
        """
        Return the initial value of the token
        """

    @abstractmethod
    def next_page_token(
        self,
        response: requests.Response,
        last_page_size: int,
        last_record: Optional[Record],
        last_page_token_value: Optional[Any],
    ) -> Optional[Any]:
        """
        Compute the token for the page following the given response.

        :param response: response to process
        :param last_page_size: the number of records read from the response
        :param last_record: the last record extracted from the response
        :param last_page_token_value: The current value of the page token made on the last request
        :return: next page token. Returns None if there are no more pages to fetch
        """

    @abstractmethod
    def get_page_size(self) -> Optional[int]:
        """
        :return: page size: The number of records to fetch in a page. Returns None if unspecified
        """
Defines how to get the next page token
@property
@abstractmethod
def initial_token(self) -> Optional[Any]:
    """
    Token to use for the first request.

    The base declaration has no body; concrete strategies supply the value.
    """
Return the initial value of the token
@abstractmethod
def next_page_token(
    self,
    response: requests.Response,
    last_page_size: int,
    last_record: Optional[Record],
    last_page_token_value: Optional[Any],
) -> Optional[Any]:
    """
    Compute the token for the page following the given response.

    :param response: response to process
    :param last_page_size: the number of records read from the response
    :param last_record: the last record extracted from the response
    :param last_page_token_value: The current value of the page token made on the last request
    :return: next page token. Returns None if there are no more pages to fetch
    """
Parameters
- response: response to process
- last_page_size: the number of records read from the response
- last_record: the last record extracted from the response
- last_page_token_value: The current value of the page token made on the last request
Returns
next page token. Returns None if there are no more pages to fetch
@abstractmethod
def get_page_size(self) -> Optional[int]:
    """
    Report the configured page size.

    :return: the number of records to fetch in a page, or None if unspecified
    """
Returns
page size: The number of records to fetch in a page. Returns None if unspecified
@dataclass
class Paginator(ABC, RequestOptionsProvider):
    """
    Abstract contract for components that decide how the next page of records is fetched from the API.

    A Paginator may contribute request options to the HTTP request for the next page.
    When the next_page_token is itself the path to the next page, it is exposed through the `path` method.
    """

    @abstractmethod
    def get_initial_token(self) -> Optional[Any]:
        """
        Get the page token that should be included in the request to get the first page of records
        """

    @abstractmethod
    def next_page_token(
        self,
        response: requests.Response,
        last_page_size: int,
        last_record: Optional[Record],
        last_page_token_value: Optional[Any],
    ) -> Optional[Mapping[str, Any]]:
        """
        Returns the next_page_token to use to fetch the next page of records.

        :param response: the response to process
        :param last_page_size: the number of records read from the response
        :param last_record: the last record extracted from the response
        :param last_page_token_value: The current value of the page token made on the last request
        :return: A mapping {"next_page_token": <token>} for the next page from the input response object. Returning None means there are no more pages to read in this response.
        """

    @abstractmethod
    def path(
        self,
        next_page_token: Optional[Mapping[str, Any]],
        stream_state: Optional[Mapping[str, Any]] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> Optional[str]:
        """
        Returns the URL path to hit to fetch the next page of records

        e.g: if you wanted to hit https://myapi.com/v1/some_entity then this will return "some_entity"

        :return: path to hit to fetch the next request. Returning None means the path is not defined by the next_page_token
        """
Defines the token to use to fetch the next page of records from the API.
If needed, the Paginator will set request options to be set on the HTTP request to fetch the next page of records.
If the next_page_token is the path to the next page of records, then it should be accessed through the `path` method.
@abstractmethod
def get_initial_token(self) -> Optional[Any]:
    """
    Page token to include in the request for the first page of records.

    The base declaration has no body; concrete paginators supply the value.
    """
Get the page token that should be included in the request to get the first page of records
@abstractmethod
def next_page_token(
    self,
    response: requests.Response,
    last_page_size: int,
    last_record: Optional[Record],
    last_page_token_value: Optional[Any],
) -> Optional[Mapping[str, Any]]:
    """
    Returns the next_page_token to use to fetch the next page of records.

    :param response: the response to process
    :param last_page_size: the number of records read from the response
    :param last_record: the last record extracted from the response
    :param last_page_token_value: The current value of the page token made on the last request
    :return: A mapping {"next_page_token": <token>} for the next page from the input response object.
        Returning None means there are no more pages to read in this response.
    """
Returns the next_page_token to use to fetch the next page of records.
Parameters
- response: the response to process
- last_page_size: the number of records read from the response
- last_record: the last record extracted from the response
- last_page_token_value: The current value of the page token made on the last request
Returns
A mapping {"next_page_token": <token>} for the next page from the input response object. Returning None means there are no more pages to read in this response.
@abstractmethod
def path(
    self,
    next_page_token: Optional[Mapping[str, Any]],
    stream_state: Optional[Mapping[str, Any]] = None,
    stream_slice: Optional[StreamSlice] = None,
) -> Optional[str]:
    """
    Returns the URL path to hit to fetch the next page of records

    e.g: if you wanted to hit https://myapi.com/v1/some_entity then this will return "some_entity"

    :return: path to hit to fetch the next request. Returning None means the path is not
        defined by the next_page_token
    """
Returns the URL path to hit to fetch the next page of records
e.g: if you wanted to hit https://myapi.com/v1/some_entity then this will return "some_entity"
Returns
path to hit to fetch the next request. Returning None means the path is not defined by the next_page_token
class PaginatorTestReadDecorator(Paginator):
    """
    In some cases, we want to limit the number of requests that are made to the backend source. This class allows for limiting the number of
    pages that are queried throughout a read command.

    WARNING: Unlike the rest of the low-code framework, this decorator is not currently thread-safe because it has
    an internal state to track the current number of pages counted so that it can exit early during a test read
    """

    # The first page is fetched before next_page_token() is ever called, so counting starts at 1.
    _PAGE_COUNT_BEFORE_FIRST_NEXT_CALL = 1

    def __init__(self, decorated: Paginator, maximum_number_of_pages: int = 5) -> None:
        """
        :param decorated: the paginator every call is forwarded to
        :param maximum_number_of_pages: upper bound on the pages read; must be >= 1
        :raises ValueError: when maximum_number_of_pages is 0 or negative
        """
        # Compare against None explicitly: a plain truthiness check let 0 slip through
        # even though the error message requires a strictly positive value.
        if maximum_number_of_pages is not None and maximum_number_of_pages < 1:
            raise ValueError(
                f"The maximum number of pages on a test read needs to be strictly positive. Got {maximum_number_of_pages}"
            )
        self._maximum_number_of_pages = maximum_number_of_pages
        self._decorated = decorated
        self._page_count = self._PAGE_COUNT_BEFORE_FIRST_NEXT_CALL

    def get_initial_token(self) -> Optional[Any]:
        """Reset the page counter and return the decorated paginator's initial token."""
        self._page_count = self._PAGE_COUNT_BEFORE_FIRST_NEXT_CALL
        return self._decorated.get_initial_token()

    def next_page_token(
        self,
        response: requests.Response,
        last_page_size: int,
        last_record: Optional[Record],
        last_page_token_value: Optional[Any] = None,
    ) -> Optional[Mapping[str, Any]]:
        """
        Forward to the decorated paginator until the page budget is used up.

        :param response: the response to process
        :param last_page_size: the number of records read from the response
        :param last_record: the last record extracted from the response
        :param last_page_token_value: the value of the page token used on the last request
        :return: None once the page count has reached the maximum, otherwise whatever the
            decorated paginator returns
        """
        if self._page_count >= self._maximum_number_of_pages:
            return None

        self._page_count += 1
        return self._decorated.next_page_token(
            response, last_page_size, last_record, last_page_token_value
        )

    def path(
        self,
        next_page_token: Optional[Mapping[str, Any]],
        stream_state: Optional[Mapping[str, Any]] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> Optional[str]:
        """Forward to the decorated paginator's path()."""
        return self._decorated.path(
            next_page_token=next_page_token,
            stream_state=stream_state,
            stream_slice=stream_slice,
        )

    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Forward to the decorated paginator's get_request_params()."""
        return self._decorated.get_request_params(
            stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
        )

    def get_request_headers(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, str]:
        """Forward to the decorated paginator's get_request_headers()."""
        return self._decorated.get_request_headers(
            stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
        )

    def get_request_body_data(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Union[Mapping[str, Any], str]:
        """Forward to the decorated paginator's get_request_body_data()."""
        return self._decorated.get_request_body_data(
            stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
        )

    def get_request_body_json(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Forward to the decorated paginator's get_request_body_json()."""
        return self._decorated.get_request_body_json(
            stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
        )
In some cases, we want to limit the number of requests that are made to the backend source. This class allows for limiting the number of pages that are queried throughout a read command.
WARNING: Unlike the rest of the low-code framework, this decorator is not currently thread-safe because it has an internal state to track the current number of pages counted so that it can exit early during a test read
def __init__(self, decorated: Paginator, maximum_number_of_pages: int = 5) -> None:
    """
    Wrap a paginator and cap the number of pages read.

    :param decorated: the paginator every call is forwarded to
    :param maximum_number_of_pages: upper bound on the pages read; must be >= 1
    :raises ValueError: when maximum_number_of_pages is 0 or negative
    """
    # Compare against None explicitly: a plain truthiness check let 0 slip through
    # even though the error message requires a strictly positive value.
    if maximum_number_of_pages is not None and maximum_number_of_pages < 1:
        raise ValueError(
            f"The maximum number of pages on a test read needs to be strictly positive. Got {maximum_number_of_pages}"
        )
    self._maximum_number_of_pages = maximum_number_of_pages
    self._decorated = decorated
    # The first page is fetched before next_page_token() is ever called.
    self._page_count = self._PAGE_COUNT_BEFORE_FIRST_NEXT_CALL
def get_initial_token(self) -> Optional[Any]:
    """
    Reset the page counter to its starting value, then return the decorated paginator's initial token.
    """
    starting_count = self._PAGE_COUNT_BEFORE_FIRST_NEXT_CALL
    self._page_count = starting_count
    return self._decorated.get_initial_token()
Get the page token that should be included in the request to get the first page of records
def next_page_token(
    self,
    response: requests.Response,
    last_page_size: int,
    last_record: Optional[Record],
    last_page_token_value: Optional[Any] = None,
) -> Optional[Mapping[str, Any]]:
    """
    Forward to the decorated paginator until the page budget is used up.

    :param response: the response to process
    :param last_page_size: the number of records read from the response
    :param last_record: the last record extracted from the response
    :param last_page_token_value: the value of the page token used on the last request
    :return: None once the page count has reached the maximum, otherwise whatever the
        decorated paginator returns
    """
    budget_exhausted = self._page_count >= self._maximum_number_of_pages
    if budget_exhausted:
        return None

    # Count the page before delegating, exactly once per forwarded call.
    self._page_count = self._page_count + 1
    return self._decorated.next_page_token(
        response, last_page_size, last_record, last_page_token_value
    )
Returns the next_page_token to use to fetch the next page of records.
Parameters
- response: the response to process
- last_page_size: the number of records read from the response
- last_record: the last record extracted from the response
- last_page_token_value: The current value of the page token made on the last request
Returns
A mapping {"next_page_token": <token>} for the next page from the input response object. Returning None means there are no more pages to read in this response.
def path(
    self,
    next_page_token: Optional[Mapping[str, Any]],
    stream_state: Optional[Mapping[str, Any]] = None,
    stream_slice: Optional[StreamSlice] = None,
) -> Optional[str]:
    """Forward to the decorated paginator's path() with the same arguments."""
    delegate = self._decorated
    return delegate.path(
        next_page_token=next_page_token,
        stream_state=stream_state,
        stream_slice=stream_slice,
    )
Returns the URL path to hit to fetch the next page of records
e.g: if you wanted to hit https://myapi.com/v1/some_entity then this will return "some_entity"
Returns
path to hit to fetch the next request. Returning None means the path is not defined by the next_page_token
def get_request_params(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """Forward to the decorated paginator's get_request_params() with the same arguments."""
    delegate = self._decorated
    return delegate.get_request_params(
        stream_state=stream_state,
        stream_slice=stream_slice,
        next_page_token=next_page_token,
    )
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g: you might want to define query parameters for paging if next_page_token is not None.
def get_request_headers(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, str]:
    """Forward to the decorated paginator's get_request_headers() with the same arguments."""
    delegate = self._decorated
    return delegate.get_request_headers(
        stream_state=stream_state,
        stream_slice=stream_slice,
        next_page_token=next_page_token,
    )
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
def get_request_body_data(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Union[Mapping[str, Any], str]:
    """Forward to the decorated paginator's get_request_body_data() with the same arguments."""
    delegate = self._decorated
    return delegate.get_request_body_data(
        stream_state=stream_state,
        stream_slice=stream_slice,
        next_page_token=next_page_token,
    )
Specifies how to populate the body of the request with a non-JSON payload.
If it returns ready text, that text will be sent as is. If it returns a dict, the dict will be converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
def get_request_body_json(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """Forward to the decorated paginator's get_request_body_json() with the same arguments."""
    delegate = self._decorated
    return delegate.get_request_body_json(
        stream_state=stream_state,
        stream_slice=stream_slice,
        next_page_token=next_page_token,
    )
Specifies how to populate the body of the request with a JSON payload.
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.