airbyte_cdk.sources.declarative.requesters
1# 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 3# 4 5from airbyte_cdk.sources.declarative.requesters.http_requester import HttpRequester 6from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption 7from airbyte_cdk.sources.declarative.requesters.requester import Requester 8 9__all__ = ["HttpRequester", "RequestOption", "Requester"]
@dataclass
class HttpRequester(Requester):
    """
    Default implementation of a Requester

    Attributes:
        name (str): Name of the stream. Passed to the underlying HttpClient and used in this requester's logger name
        config (Config): The user-provided configuration as specified by the source's spec
        parameters (Mapping[str, Any]): Init-only parameters forwarded to the interpolated strings and default sub-components
        url (Optional[Union[InterpolatedString, str]]): URL to send requests to; used when `url_base` is not set
        url_base (Optional[Union[InterpolatedString, str]]): Base url to send requests to (deprecated)
        path (Optional[Union[InterpolatedString, str]]): Path to send requests to (deprecated)
        authenticator (Optional[DeclarativeAuthenticator]): Authenticator defining how to authenticate to the source. Defaults to NoAuth
        http_method (Union[str, HttpMethod]): HTTP method to use when sending requests
        request_options_provider (Optional[InterpolatedRequestOptionsProvider]): request option provider defining the options to set on outgoing requests
        error_handler (Optional[ErrorHandler]): Error handler defining how to detect and handle errors. If it exposes a
            `backoff_strategies` attribute, that value is handed to the HttpClient as its backoff strategy
        api_budget (Optional[APIBudget]): Passed through to the HttpClient
        disable_retries (bool): Passed through to the HttpClient
        message_repository (MessageRepository): Passed through to the HttpClient
        use_cache (bool): Indicates that data should be cached for this stream
        stream_response (bool): Sent as the `stream` request keyword argument on every request
        decoder (Decoder): Defaults to a JsonDecoder; not referenced by the methods of this class
    """

    name: str
    config: Config
    parameters: InitVar[Mapping[str, Any]]

    url: Optional[Union[InterpolatedString, str]] = None
    url_base: Optional[Union[InterpolatedString, str]] = None
    path: Optional[Union[InterpolatedString, str]] = None
    authenticator: Optional[DeclarativeAuthenticator] = None
    http_method: Union[str, HttpMethod] = HttpMethod.GET
    request_options_provider: Optional[InterpolatedRequestOptionsProvider] = None
    error_handler: Optional[ErrorHandler] = None
    api_budget: Optional[APIBudget] = None
    disable_retries: bool = False
    message_repository: MessageRepository = NoopMessageRepository()
    use_cache: bool = False
    _exit_on_rate_limit: bool = False
    stream_response: bool = False
    decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={}))

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """Build the interpolated URL parts, default sub-components and the HTTP client."""
        self._url = InterpolatedString.create(self.url or EmptyString, parameters=parameters)
        # deprecated
        self._url_base = InterpolatedString.create(
            self.url_base or EmptyString, parameters=parameters
        )
        # deprecated
        self._path = InterpolatedString.create(self.path or EmptyString, parameters=parameters)

        if self.request_options_provider is None:
            self._request_options_provider = InterpolatedRequestOptionsProvider(
                config=self.config, parameters=parameters
            )
        elif isinstance(self.request_options_provider, dict):
            # NOTE(review): `parameters` is not forwarded in this branch; presumably the
            # mapping carries its own `parameters` entry - confirm against callers.
            self._request_options_provider = InterpolatedRequestOptionsProvider(
                config=self.config, **self.request_options_provider
            )
        else:
            self._request_options_provider = self.request_options_provider

        self._authenticator = self.authenticator or NoAuth(parameters=parameters)
        self._http_method = (
            HttpMethod[self.http_method] if isinstance(self.http_method, str) else self.http_method
        )
        self._parameters = parameters

        # Only error handlers that expose `backoff_strategies` contribute a backoff strategy;
        # a missing handler (None) or a handler without the attribute yields None.
        backoff_strategies = getattr(self.error_handler, "backoff_strategies", None)

        self._http_client = HttpClient(
            name=self.name,
            logger=self.logger,
            error_handler=self.error_handler,
            api_budget=self.api_budget,
            authenticator=self._authenticator,
            use_cache=self.use_cache,
            backoff_strategy=backoff_strategies,
            disable_retries=self.disable_retries,
            message_repository=self.message_repository,
        )

    @property
    def exit_on_rate_limit(self) -> bool:
        """Flag forwarded to the HTTP client as `exit_on_rate_limit` on every request."""
        return self._exit_on_rate_limit

    @exit_on_rate_limit.setter
    def exit_on_rate_limit(self, value: bool) -> None:
        self._exit_on_rate_limit = value

    def get_authenticator(self) -> DeclarativeAuthenticator:
        """Return the configured authenticator, or the NoAuth instance created when none was given."""
        return self._authenticator

    def get_url(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> str:
        """
        :return: the `url` template evaluated against the config and the interpolation context, as a string
        """
        interpolation_context = get_interpolation_context(
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
        )

        return str(self._url.eval(self.config, **interpolation_context))

    def _get_url(
        self,
        *,
        path: Optional[str] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> str:
        """
        Resolve the URL a request is sent to.

        Precedence:
            1. If `url_base` evaluates to a non-empty string, it is joined with the path.
            2. Otherwise, if there is a path, `url` is joined with the path.
            3. Otherwise `url` is returned as is.

        An explicit `path` argument takes precedence over the configured path. Unlike the
        configured path (see `get_path`), an explicit `path` is not stripped of leading slashes.
        """
        url = self.get_url(
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
        )

        url_base = self.get_url_base(
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
        )

        path = path or self.get_path(
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
        )

        if url_base:
            return self._join_url(url_base, path)
        if path:
            return self._join_url(url, path)
        return url

    def get_url_base(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> str:
        """
        :return: URL base for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/"
        """
        interpolation_context = get_interpolation_context(
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
        )
        return str(self._url_base.eval(self.config, **interpolation_context))

    def get_path(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> str:
        """
        Returns the URL path for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "some_entity"

        Leading slashes are stripped from the evaluated path.
        """
        interpolation_context = get_interpolation_context(
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
        )
        path = str(self._path.eval(self.config, **interpolation_context))
        return path.lstrip("/")

    def get_method(self) -> HttpMethod:
        """Return the HTTP method; string values were converted to HttpMethod in `__post_init__`."""
        return self._http_method

    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> MutableMapping[str, Any]:
        """Return the query parameters produced by the request options provider."""
        return self._request_options_provider.get_request_params(
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
        )

    def get_request_headers(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return the (non-auth) headers produced by the request options provider."""
        return self._request_options_provider.get_request_headers(
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
        )

    # fixing request options provider types has a lot of dependencies
    def get_request_body_data(  # type: ignore
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Union[Mapping[str, Any], str]:
        """Return the non-JSON body produced by the request options provider, or `{}` when it is empty/None."""
        return (
            self._request_options_provider.get_request_body_data(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            )
            or {}
        )

    # fixing request options provider types has a lot of dependencies
    def get_request_body_json(  # type: ignore
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Optional[Mapping[str, Any]]:
        """Return the JSON body produced by the request options provider (passed through unchanged)."""
        return self._request_options_provider.get_request_body_json(
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
        )

    @property
    def logger(self) -> logging.Logger:
        """Logger named `airbyte.HttpRequester.<name>`."""
        return logging.getLogger(f"airbyte.HttpRequester.{self.name}")

    def _get_request_options(
        self,
        stream_state: Optional[StreamState],
        stream_slice: Optional[StreamSlice],
        next_page_token: Optional[Mapping[str, Any]],
        requester_method: Callable[..., Optional[Union[Mapping[str, Any], str]]],
        auth_options_method: Callable[..., Optional[Union[Mapping[str, Any], str]]],
        extra_options: Optional[Union[Mapping[str, Any], str]] = None,
    ) -> Union[Mapping[str, Any], str]:
        """
        Get the request_option from the requester, the authenticator and extra_options passed in.
        Raise a ValueError if there's a key collision
        Returned merged mapping otherwise

        The merge itself is delegated to `combine_mappings`; `allow_same_value_merge` is enabled
        only when `requester_method` is `get_request_body_json`.
        """

        is_body_json = requester_method.__name__ == "get_request_body_json"

        return combine_mappings(
            [
                requester_method(
                    stream_state=stream_state,
                    stream_slice=stream_slice,
                    next_page_token=next_page_token,
                ),
                auth_options_method(),
                extra_options,
            ],
            allow_same_value_merge=is_body_json,
        )

    def _request_headers(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
        extra_headers: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """
        Specifies request headers.

        Combines the requester headers, the authenticator's auth header and `extra_headers`
        through `_get_request_options`, then coerces every key and value to `str`.

        NOTE(review): this docstring previously stated that authentication headers overwrite
        overlapping headers, while `_get_request_options` documents a ValueError on key
        collision - confirm against `combine_mappings`.

        Raises:
            ValueError: if the combined headers are a string rather than a mapping.
        """
        headers = self._get_request_options(
            stream_state,
            stream_slice,
            next_page_token,
            self.get_request_headers,
            self.get_authenticator().get_auth_header,
            extra_headers,
        )
        if isinstance(headers, str):
            raise ValueError("Request headers cannot be a string")
        return {str(k): str(v) for k, v in headers.items()}

    def _request_params(
        self,
        stream_state: Optional[StreamState],
        stream_slice: Optional[StreamSlice],
        next_page_token: Optional[Mapping[str, Any]],
        extra_params: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """
        Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

        E.g: you might want to define query parameters for paging if next_page_token is not None.

        Raises:
            ValueError: if the combined params are a string, or if any parameter value is a dict.
        """
        options = self._get_request_options(
            stream_state,
            stream_slice,
            next_page_token,
            self.get_request_params,
            self.get_authenticator().get_request_params,
            extra_params,
        )
        if isinstance(options, str):
            raise ValueError("Request params cannot be a string")

        for k, v in options.items():
            if isinstance(v, dict):
                raise ValueError(
                    f"Invalid value for `{k}` parameter. The values of request params cannot be an object."
                )

        return options

    def _request_body_data(
        self,
        stream_state: Optional[StreamState],
        stream_slice: Optional[StreamSlice],
        next_page_token: Optional[Mapping[str, Any]],
        extra_body_data: Optional[Union[Mapping[str, Any], str]] = None,
    ) -> Optional[Union[Mapping[str, Any], str]]:
        """
        Specifies how to populate the body of the request with a non-JSON payload.

        If returns a ready text that it will be sent as is.
        If returns a dict that it will be converted to a urlencoded form.
        E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
        """
        # NOTE(review): legacy warning carried over - "use self.state instead of the stream_state
        # passed as argument". This class defines no `state` attribute; confirm it still applies.
        return self._get_request_options(
            stream_state,
            stream_slice,
            next_page_token,
            self.get_request_body_data,
            self.get_authenticator().get_request_body_data,
            extra_body_data,
        )

    def _request_body_json(
        self,
        stream_state: Optional[StreamState],
        stream_slice: Optional[StreamSlice],
        next_page_token: Optional[Mapping[str, Any]],
        extra_body_json: Optional[Mapping[str, Any]] = None,
    ) -> Optional[Mapping[str, Any]]:
        """
        Specifies how to populate the body of the request with a JSON payload.

        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

        Raises:
            ValueError: if the combined JSON body is a string rather than a mapping.
        """
        # NOTE(review): legacy warning carried over - "use self.state instead of the stream_state
        # passed as argument". This class defines no `state` attribute; confirm it still applies.
        options = self._get_request_options(
            stream_state,
            stream_slice,
            next_page_token,
            self.get_request_body_json,
            self.get_authenticator().get_request_body_json,
            extra_body_json,
        )
        if isinstance(options, str):
            raise ValueError("Request body json cannot be a string")
        return options

    @classmethod
    def _join_url(cls, url_base: str, path: Optional[str] = None) -> str:
        """
        Joins a base URL with a given path using `urllib.parse.urljoin`.

        A trailing slash is appended to the base URL when missing, so that a relative path is
        appended to the base instead of replacing its last segment. Nothing is stripped: neither
        trailing slashes of the base URL nor leading slashes of the path.

        Args:
            url_base (str): The base URL to which the path will be appended.
            path (Optional[str]): The path to join with the base URL.

        Returns:
            str: The resulting joined URL.

        Note:
            Related issue: https://github.com/airbytehq/airbyte-internal-issues/issues/11869
            - If the path is an empty string or None, the method returns the base URL unchanged.
            - Standard `urljoin` resolution applies: a path that is itself a full URL is returned
              as is, and a path starting with "/" is resolved against the host root, dropping any
              path component of the base URL.

        Example:
            1) _join_url("https://example.com/api/", "endpoint") >> 'https://example.com/api/endpoint'
            2) _join_url("https://example.com/api", "endpoint") >> 'https://example.com/api/endpoint'
            3) _join_url("https://example.com/api", "/endpoint") >> 'https://example.com/endpoint'
            4) _join_url("https://example.com/api/", "") >> 'https://example.com/api/'
            5) _join_url("https://example.com/api", None) >> 'https://example.com/api'
        """

        # nothing to join: hand back the base URL untouched
        if path == EmptyString or path is None:
            return url_base

        # the url_base might not have a trailing slash, in which case urljoin would replace
        # its last segment instead of appending the path
        if not url_base.endswith("/"):
            url_base += "/"

        return urljoin(url_base, path)

    def send_request(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
        path: Optional[str] = None,
        request_headers: Optional[Mapping[str, Any]] = None,
        request_params: Optional[Mapping[str, Any]] = None,
        request_body_data: Optional[Union[Mapping[str, Any], str]] = None,
        request_body_json: Optional[Mapping[str, Any]] = None,
        log_formatter: Optional[Callable[[requests.Response], Any]] = None,
    ) -> Optional[requests.Response]:
        """
        Sends a request through the HTTP client and returns the response.

        If path is set, the path configured on the requester itself is ignored.
        If header, params and body are set, they are merged with the ones configured on the requester itself.

        The log formatter is forwarded to the HTTP client as is.
        """
        # The client returns a (request, response) pair; only the response is of interest here.
        _, response = self._http_client.send_request(
            http_method=self.get_method().value,
            url=self._get_url(
                path=path,
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            request_kwargs={"stream": self.stream_response},
            headers=self._request_headers(
                stream_state, stream_slice, next_page_token, request_headers
            ),
            params=self._request_params(
                stream_state, stream_slice, next_page_token, request_params
            ),
            json=self._request_body_json(
                stream_state, stream_slice, next_page_token, request_body_json
            ),
            data=self._request_body_data(
                stream_state, stream_slice, next_page_token, request_body_data
            ),
            dedupe_query_params=True,
            log_formatter=log_formatter,
            exit_on_rate_limit=self._exit_on_rate_limit,
        )

        return response
Default implementation of a Requester
Attributes:
- name (str): Name of the stream. Only used for request/response caching
- url_base (Union[InterpolatedString, str]): Base url to send requests to
- path (Union[InterpolatedString, str]): Path to send requests to
- http_method (Union[str, HttpMethod]): HTTP method to use when sending requests
- request_options_provider (Optional[InterpolatedRequestOptionsProvider]): request option provider defining the options to set on outgoing requests
- authenticator (DeclarativeAuthenticator): Authenticator defining how to authenticate to the source
- error_handler (Optional[ErrorHandler]): Error handler defining how to detect and handle errors
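- backoff_strategies (Optional[List[BackoffStrategy]]): List of backoff strategies to use when retrying requests. This is not a constructor field: it is taken from `error_handler.backoff_strategies` when the error handler defines that attribute.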
- config (Config): The user-provided configuration as specified by the source's spec
- use_cache (bool): Indicates that data should be cached for this stream
def get_url(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> str:
    """Evaluate the requester's `url` template against the config and interpolation context."""
    context = get_interpolation_context(
        stream_state=stream_state,
        stream_slice=stream_slice,
        next_page_token=next_page_token,
    )
    evaluated_url = self._url.eval(self.config, **context)
    return str(evaluated_url)
Returns
The requester's `url` template evaluated against the config and the interpolation context, e.g. "https://myapi.com/v1/some_entity". When `url_base` is not set, requests are sent to this URL, with `path` (if any) appended.
def get_url_base(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> str:
    """Evaluate the requester's `url_base` template against the config and interpolation context."""
    context = get_interpolation_context(
        stream_state=stream_state,
        stream_slice=stream_slice,
        next_page_token=next_page_token,
    )
    evaluated_base = self._url_base.eval(self.config, **context)
    return str(evaluated_base)
Returns
URL base for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/"
def get_path(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> str:
    """Evaluate the requester's `path` template and return it without leading slashes."""
    context = get_interpolation_context(
        stream_state=stream_state,
        stream_slice=stream_slice,
        next_page_token=next_page_token,
    )
    evaluated_path = self._path.eval(self.config, **context)
    # Leading slashes are dropped so the path is always relative to the base URL.
    return str(evaluated_path).lstrip("/")
Returns the URL path for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "some_entity"
def get_request_params(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
    """Delegate to the request options provider to obtain the query parameters."""
    provider = self._request_options_provider
    params = provider.get_request_params(
        stream_state=stream_state,
        stream_slice=stream_slice,
        next_page_token=next_page_token,
    )
    return params
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g: you might want to define query parameters for paging if next_page_token is not None.
def get_request_headers(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """Delegate to the request options provider to obtain the request headers."""
    provider = self._request_options_provider
    headers = provider.get_request_headers(
        stream_state=stream_state,
        stream_slice=stream_slice,
        next_page_token=next_page_token,
    )
    return headers
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
def get_request_body_data(  # type: ignore
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Union[Mapping[str, Any], str]:
    """Return the provider's non-JSON request body, substituting `{}` for any falsy result."""
    body_data = self._request_options_provider.get_request_body_data(
        stream_state=stream_state,
        stream_slice=stream_slice,
        next_page_token=next_page_token,
    )
    # A None/empty result from the provider is normalised to an empty mapping.
    return body_data if body_data else {}
Specifies how to populate the body of the request with a non-JSON payload.
If returns a ready text that it will be sent as is. If returns a dict that it will be converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
def get_request_body_json(  # type: ignore
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Optional[Mapping[str, Any]]:
    """Delegate to the request options provider to obtain the JSON request body."""
    provider = self._request_options_provider
    body_json = provider.get_request_body_json(
        stream_state=stream_state,
        stream_slice=stream_slice,
        next_page_token=next_page_token,
    )
    return body_json
Specifies how to populate the body of the request with a JSON payload.
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
def send_request(
    self,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
    path: Optional[str] = None,
    request_headers: Optional[Mapping[str, Any]] = None,
    request_params: Optional[Mapping[str, Any]] = None,
    request_body_data: Optional[Union[Mapping[str, Any], str]] = None,
    request_body_json: Optional[Mapping[str, Any]] = None,
    log_formatter: Optional[Callable[[requests.Response], Any]] = None,
) -> Optional[requests.Response]:
    """
    Build the request pieces from the requester configuration plus the extra options
    passed in, send the request through the HTTP client and return the response.
    """
    # The pieces are computed in the same order in which the original call evaluated them.
    method = self.get_method().value
    target_url = self._get_url(
        path=path,
        stream_state=stream_state,
        stream_slice=stream_slice,
        next_page_token=next_page_token,
    )
    slice_args = (stream_state, stream_slice, next_page_token)
    headers = self._request_headers(*slice_args, request_headers)
    params = self._request_params(*slice_args, request_params)
    json_body = self._request_body_json(*slice_args, request_body_json)
    data_body = self._request_body_data(*slice_args, request_body_data)

    # The client returns a (request, response) pair; only the response is returned.
    _, response = self._http_client.send_request(
        http_method=method,
        url=target_url,
        request_kwargs={"stream": self.stream_response},
        headers=headers,
        params=params,
        json=json_body,
        data=data_body,
        dedupe_query_params=True,
        log_formatter=log_formatter,
        exit_on_rate_limit=self._exit_on_rate_limit,
    )
    return response
Sends a request and returns the response. Might return no response if the error handler chooses to ignore the response or throw an exception in case of an error. If path is set, the path configured on the requester itself is ignored. If header, params and body are set, they are merged with the ones configured on the requester itself.
If a log formatter is provided, it's used to log the performed request and response. If it's not provided, no logging is performed.
@dataclass
class RequestOption:
    """
    Describes an option to set on a request

    Attributes:
        field_name (str): Describes the name of the parameter to inject. Mutually exclusive with field_path.
        field_path (list(str)): Describes the path to a nested field as a non-empty list of field names.
            Only valid for body_json injection type, and mutually exclusive with field_name.
        inject_into (RequestOptionType): Describes where in the HTTP request to inject the parameter
    """

    inject_into: RequestOptionType
    parameters: InitVar[Mapping[str, Any]]
    field_name: Optional[Union[InterpolatedString, str]] = None
    field_path: Optional[List[Union[InterpolatedString, str]]] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """
        Validate the field_name/field_path combination and wrap them as InterpolatedString objects.

        Raises:
            ValueError: if neither or both of field_name and field_path are given, if field_path
                is empty, or if field_path is used with an injection type other than body_json.
        """
        # Validate inputs. We should expect either field_name or field_path, but not both
        if self.field_name is None and self.field_path is None:
            raise ValueError("RequestOption requires either a field_name or field_path")

        if self.field_name is not None and self.field_path is not None:
            raise ValueError(
                "Only one of field_name or field_path can be provided to RequestOption"
            )

        # An empty path has no final key to assign to; reject it here instead of failing
        # later in inject_into_request with an opaque unpacking error.
        if self.field_path is not None and len(self.field_path) == 0:
            raise ValueError("RequestOption field_path must contain at least one field name")

        # Nested field injection is only supported for body JSON injection
        if self.field_path is not None and self.inject_into != RequestOptionType.body_json:
            raise ValueError(
                "Nested field injection is only supported for body JSON injection. Please use a top-level field_name for other injection types."
            )

        # Convert field_name and field_path into InterpolatedString objects if they are strings
        if self.field_name is not None:
            self.field_name = InterpolatedString.create(self.field_name, parameters=parameters)
        elif self.field_path is not None:
            self.field_path = [
                InterpolatedString.create(segment, parameters=parameters)
                for segment in self.field_path
            ]

    @property
    def _is_field_path(self) -> bool:
        """Returns whether this option is a field path (ie, a nested field)"""
        return self.field_path is not None

    def inject_into_request(
        self,
        target: MutableMapping[str, Any],
        value: Any,
        config: Config,
    ) -> None:
        """
        Inject a request option value into a target request structure using either field_name or field_path.
        For non-body-json injection, only top-level field names are supported.
        For body-json injection, both field names and nested field paths are supported.

        Args:
            target: The request structure to inject the value into (mutated in place)
            value: The value to inject
            config: The config object to use for interpolation

        Raises:
            ValueError: if a field path is used with an injection type other than body_json.
        """
        if self._is_field_path:
            # Re-checked here because the dataclass fields can be reassigned after construction.
            if self.inject_into != RequestOptionType.body_json:
                raise ValueError(
                    "Nested field injection is only supported for body JSON injection. Please use a top-level field_name for other injection types."
                )

            assert self.field_path is not None  # for type checker
            current = target
            # Convert path segments into strings, evaluating any interpolated segments
            # Example: ["data", "{{ config[user_type] }}", "id"] -> ["data", "admin", "id"]
            *path_parts, final_key = [
                str(
                    segment.eval(config=config)
                    if isinstance(segment, InterpolatedString)
                    else segment
                )
                for segment in self.field_path
            ]

            # Build a nested dictionary structure and set the final value at the deepest level
            for part in path_parts:
                current = current.setdefault(part, {})
            current[final_key] = value
        else:
            # For non-nested fields, evaluate the field name if it's an interpolated string
            key = (
                self.field_name.eval(config=config)
                if isinstance(self.field_name, InterpolatedString)
                else self.field_name
            )
            target[str(key)] = value
Describes an option to set on a request
Attributes:
- field_name (str): Describes the name of the parameter to inject. Mutually exclusive with field_path.
- field_path (list(str)): Describes the path to a nested field as a list of field names. Only valid for body_json injection type, and mutually exclusive with field_name.
- inject_into (RequestOptionType): Describes where in the HTTP request to inject the parameter
def inject_into_request(
    self,
    target: MutableMapping[str, Any],
    value: Any,
    config: Config,
) -> None:
    """
    Write `value` into `target` under this option's field name or nested field path.

    A top-level field name works for every injection type; a nested field path is only
    accepted for body-json injection.

    Args:
        target: The request structure to inject the value into
        value: The value to inject
        config: The config object to use for interpolation
    """
    if not self._is_field_path:
        # Top-level field: evaluate the name when it is an interpolated string.
        name = self.field_name
        if isinstance(name, InterpolatedString):
            name = name.eval(config=config)
        target[str(name)] = value
        return

    if self.inject_into != RequestOptionType.body_json:
        raise ValueError(
            "Nested field injection is only supported for body JSON injection. Please use a top-level field_name for other injection types."
        )

    assert self.field_path is not None  # for type checker

    # Resolve every path segment to a plain string, evaluating interpolated segments.
    # Example: ["data", "{{ config[user_type] }}", "id"] -> ["data", "admin", "id"]
    resolved = []
    for segment in self.field_path:
        if isinstance(segment, InterpolatedString):
            segment = segment.eval(config=config)
        resolved.append(str(segment))
    *parents, leaf = resolved

    # Walk (creating as needed) the intermediate dictionaries, then set the leaf value.
    node = target
    for parent in parents:
        node = node.setdefault(parent, {})
    node[leaf] = value
Inject a request option value into a target request structure using either field_name or field_path. For non-body-json injection, only top-level field names are supported. For body-json injection, both field names and nested field paths are supported.
Arguments:
- target: The request structure to inject the value into
- value: The value to inject
- config: The config object to use for interpolation
class Requester(RequestOptionsProvider):
    """
    Interface of a component that describes an HTTP request (authenticator, URL parts, method,
    request options) and sends it.
    """

    @abstractmethod
    def get_authenticator(self) -> DeclarativeAuthenticator:
        """
        Specifies the authenticator to use when submitting requests
        """

    @abstractmethod
    def get_url(
        self,
        *,
        stream_state: Optional[StreamState],
        stream_slice: Optional[StreamSlice],
        next_page_token: Optional[Mapping[str, Any]],
    ) -> str:
        """
        :return: URL for the API endpoint, as opposed to get_url_base which returns only the base part.
        """

    @abstractmethod
    def get_url_base(
        self,
        *,
        stream_state: Optional[StreamState],
        stream_slice: Optional[StreamSlice],
        next_page_token: Optional[Mapping[str, Any]],
    ) -> str:
        """
        :return: URL base for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/"
        """

    @abstractmethod
    def get_path(
        self,
        *,
        stream_state: Optional[StreamState],
        stream_slice: Optional[StreamSlice],
        next_page_token: Optional[Mapping[str, Any]],
    ) -> str:
        """
        Returns the URL path for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "some_entity"
        """

    @abstractmethod
    def get_method(self) -> HttpMethod:
        """
        Specifies the HTTP method to use
        """

    @abstractmethod
    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> MutableMapping[str, Any]:
        """
        Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

        E.g: you might want to define query parameters for paging if next_page_token is not None.
        """

    @abstractmethod
    def get_request_headers(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """
        Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
        """

    @abstractmethod
    def get_request_body_data(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Union[Mapping[str, Any], str]:
        """
        Specifies how to populate the body of the request with a non-JSON payload.

        If returns a ready text that it will be sent as is.
        If returns a dict that it will be converted to a urlencoded form.
        E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
        """

    @abstractmethod
    def get_request_body_json(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """
        Specifies how to populate the body of the request with a JSON payload.

        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
        """

    @abstractmethod
    def send_request(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
        path: Optional[str] = None,
        request_headers: Optional[Mapping[str, Any]] = None,
        request_params: Optional[Mapping[str, Any]] = None,
        request_body_data: Optional[Union[Mapping[str, Any], str]] = None,
        request_body_json: Optional[Mapping[str, Any]] = None,
        log_formatter: Optional[Callable[[requests.Response], Any]] = None,
    ) -> Optional[requests.Response]:
        """
        Sends a request and returns the response. Might return no response if the error handler chooses to ignore the response or throw an exception in case of an error.
        If path is set, the path configured on the requester itself is ignored.
        If header, params and body are set, they are merged with the ones configured on the requester itself.

        If a log formatter is provided, it's used to log the performed request and response. If it's not provided, no logging is performed.
        """
Defines the request options to set on an outgoing HTTP request
Options can be passed by
- request parameter
- request headers
- body data
- json content
@abstractmethod
def get_authenticator(self) -> DeclarativeAuthenticator:
    """Return the authenticator to use when submitting requests."""
Specifies the authenticator to use when submitting requests
@abstractmethod
def get_url(
    self,
    *,
    stream_state: Optional[StreamState],
    stream_slice: Optional[StreamSlice],
    next_page_token: Optional[Mapping[str, Any]],
) -> str:
    """
    Specifies the URL to send requests to.

    NOTE(review): this docstring used to be a verbatim copy of the one on
    ``get_url_base`` (it described the URL base, "https://myapi.com/v1/").
    How the returned URL relates to the URL base and the path is decided by
    the implementation -- confirm there before relying on a specific shape.

    The three arguments are keyword-only and have no default, so callers
    must pass each of them explicitly (``None`` is accepted by the annotations).

    :return: the URL as a string
    """
Returns
URL base for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/"
@abstractmethod
def get_url_base(
    self,
    *,
    stream_state: Optional[StreamState],
    stream_slice: Optional[StreamSlice],
    next_page_token: Optional[Mapping[str, Any]],
) -> str:
    """
    Specifies the URL base for the API endpoint.

    E.g. if you wanted to hit https://myapi.com/v1/some_entity then this
    should return "https://myapi.com/v1/".

    :return: the URL base as a string
    """
Returns
URL base for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/"
@abstractmethod
def get_path(
    self,
    *,
    stream_state: Optional[StreamState],
    stream_slice: Optional[StreamSlice],
    next_page_token: Optional[Mapping[str, Any]],
) -> str:
    """
    Specifies the URL path for the API endpoint.

    E.g. if you wanted to hit https://myapi.com/v1/some_entity then this
    should return "some_entity".

    :return: the URL path as a string
    """
Returns the URL path for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "some_entity"
@abstractmethod
def get_method(self) -> HttpMethod:
    """Return the HTTP method to use."""
Specifies the HTTP method to use
@abstractmethod
def get_request_params(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
    """
    Return the query parameters to set on an outgoing HTTP request, given the inputs.

    For example, paging query parameters might be defined when
    next_page_token is not None.
    """
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g: you might want to define query parameters for paging if next_page_token is not None.
@abstractmethod
def get_request_headers(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """
    Return any non-auth headers.

    Headers returned here that overlap with authentication headers will be
    overwritten by the authentication headers.
    """
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
@abstractmethod
def get_request_body_data(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Union[Mapping[str, Any], str]:
    """
    Return the non-JSON payload used to populate the body of the request.

    A ready text is sent as is.
    A dict is converted to a urlencoded form,
    e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

    Only one of the 'request_body_data' and 'request_body_json' functions
    can be overridden at the same time.
    """
Specifies how to populate the body of the request with a non-JSON payload.
If it returns a ready text, it will be sent as is. If it returns a dict, it will be converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
@abstractmethod
def get_request_body_json(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """
    Return the JSON payload used to populate the body of the request.

    Only one of the 'request_body_data' and 'request_body_json' functions
    can be overridden at the same time.
    """
Specifies how to populate the body of the request with a JSON payload.
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
@abstractmethod
def send_request(
    self,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
    path: Optional[str] = None,
    request_headers: Optional[Mapping[str, Any]] = None,
    request_params: Optional[Mapping[str, Any]] = None,
    request_body_data: Optional[Union[Mapping[str, Any], str]] = None,
    request_body_json: Optional[Mapping[str, Any]] = None,
    log_formatter: Optional[Callable[[requests.Response], Any]] = None,
) -> Optional[requests.Response]:
    """
    Send a request and return the response.

    No response might be returned if the error handler chooses to ignore
    the response; an exception might be raised in case of an error.

    - path: when set, the path configured on the requester itself is ignored.
    - request_headers / request_params / request_body_data / request_body_json:
      when set, they are merged with the ones configured on the requester itself.
    - log_formatter: when provided, it's used to log the performed request and
      response; when not provided, no logging is performed.
    """
Sends a request and returns the response. Might return no response if the error handler chooses to ignore the response, or raise an exception in case of an error. If path is set, the path configured on the requester itself is ignored. If headers, params or body are set, they are merged with the ones configured on the requester itself.
If a log formatter is provided, it's used to log the performed request and response. If it's not provided, no logging is performed.