airbyte_cdk.sources.declarative.requesters
1# 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 3# 4 5from airbyte_cdk.sources.declarative.requesters.http_requester import HttpRequester 6from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption 7from airbyte_cdk.sources.declarative.requesters.requester import Requester 8 9__all__ = ["HttpRequester", "RequestOption", "Requester"]
38@dataclass 39class HttpRequester(Requester): 40 """ 41 Default implementation of a Requester 42 43 Attributes: 44 name (str): Name of the stream. Only used for request/response caching 45 url_base (Union[InterpolatedString, str]): Base url to send requests to 46 path (Union[InterpolatedString, str]): Path to send requests to 47 http_method (Union[str, HttpMethod]): HTTP method to use when sending requests 48 request_options_provider (Optional[InterpolatedRequestOptionsProvider]): request option provider defining the options to set on outgoing requests 49 authenticator (DeclarativeAuthenticator): Authenticator defining how to authenticate to the source 50 error_handler (Optional[ErrorHandler]): Error handler defining how to detect and handle errors 51 backoff_strategies (Optional[List[BackoffStrategy]]): List of backoff strategies to use when retrying requests 52 config (Config): The user-provided configuration as specified by the source's spec 53 use_cache (bool): Indicates that data should be cached for this stream 54 """ 55 56 name: str 57 url_base: Union[InterpolatedString, str] 58 config: Config 59 parameters: InitVar[Mapping[str, Any]] 60 61 path: Optional[Union[InterpolatedString, str]] = None 62 authenticator: Optional[DeclarativeAuthenticator] = None 63 http_method: Union[str, HttpMethod] = HttpMethod.GET 64 request_options_provider: Optional[InterpolatedRequestOptionsProvider] = None 65 error_handler: Optional[ErrorHandler] = None 66 api_budget: Optional[APIBudget] = None 67 disable_retries: bool = False 68 message_repository: MessageRepository = NoopMessageRepository() 69 use_cache: bool = False 70 _exit_on_rate_limit: bool = False 71 stream_response: bool = False 72 decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={})) 73 74 def __post_init__(self, parameters: Mapping[str, Any]) -> None: 75 self._url_base = InterpolatedString.create(self.url_base, parameters=parameters) 76 self._path = InterpolatedString.create( 77 self.path if self.path else EmptyString, parameters=parameters 78 ) 79 if self.request_options_provider is None: 80 self._request_options_provider = InterpolatedRequestOptionsProvider( 81 config=self.config, parameters=parameters 82 ) 83 elif isinstance(self.request_options_provider, dict): 84 self._request_options_provider = InterpolatedRequestOptionsProvider( 85 config=self.config, **self.request_options_provider 86 ) 87 else: 88 self._request_options_provider = self.request_options_provider 89 self._authenticator = self.authenticator or NoAuth(parameters=parameters) 90 self._http_method = ( 91 HttpMethod[self.http_method] if isinstance(self.http_method, str) else self.http_method 92 ) 93 self.error_handler = self.error_handler 94 self._parameters = parameters 95 96 if self.error_handler is not None and hasattr(self.error_handler, "backoff_strategies"): 97 backoff_strategies = self.error_handler.backoff_strategies # type: ignore 98 else: 99 backoff_strategies = None 100 101 self._http_client = HttpClient( 102 name=self.name, 103 logger=self.logger, 104 error_handler=self.error_handler, 105 api_budget=self.api_budget, 106 authenticator=self._authenticator, 107 use_cache=self.use_cache, 108 backoff_strategy=backoff_strategies, 109 disable_retries=self.disable_retries, 110 message_repository=self.message_repository, 111 ) 112 113 @property 114 def exit_on_rate_limit(self) -> bool: 115 return self._exit_on_rate_limit 116 117 @exit_on_rate_limit.setter 118 def exit_on_rate_limit(self, value: bool) -> None: 119 self._exit_on_rate_limit = value 120 121 def get_authenticator(self) -> DeclarativeAuthenticator: 122 return self._authenticator 123 124 def get_url_base( 125 self, 126 *, 127 stream_state: Optional[StreamState] = None, 128 stream_slice: Optional[StreamSlice] = None, 129 next_page_token: Optional[Mapping[str, Any]] = None, 130 ) -> str: 131 interpolation_context = get_interpolation_context( 132 stream_state=stream_state, 133 stream_slice=stream_slice, 134 next_page_token=next_page_token, 135 ) 136 return str(self._url_base.eval(self.config, **interpolation_context)) 137 138 def get_path( 139 self, 140 *, 141 stream_state: Optional[StreamState] = None, 142 stream_slice: Optional[StreamSlice] = None, 143 next_page_token: Optional[Mapping[str, Any]] = None, 144 ) -> str: 145 interpolation_context = get_interpolation_context( 146 stream_state=stream_state, 147 stream_slice=stream_slice, 148 next_page_token=next_page_token, 149 ) 150 path = str(self._path.eval(self.config, **interpolation_context)) 151 return path.lstrip("/") 152 153 def get_method(self) -> HttpMethod: 154 return self._http_method 155 156 def get_request_params( 157 self, 158 *, 159 stream_state: Optional[StreamState] = None, 160 stream_slice: Optional[StreamSlice] = None, 161 next_page_token: Optional[Mapping[str, Any]] = None, 162 ) -> MutableMapping[str, Any]: 163 return self._request_options_provider.get_request_params( 164 stream_state=stream_state, 165 stream_slice=stream_slice, 166 next_page_token=next_page_token, 167 ) 168 169 def get_request_headers( 170 self, 171 *, 172 stream_state: Optional[StreamState] = None, 173 stream_slice: Optional[StreamSlice] = None, 174 next_page_token: Optional[Mapping[str, Any]] = None, 175 ) -> Mapping[str, Any]: 176 return self._request_options_provider.get_request_headers( 177 stream_state=stream_state, 178 stream_slice=stream_slice, 179 next_page_token=next_page_token, 180 ) 181 182 # fixing request options provider types has a lot of dependencies 183 def get_request_body_data( # type: ignore 184 self, 185 *, 186 stream_state: Optional[StreamState] = None, 187 stream_slice: Optional[StreamSlice] = None, 188 next_page_token: Optional[Mapping[str, Any]] = None, 189 ) -> Union[Mapping[str, Any], str]: 190 return ( 191 self._request_options_provider.get_request_body_data( 192 stream_state=stream_state, 193 stream_slice=stream_slice, 194 next_page_token=next_page_token, 195 ) 196 or {} 197 ) 198 199 # fixing request options provider types has a lot of dependencies 200 def get_request_body_json( # type: ignore 201 self, 202 *, 203 stream_state: Optional[StreamState] = None, 204 stream_slice: Optional[StreamSlice] = None, 205 next_page_token: Optional[Mapping[str, Any]] = None, 206 ) -> Optional[Mapping[str, Any]]: 207 return self._request_options_provider.get_request_body_json( 208 stream_state=stream_state, 209 stream_slice=stream_slice, 210 next_page_token=next_page_token, 211 ) 212 213 @property 214 def logger(self) -> logging.Logger: 215 return logging.getLogger(f"airbyte.HttpRequester.{self.name}") 216 217 def _get_request_options( 218 self, 219 stream_state: Optional[StreamState], 220 stream_slice: Optional[StreamSlice], 221 next_page_token: Optional[Mapping[str, Any]], 222 requester_method: Callable[..., Optional[Union[Mapping[str, Any], str]]], 223 auth_options_method: Callable[..., Optional[Union[Mapping[str, Any], str]]], 224 extra_options: Optional[Union[Mapping[str, Any], str]] = None, 225 ) -> Union[Mapping[str, Any], str]: 226 """ 227 Get the request_option from the requester, the authenticator and extra_options passed in. 228 Raise a ValueError if there's a key collision 229 Returned merged mapping otherwise 230 """ 231 232 is_body_json = requester_method.__name__ == "get_request_body_json" 233 234 return combine_mappings( 235 [ 236 requester_method( 237 stream_state=stream_state, 238 stream_slice=stream_slice, 239 next_page_token=next_page_token, 240 ), 241 auth_options_method(), 242 extra_options, 243 ], 244 allow_same_value_merge=is_body_json, 245 ) 246 247 def _request_headers( 248 self, 249 stream_state: Optional[StreamState] = None, 250 stream_slice: Optional[StreamSlice] = None, 251 next_page_token: Optional[Mapping[str, Any]] = None, 252 extra_headers: Optional[Mapping[str, Any]] = None, 253 ) -> Mapping[str, Any]: 254 """ 255 Specifies request headers. 256 Authentication headers will overwrite any overlapping headers returned from this method. 257 """ 258 headers = self._get_request_options( 259 stream_state, 260 stream_slice, 261 next_page_token, 262 self.get_request_headers, 263 self.get_authenticator().get_auth_header, 264 extra_headers, 265 ) 266 if isinstance(headers, str): 267 raise ValueError("Request headers cannot be a string") 268 return {str(k): str(v) for k, v in headers.items()} 269 270 def _request_params( 271 self, 272 stream_state: Optional[StreamState], 273 stream_slice: Optional[StreamSlice], 274 next_page_token: Optional[Mapping[str, Any]], 275 extra_params: Optional[Mapping[str, Any]] = None, 276 ) -> Mapping[str, Any]: 277 """ 278 Specifies the query parameters that should be set on an outgoing HTTP request given the inputs. 279 280 E.g: you might want to define query parameters for paging if next_page_token is not None. 281 """ 282 options = self._get_request_options( 283 stream_state, 284 stream_slice, 285 next_page_token, 286 self.get_request_params, 287 self.get_authenticator().get_request_params, 288 extra_params, 289 ) 290 if isinstance(options, str): 291 raise ValueError("Request params cannot be a string") 292 293 for k, v in options.items(): 294 if isinstance(v, (dict,)): 295 raise ValueError( 296 f"Invalid value for `{k}` parameter. The values of request params cannot be an object." 297 ) 298 299 return options 300 301 def _request_body_data( 302 self, 303 stream_state: Optional[StreamState], 304 stream_slice: Optional[StreamSlice], 305 next_page_token: Optional[Mapping[str, Any]], 306 extra_body_data: Optional[Union[Mapping[str, Any], str]] = None, 307 ) -> Optional[Union[Mapping[str, Any], str]]: 308 """ 309 Specifies how to populate the body of the request with a non-JSON payload. 310 311 If returns a ready text that it will be sent as is. 312 If returns a dict that it will be converted to a urlencoded form. 313 E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2" 314 315 At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden. 316 """ 317 # Warning: use self.state instead of the stream_state passed as argument! 318 return self._get_request_options( 319 stream_state, 320 stream_slice, 321 next_page_token, 322 self.get_request_body_data, 323 self.get_authenticator().get_request_body_data, 324 extra_body_data, 325 ) 326 327 def _request_body_json( 328 self, 329 stream_state: Optional[StreamState], 330 stream_slice: Optional[StreamSlice], 331 next_page_token: Optional[Mapping[str, Any]], 332 extra_body_json: Optional[Mapping[str, Any]] = None, 333 ) -> Optional[Mapping[str, Any]]: 334 """ 335 Specifies how to populate the body of the request with a JSON payload. 336 337 At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden. 338 """ 339 # Warning: use self.state instead of the stream_state passed as argument! 340 options = self._get_request_options( 341 stream_state, 342 stream_slice, 343 next_page_token, 344 self.get_request_body_json, 345 self.get_authenticator().get_request_body_json, 346 extra_body_json, 347 ) 348 if isinstance(options, str): 349 raise ValueError("Request body json cannot be a string") 350 return options 351 352 @classmethod 353 def _join_url(cls, url_base: str, path: str) -> str: 354 """ 355 Joins a base URL with a given path and returns the resulting URL with any trailing slash removed. 356 357 This method ensures that there are no duplicate slashes when concatenating the base URL and the path, 358 which is useful when the full URL is provided from an interpolation context. 359 360 Args: 361 url_base (str): The base URL to which the path will be appended. 362 path (str): The path to join with the base URL. 363 364 Returns: 365 str: The resulting joined URL. 366 367 Note: 368 Related issue: https://github.com/airbytehq/airbyte-internal-issues/issues/11869 369 - If the path is an empty string or None, the method returns the base URL with any trailing slash removed. 370 371 Example: 372 1) _join_url("https://example.com/api/", "endpoint") >> 'https://example.com/api/endpoint' 373 2) _join_url("https://example.com/api", "/endpoint") >> 'https://example.com/api/endpoint' 374 3) _join_url("https://example.com/api/", "") >> 'https://example.com/api/' 375 4) _join_url("https://example.com/api", None) >> 'https://example.com/api' 376 """ 377 378 # return a full-url if provided directly from interpolation context 379 if path == EmptyString or path is None: 380 return url_base 381 else: 382 # since we didn't provide a full-url, the url_base might not have a trailing slash 383 # so we join the url_base and path correctly 384 if not url_base.endswith("/"): 385 url_base += "/" 386 387 return urljoin(url_base, path) 388 389 def send_request( 390 self, 391 stream_state: Optional[StreamState] = None, 392 stream_slice: Optional[StreamSlice] = None, 393 next_page_token: Optional[Mapping[str, Any]] = None, 394 path: Optional[str] = None, 395 request_headers: Optional[Mapping[str, Any]] = None, 396 request_params: Optional[Mapping[str, Any]] = None, 397 request_body_data: Optional[Union[Mapping[str, Any], str]] = None, 398 request_body_json: Optional[Mapping[str, Any]] = None, 399 log_formatter: Optional[Callable[[requests.Response], Any]] = None, 400 ) -> Optional[requests.Response]: 401 request, response = self._http_client.send_request( 402 http_method=self.get_method().value, 403 url=self._join_url( 404 self.get_url_base( 405 stream_state=stream_state, 406 stream_slice=stream_slice, 407 next_page_token=next_page_token, 408 ), 409 path 410 or self.get_path( 411 stream_state=stream_state, 412 stream_slice=stream_slice, 413 next_page_token=next_page_token, 414 ), 415 ), 416 request_kwargs={"stream": self.stream_response}, 417 headers=self._request_headers( 418 stream_state, stream_slice, next_page_token, request_headers 419 ), 420 params=self._request_params( 421 stream_state, stream_slice, next_page_token, request_params 422 ), 423 json=self._request_body_json( 424 stream_state, stream_slice, next_page_token, request_body_json 425 ), 426 data=self._request_body_data( 427 stream_state, stream_slice, next_page_token, request_body_data 428 ), 429 dedupe_query_params=True, 430 log_formatter=log_formatter, 431 exit_on_rate_limit=self._exit_on_rate_limit, 432 ) 433 434 return response
Default implementation of a Requester
Attributes:
- name (str): Name of the stream. Only used for request/response caching
- url_base (Union[InterpolatedString, str]): Base url to send requests to
- path (Union[InterpolatedString, str]): Path to send requests to
- http_method (Union[str, HttpMethod]): HTTP method to use when sending requests
- request_options_provider (Optional[InterpolatedRequestOptionsProvider]): request option provider defining the options to set on outgoing requests
- authenticator (DeclarativeAuthenticator): Authenticator defining how to authenticate to the source
- error_handler (Optional[ErrorHandler]): Error handler defining how to detect and handle errors
- backoff_strategies (Optional[List[BackoffStrategy]]): List of backoff strategies to use when retrying requests
- config (Config): The user-provided configuration as specified by the source's spec
- use_cache (bool): Indicates that data should be cached for this stream
124 def get_url_base( 125 self, 126 *, 127 stream_state: Optional[StreamState] = None, 128 stream_slice: Optional[StreamSlice] = None, 129 next_page_token: Optional[Mapping[str, Any]] = None, 130 ) -> str: 131 interpolation_context = get_interpolation_context( 132 stream_state=stream_state, 133 stream_slice=stream_slice, 134 next_page_token=next_page_token, 135 ) 136 return str(self._url_base.eval(self.config, **interpolation_context))
Returns
URL base for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/"
138 def get_path( 139 self, 140 *, 141 stream_state: Optional[StreamState] = None, 142 stream_slice: Optional[StreamSlice] = None, 143 next_page_token: Optional[Mapping[str, Any]] = None, 144 ) -> str: 145 interpolation_context = get_interpolation_context( 146 stream_state=stream_state, 147 stream_slice=stream_slice, 148 next_page_token=next_page_token, 149 ) 150 path = str(self._path.eval(self.config, **interpolation_context)) 151 return path.lstrip("/")
Returns the URL path for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "some_entity"
156 def get_request_params( 157 self, 158 *, 159 stream_state: Optional[StreamState] = None, 160 stream_slice: Optional[StreamSlice] = None, 161 next_page_token: Optional[Mapping[str, Any]] = None, 162 ) -> MutableMapping[str, Any]: 163 return self._request_options_provider.get_request_params( 164 stream_state=stream_state, 165 stream_slice=stream_slice, 166 next_page_token=next_page_token, 167 )
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g: you might want to define query parameters for paging if next_page_token is not None.
169 def get_request_headers( 170 self, 171 *, 172 stream_state: Optional[StreamState] = None, 173 stream_slice: Optional[StreamSlice] = None, 174 next_page_token: Optional[Mapping[str, Any]] = None, 175 ) -> Mapping[str, Any]: 176 return self._request_options_provider.get_request_headers( 177 stream_state=stream_state, 178 stream_slice=stream_slice, 179 next_page_token=next_page_token, 180 )
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
183 def get_request_body_data( # type: ignore 184 self, 185 *, 186 stream_state: Optional[StreamState] = None, 187 stream_slice: Optional[StreamSlice] = None, 188 next_page_token: Optional[Mapping[str, Any]] = None, 189 ) -> Union[Mapping[str, Any], str]: 190 return ( 191 self._request_options_provider.get_request_body_data( 192 stream_state=stream_state, 193 stream_slice=stream_slice, 194 next_page_token=next_page_token, 195 ) 196 or {} 197 )
Specifies how to populate the body of the request with a non-JSON payload.
If returns a ready text that it will be sent as is. If returns a dict that it will be converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
200 def get_request_body_json( # type: ignore 201 self, 202 *, 203 stream_state: Optional[StreamState] = None, 204 stream_slice: Optional[StreamSlice] = None, 205 next_page_token: Optional[Mapping[str, Any]] = None, 206 ) -> Optional[Mapping[str, Any]]: 207 return self._request_options_provider.get_request_body_json( 208 stream_state=stream_state, 209 stream_slice=stream_slice, 210 next_page_token=next_page_token, 211 )
Specifies how to populate the body of the request with a JSON payload.
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
389 def send_request( 390 self, 391 stream_state: Optional[StreamState] = None, 392 stream_slice: Optional[StreamSlice] = None, 393 next_page_token: Optional[Mapping[str, Any]] = None, 394 path: Optional[str] = None, 395 request_headers: Optional[Mapping[str, Any]] = None, 396 request_params: Optional[Mapping[str, Any]] = None, 397 request_body_data: Optional[Union[Mapping[str, Any], str]] = None, 398 request_body_json: Optional[Mapping[str, Any]] = None, 399 log_formatter: Optional[Callable[[requests.Response], Any]] = None, 400 ) -> Optional[requests.Response]: 401 request, response = self._http_client.send_request( 402 http_method=self.get_method().value, 403 url=self._join_url( 404 self.get_url_base( 405 stream_state=stream_state, 406 stream_slice=stream_slice, 407 next_page_token=next_page_token, 408 ), 409 path 410 or self.get_path( 411 stream_state=stream_state, 412 stream_slice=stream_slice, 413 next_page_token=next_page_token, 414 ), 415 ), 416 request_kwargs={"stream": self.stream_response}, 417 headers=self._request_headers( 418 stream_state, stream_slice, next_page_token, request_headers 419 ), 420 params=self._request_params( 421 stream_state, stream_slice, next_page_token, request_params 422 ), 423 json=self._request_body_json( 424 stream_state, stream_slice, next_page_token, request_body_json 425 ), 426 data=self._request_body_data( 427 stream_state, stream_slice, next_page_token, request_body_data 428 ), 429 dedupe_query_params=True, 430 log_formatter=log_formatter, 431 exit_on_rate_limit=self._exit_on_rate_limit, 432 ) 433 434 return response
Sends a request and returns the response. Might return no response if the error handler chooses to ignore the response or throw an exception in case of an error. If path is set, the path configured on the requester itself is ignored. If header, params and body are set, they are merged with the ones configured on the requester itself.
If a log formatter is provided, it's used to log the performed request and response. If it's not provided, no logging is performed.
25@dataclass 26class RequestOption: 27 """ 28 Describes an option to set on a request 29 30 Attributes: 31 field_name (str): Describes the name of the parameter to inject. Mutually exclusive with field_path. 32 field_path (list(str)): Describes the path to a nested field as a list of field names. 33 Only valid for body_json injection type, and mutually exclusive with field_name. 34 inject_into (RequestOptionType): Describes where in the HTTP request to inject the parameter 35 """ 36 37 inject_into: RequestOptionType 38 parameters: InitVar[Mapping[str, Any]] 39 field_name: Optional[Union[InterpolatedString, str]] = None 40 field_path: Optional[List[Union[InterpolatedString, str]]] = None 41 42 def __post_init__(self, parameters: Mapping[str, Any]) -> None: 43 # Validate inputs. We should expect either field_name or field_path, but not both 44 if self.field_name is None and self.field_path is None: 45 raise ValueError("RequestOption requires either a field_name or field_path") 46 47 if self.field_name is not None and self.field_path is not None: 48 raise ValueError( 49 "Only one of field_name or field_path can be provided to RequestOption" 50 ) 51 52 # Nested field injection is only supported for body JSON injection 53 if self.field_path is not None and self.inject_into != RequestOptionType.body_json: 54 raise ValueError( 55 "Nested field injection is only supported for body JSON injection. Please use a top-level field_name for other injection types." 56 ) 57 58 # Convert field_name and field_path into InterpolatedString objects if they are strings 59 if self.field_name is not None: 60 self.field_name = InterpolatedString.create(self.field_name, parameters=parameters) 61 elif self.field_path is not None: 62 self.field_path = [ 63 InterpolatedString.create(segment, parameters=parameters) 64 for segment in self.field_path 65 ] 66 67 @property 68 def _is_field_path(self) -> bool: 69 """Returns whether this option is a field path (ie, a nested field)""" 70 return self.field_path is not None 71 72 def inject_into_request( 73 self, 74 target: MutableMapping[str, Any], 75 value: Any, 76 config: Config, 77 ) -> None: 78 """ 79 Inject a request option value into a target request structure using either field_name or field_path. 80 For non-body-json injection, only top-level field names are supported. 81 For body-json injection, both field names and nested field paths are supported. 82 83 Args: 84 target: The request structure to inject the value into 85 value: The value to inject 86 config: The config object to use for interpolation 87 """ 88 if self._is_field_path: 89 if self.inject_into != RequestOptionType.body_json: 90 raise ValueError( 91 "Nested field injection is only supported for body JSON injection. Please use a top-level field_name for other injection types." 92 ) 93 94 assert self.field_path is not None # for type checker 95 current = target 96 # Convert path segments into strings, evaluating any interpolated segments 97 # Example: ["data", "{{ config[user_type] }}", "id"] -> ["data", "admin", "id"] 98 *path_parts, final_key = [ 99 str( 100 segment.eval(config=config) 101 if isinstance(segment, InterpolatedString) 102 else segment 103 ) 104 for segment in self.field_path 105 ] 106 107 # Build a nested dictionary structure and set the final value at the deepest level 108 for part in path_parts: 109 current = current.setdefault(part, {}) 110 current[final_key] = value 111 else: 112 # For non-nested fields, evaluate the field name if it's an interpolated string 113 key = ( 114 self.field_name.eval(config=config) 115 if isinstance(self.field_name, InterpolatedString) 116 else self.field_name 117 ) 118 target[str(key)] = value
Describes an option to set on a request
Attributes:
- field_name (str): Describes the name of the parameter to inject. Mutually exclusive with field_path.
- field_path (list(str)): Describes the path to a nested field as a list of field names. Only valid for body_json injection type, and mutually exclusive with field_name.
- inject_into (RequestOptionType): Describes where in the HTTP request to inject the parameter
72 def inject_into_request( 73 self, 74 target: MutableMapping[str, Any], 75 value: Any, 76 config: Config, 77 ) -> None: 78 """ 79 Inject a request option value into a target request structure using either field_name or field_path. 80 For non-body-json injection, only top-level field names are supported. 81 For body-json injection, both field names and nested field paths are supported. 82 83 Args: 84 target: The request structure to inject the value into 85 value: The value to inject 86 config: The config object to use for interpolation 87 """ 88 if self._is_field_path: 89 if self.inject_into != RequestOptionType.body_json: 90 raise ValueError( 91 "Nested field injection is only supported for body JSON injection. Please use a top-level field_name for other injection types." 92 ) 93 94 assert self.field_path is not None # for type checker 95 current = target 96 # Convert path segments into strings, evaluating any interpolated segments 97 # Example: ["data", "{{ config[user_type] }}", "id"] -> ["data", "admin", "id"] 98 *path_parts, final_key = [ 99 str( 100 segment.eval(config=config) 101 if isinstance(segment, InterpolatedString) 102 else segment 103 ) 104 for segment in self.field_path 105 ] 106 107 # Build a nested dictionary structure and set the final value at the deepest level 108 for part in path_parts: 109 current = current.setdefault(part, {}) 110 current[final_key] = value 111 else: 112 # For non-nested fields, evaluate the field name if it's an interpolated string 113 key = ( 114 self.field_name.eval(config=config) 115 if isinstance(self.field_name, InterpolatedString) 116 else self.field_name 117 ) 118 target[str(key)] = value
Inject a request option value into a target request structure using either field_name or field_path. For non-body-json injection, only top-level field names are supported. For body-json injection, both field names and nested field paths are supported.
Arguments:
- target: The request structure to inject the value into
- value: The value to inject
- config: The config object to use for interpolation
30class Requester(RequestOptionsProvider): 31 @abstractmethod 32 def get_authenticator(self) -> DeclarativeAuthenticator: 33 """ 34 Specifies the authenticator to use when submitting requests 35 """ 36 pass 37 38 @abstractmethod 39 def get_url_base( 40 self, 41 *, 42 stream_state: Optional[StreamState], 43 stream_slice: Optional[StreamSlice], 44 next_page_token: Optional[Mapping[str, Any]], 45 ) -> str: 46 """ 47 :return: URL base for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/" 48 """ 49 50 @abstractmethod 51 def get_path( 52 self, 53 *, 54 stream_state: Optional[StreamState], 55 stream_slice: Optional[StreamSlice], 56 next_page_token: Optional[Mapping[str, Any]], 57 ) -> str: 58 """ 59 Returns the URL path for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "some_entity" 60 """ 61 62 @abstractmethod 63 def get_method(self) -> HttpMethod: 64 """ 65 Specifies the HTTP method to use 66 """ 67 68 @abstractmethod 69 def get_request_params( 70 self, 71 *, 72 stream_state: Optional[StreamState] = None, 73 stream_slice: Optional[StreamSlice] = None, 74 next_page_token: Optional[Mapping[str, Any]] = None, 75 ) -> MutableMapping[str, Any]: 76 """ 77 Specifies the query parameters that should be set on an outgoing HTTP request given the inputs. 78 79 E.g: you might want to define query parameters for paging if next_page_token is not None. 80 """ 81 82 @abstractmethod 83 def get_request_headers( 84 self, 85 *, 86 stream_state: Optional[StreamState] = None, 87 stream_slice: Optional[StreamSlice] = None, 88 next_page_token: Optional[Mapping[str, Any]] = None, 89 ) -> Mapping[str, Any]: 90 """ 91 Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method. 92 """ 93 94 @abstractmethod 95 def get_request_body_data( 96 self, 97 *, 98 stream_state: Optional[StreamState] = None, 99 stream_slice: Optional[StreamSlice] = None, 100 next_page_token: Optional[Mapping[str, Any]] = None, 101 ) -> Union[Mapping[str, Any], str]: 102 """ 103 Specifies how to populate the body of the request with a non-JSON payload. 104 105 If returns a ready text that it will be sent as is. 106 If returns a dict that it will be converted to a urlencoded form. 107 E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2" 108 109 At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden. 110 """ 111 112 @abstractmethod 113 def get_request_body_json( 114 self, 115 *, 116 stream_state: Optional[StreamState] = None, 117 stream_slice: Optional[StreamSlice] = None, 118 next_page_token: Optional[Mapping[str, Any]] = None, 119 ) -> Mapping[str, Any]: 120 """ 121 Specifies how to populate the body of the request with a JSON payload. 122 123 At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden. 124 """ 125 126 @abstractmethod 127 def send_request( 128 self, 129 stream_state: Optional[StreamState] = None, 130 stream_slice: Optional[StreamSlice] = None, 131 next_page_token: Optional[Mapping[str, Any]] = None, 132 path: Optional[str] = None, 133 request_headers: Optional[Mapping[str, Any]] = None, 134 request_params: Optional[Mapping[str, Any]] = None, 135 request_body_data: Optional[Union[Mapping[str, Any], str]] = None, 136 request_body_json: Optional[Mapping[str, Any]] = None, 137 log_formatter: Optional[Callable[[requests.Response], Any]] = None, 138 ) -> Optional[requests.Response]: 139 """ 140 Sends a request and returns the response. Might return no response if the error handler chooses to ignore the response or throw an exception in case of an error. 141 If path is set, the path configured on the requester itself is ignored. 142 If header, params and body are set, they are merged with the ones configured on the requester itself. 143 144 If a log formatter is provided, it's used to log the performed request and response. If it's not provided, no logging is performed. 145 """
Defines the request options to set on an outgoing HTTP request
Options can be passed by
- request parameter
- request headers
- body data
- json content
31 @abstractmethod 32 def get_authenticator(self) -> DeclarativeAuthenticator: 33 """ 34 Specifies the authenticator to use when submitting requests 35 """ 36 pass
Specifies the authenticator to use when submitting requests
38 @abstractmethod 39 def get_url_base( 40 self, 41 *, 42 stream_state: Optional[StreamState], 43 stream_slice: Optional[StreamSlice], 44 next_page_token: Optional[Mapping[str, Any]], 45 ) -> str: 46 """ 47 :return: URL base for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/" 48 """
Returns
URL base for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/"
50 @abstractmethod 51 def get_path( 52 self, 53 *, 54 stream_state: Optional[StreamState], 55 stream_slice: Optional[StreamSlice], 56 next_page_token: Optional[Mapping[str, Any]], 57 ) -> str: 58 """ 59 Returns the URL path for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "some_entity" 60 """
Returns the URL path for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "some_entity"
62 @abstractmethod 63 def get_method(self) -> HttpMethod: 64 """ 65 Specifies the HTTP method to use 66 """
Specifies the HTTP method to use
68 @abstractmethod 69 def get_request_params( 70 self, 71 *, 72 stream_state: Optional[StreamState] = None, 73 stream_slice: Optional[StreamSlice] = None, 74 next_page_token: Optional[Mapping[str, Any]] = None, 75 ) -> MutableMapping[str, Any]: 76 """ 77 Specifies the query parameters that should be set on an outgoing HTTP request given the inputs. 78 79 E.g: you might want to define query parameters for paging if next_page_token is not None. 80 """
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g: you might want to define query parameters for paging if next_page_token is not None.
82 @abstractmethod 83 def get_request_headers( 84 self, 85 *, 86 stream_state: Optional[StreamState] = None, 87 stream_slice: Optional[StreamSlice] = None, 88 next_page_token: Optional[Mapping[str, Any]] = None, 89 ) -> Mapping[str, Any]: 90 """ 91 Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method. 92 """
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
94 @abstractmethod 95 def get_request_body_data( 96 self, 97 *, 98 stream_state: Optional[StreamState] = None, 99 stream_slice: Optional[StreamSlice] = None, 100 next_page_token: Optional[Mapping[str, Any]] = None, 101 ) -> Union[Mapping[str, Any], str]: 102 """ 103 Specifies how to populate the body of the request with a non-JSON payload. 104 105 If returns a ready text that it will be sent as is. 106 If returns a dict that it will be converted to a urlencoded form. 107 E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2" 108 109 At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden. 110 """
Specifies how to populate the body of the request with a non-JSON payload.
If returns a ready text that it will be sent as is. If returns a dict that it will be converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
112 @abstractmethod 113 def get_request_body_json( 114 self, 115 *, 116 stream_state: Optional[StreamState] = None, 117 stream_slice: Optional[StreamSlice] = None, 118 next_page_token: Optional[Mapping[str, Any]] = None, 119 ) -> Mapping[str, Any]: 120 """ 121 Specifies how to populate the body of the request with a JSON payload. 122 123 At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden. 124 """
Specifies how to populate the body of the request with a JSON payload.
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
126 @abstractmethod 127 def send_request( 128 self, 129 stream_state: Optional[StreamState] = None, 130 stream_slice: Optional[StreamSlice] = None, 131 next_page_token: Optional[Mapping[str, Any]] = None, 132 path: Optional[str] = None, 133 request_headers: Optional[Mapping[str, Any]] = None, 134 request_params: Optional[Mapping[str, Any]] = None, 135 request_body_data: Optional[Union[Mapping[str, Any], str]] = None, 136 request_body_json: Optional[Mapping[str, Any]] = None, 137 log_formatter: Optional[Callable[[requests.Response], Any]] = None, 138 ) -> Optional[requests.Response]: 139 """ 140 Sends a request and returns the response. Might return no response if the error handler chooses to ignore the response or throw an exception in case of an error. 141 If path is set, the path configured on the requester itself is ignored. 142 If header, params and body are set, they are merged with the ones configured on the requester itself. 143 144 If a log formatter is provided, it's used to log the performed request and response. If it's not provided, no logging is performed. 145 """
Sends a request and returns the response. Might return no response if the error handler chooses to ignore the response or throw an exception in case of an error. If path is set, the path configured on the requester itself is ignored. If header, params and body are set, they are merged with the ones configured on the requester itself.
If a log formatter is provided, it's used to log the performed request and response. If it's not provided, no logging is performed.