airbyte_cdk.sources.declarative.requesters
1# 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 3# 4 5from airbyte_cdk.sources.declarative.requesters.http_requester import HttpRequester 6from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption 7from airbyte_cdk.sources.declarative.requesters.requester import Requester 8 9__all__ = ["HttpRequester", "RequestOption", "Requester"]
@dataclass
class HttpRequester(Requester):
    """
    Default implementation of a Requester

    Attributes:
        name (str): Name of the stream. Passed to the HTTP client and used to name this requester's logger
        config (Config): The user-provided configuration as specified by the source's spec
        parameters (Mapping[str, Any]): Init-only parameters forwarded to the interpolated components built in __post_init__
        url (Optional[Union[InterpolatedString, str]]): Url to send requests to. Only used when url_base evaluates to an empty string
        url_base (Optional[Union[InterpolatedString, str]]): Base url to send requests to (marked as deprecated in __post_init__)
        path (Optional[Union[InterpolatedString, str]]): Path to send requests to (marked as deprecated in __post_init__)
        authenticator (Optional[DeclarativeAuthenticator]): Authenticator defining how to authenticate to the source. Falls back to NoAuth
        http_method (Union[str, HttpMethod]): HTTP method to use when sending requests. A string is looked up by name in HttpMethod
        request_options_provider (Optional[InterpolatedRequestOptionsProvider]): request option provider defining the options to set on outgoing requests
        error_handler (Optional[ErrorHandler]): Error handler defining how to detect and handle errors. If it exposes a
            `backoff_strategies` attribute, that value is handed to the HTTP client as its backoff strategy
        api_budget (Optional[APIBudget]): Passed as-is to the HTTP client
        disable_retries (bool): Passed as-is to the HTTP client
        message_repository (MessageRepository): Passed as-is to the HTTP client
        use_cache (bool): Indicates that data should be cached for this stream
        stream_response (bool): Forwarded as the `stream` request keyword argument when sending requests
        decoder (Decoder): Defaults to a JsonDecoder. NOTE(review): not read anywhere in this class - presumably used by callers
    """

    name: str
    config: Config
    parameters: InitVar[Mapping[str, Any]]

    url: Optional[Union[InterpolatedString, str]] = None
    url_base: Optional[Union[InterpolatedString, str]] = None
    path: Optional[Union[InterpolatedString, str]] = None
    authenticator: Optional[DeclarativeAuthenticator] = None
    http_method: Union[str, HttpMethod] = HttpMethod.GET
    request_options_provider: Optional[InterpolatedRequestOptionsProvider] = None
    error_handler: Optional[ErrorHandler] = None
    api_budget: Optional[APIBudget] = None
    disable_retries: bool = False
    message_repository: MessageRepository = NoopMessageRepository()
    use_cache: bool = False
    _exit_on_rate_limit: bool = False
    stream_response: bool = False
    decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={}))

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """
        Build the interpolated url/url_base/path, resolve defaults for the request options
        provider and the authenticator, and create the underlying HttpClient.
        """
        self._url = InterpolatedString.create(
            self.url if self.url else EmptyString, parameters=parameters
        )
        # deprecated
        self._url_base = InterpolatedString.create(
            self.url_base if self.url_base else EmptyString, parameters=parameters
        )
        # deprecated
        self._path = InterpolatedString.create(
            self.path if self.path else EmptyString, parameters=parameters
        )
        if self.request_options_provider is None:
            self._request_options_provider = InterpolatedRequestOptionsProvider(
                config=self.config, parameters=parameters
            )
        elif isinstance(self.request_options_provider, dict):
            # A plain mapping is treated as keyword arguments for the provider
            self._request_options_provider = InterpolatedRequestOptionsProvider(
                config=self.config, **self.request_options_provider
            )
        else:
            self._request_options_provider = self.request_options_provider
        self._authenticator = self.authenticator or NoAuth(parameters=parameters)
        self._http_method = (
            HttpMethod[self.http_method] if isinstance(self.http_method, str) else self.http_method
        )
        self._parameters = parameters

        # Not every error handler exposes backoff strategies, hence the hasattr check
        if self.error_handler is not None and hasattr(self.error_handler, "backoff_strategies"):
            backoff_strategies = self.error_handler.backoff_strategies  # type: ignore
        else:
            backoff_strategies = None

        self._http_client = HttpClient(
            name=self.name,
            logger=self.logger,
            error_handler=self.error_handler,
            api_budget=self.api_budget,
            authenticator=self._authenticator,
            use_cache=self.use_cache,
            backoff_strategy=backoff_strategies,
            disable_retries=self.disable_retries,
            message_repository=self.message_repository,
        )

    @property
    def exit_on_rate_limit(self) -> bool:
        """Flag forwarded to the HTTP client as `exit_on_rate_limit` on every request."""
        return self._exit_on_rate_limit

    @exit_on_rate_limit.setter
    def exit_on_rate_limit(self, value: bool) -> None:
        self._exit_on_rate_limit = value

    def get_authenticator(self) -> DeclarativeAuthenticator:
        """Return the configured authenticator, or the NoAuth fallback created in __post_init__."""
        return self._authenticator

    def get_url(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> str:
        """
        Evaluate the `url` field against the config and the interpolation context.

        :return: the evaluated url as a string (empty when no `url` was configured)
        """
        interpolation_context = get_interpolation_context(
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
        )

        return str(self._url.eval(self.config, **interpolation_context))

    def _get_url(
        self,
        *,
        path: Optional[str] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> str:
        """
        Compute the URL a request is sent to.

        Resolution order:
            1. If `url_base` evaluates to a non-empty string, it is joined with the path via `_join_url`.
            2. Otherwise, if there is a path, it is appended to `url`.
            3. Otherwise `url` is returned on its own.

        Args:
            path: Optional path that takes precedence over the path configured on the requester.
        """
        url = self.get_url(
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
        )

        url_base = self.get_url_base(
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
        )

        path = path or self.get_path(
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
        )

        if url_base:
            return self._join_url(url_base, path)

        # NOTE(review): unlike the url_base branch, no separator is inserted between `url` and `path`
        # here (and get_path() strips leading slashes) - confirm this is intended.
        return url + path if path else url

    def get_url_base(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> str:
        """
        :return: URL base for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/"
        """
        interpolation_context = get_interpolation_context(
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
        )
        return str(self._url_base.eval(self.config, **interpolation_context))

    def get_path(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> str:
        """
        Returns the URL path for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "some_entity"

        Leading slashes are stripped from the evaluated path.
        """
        interpolation_context = get_interpolation_context(
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
        )
        path = str(self._path.eval(self.config, **interpolation_context))
        return path.lstrip("/")

    def get_method(self) -> HttpMethod:
        """Return the HTTP method resolved in __post_init__."""
        return self._http_method

    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> MutableMapping[str, Any]:
        """Return the query parameters produced by the request options provider."""
        return self._request_options_provider.get_request_params(
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
        )

    def get_request_headers(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return the (non-auth) headers produced by the request options provider."""
        return self._request_options_provider.get_request_headers(
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
        )

    # fixing request options provider types has a lot of dependencies
    def get_request_body_data(  # type: ignore
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Union[Mapping[str, Any], str]:
        """Return the non-JSON body produced by the request options provider, or an empty dict if it is falsy."""
        return (
            self._request_options_provider.get_request_body_data(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            )
            or {}
        )

    # fixing request options provider types has a lot of dependencies
    def get_request_body_json(  # type: ignore
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Optional[Mapping[str, Any]]:
        """Return the JSON body produced by the request options provider."""
        return self._request_options_provider.get_request_body_json(
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
        )

    @property
    def logger(self) -> logging.Logger:
        """Logger named after this requester (`airbyte.HttpRequester.<name>`)."""
        return logging.getLogger(f"airbyte.HttpRequester.{self.name}")

    def _get_request_options(
        self,
        stream_state: Optional[StreamState],
        stream_slice: Optional[StreamSlice],
        next_page_token: Optional[Mapping[str, Any]],
        requester_method: Callable[..., Optional[Union[Mapping[str, Any], str]]],
        auth_options_method: Callable[..., Optional[Union[Mapping[str, Any], str]]],
        extra_options: Optional[Union[Mapping[str, Any], str]] = None,
    ) -> Union[Mapping[str, Any], str]:
        """
        Get the request_option from the requester, the authenticator and extra_options passed in.
        Raise a ValueError if there's a key collision
        Returned merged mapping otherwise

        The merge itself is delegated to combine_mappings; same-value merges are only allowed
        when `requester_method` is `get_request_body_json`.
        """

        # The requester method is identified by name, so it must be passed unwrapped
        is_body_json = requester_method.__name__ == "get_request_body_json"

        return combine_mappings(
            [
                requester_method(
                    stream_state=stream_state,
                    stream_slice=stream_slice,
                    next_page_token=next_page_token,
                ),
                auth_options_method(),
                extra_options,
            ],
            allow_same_value_merge=is_body_json,
        )

    def _request_headers(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
        extra_headers: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """
        Specifies request headers.

        Headers from the requester, the authenticator and `extra_headers` are merged through
        `_get_request_options`; keys and values are converted to strings.

        NOTE(review): this docstring used to state that authentication headers overwrite overlapping
        headers, while `_get_request_options` documents a ValueError on key collision - confirm
        against combine_mappings.

        Raises:
            ValueError: if the merged headers are a string instead of a mapping.
        """
        headers = self._get_request_options(
            stream_state,
            stream_slice,
            next_page_token,
            self.get_request_headers,
            self.get_authenticator().get_auth_header,
            extra_headers,
        )
        if isinstance(headers, str):
            raise ValueError("Request headers cannot be a string")
        return {str(k): str(v) for k, v in headers.items()}

    def _request_params(
        self,
        stream_state: Optional[StreamState],
        stream_slice: Optional[StreamSlice],
        next_page_token: Optional[Mapping[str, Any]],
        extra_params: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """
        Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

        E.g: you might want to define query parameters for paging if next_page_token is not None.

        Raises:
            ValueError: if the merged params are a string, or if any param value is a dict.
        """
        options = self._get_request_options(
            stream_state,
            stream_slice,
            next_page_token,
            self.get_request_params,
            self.get_authenticator().get_request_params,
            extra_params,
        )
        if isinstance(options, str):
            raise ValueError("Request params cannot be a string")

        for k, v in options.items():
            if isinstance(v, (dict,)):
                raise ValueError(
                    f"Invalid value for `{k}` parameter. The values of request params cannot be an object."
                )

        return options

    def _request_body_data(
        self,
        stream_state: Optional[StreamState],
        stream_slice: Optional[StreamSlice],
        next_page_token: Optional[Mapping[str, Any]],
        extra_body_data: Optional[Union[Mapping[str, Any], str]] = None,
    ) -> Optional[Union[Mapping[str, Any], str]]:
        """
        Specifies how to populate the body of the request with a non-JSON payload.

        If this returns ready text, it will be sent as is.
        If this returns a dict, it will be converted to a urlencoded form.
        E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

        Only one of the 'request_body_data' and 'request_body_json' functions can be overridden at a time.
        """
        # Warning: use self.state instead of the stream_state passed as argument!
        # NOTE(review): no `state` attribute is defined in this class - this warning may be stale.
        return self._get_request_options(
            stream_state,
            stream_slice,
            next_page_token,
            self.get_request_body_data,
            self.get_authenticator().get_request_body_data,
            extra_body_data,
        )

    def _request_body_json(
        self,
        stream_state: Optional[StreamState],
        stream_slice: Optional[StreamSlice],
        next_page_token: Optional[Mapping[str, Any]],
        extra_body_json: Optional[Mapping[str, Any]] = None,
    ) -> Optional[Mapping[str, Any]]:
        """
        Specifies how to populate the body of the request with a JSON payload.

        Only one of the 'request_body_data' and 'request_body_json' functions can be overridden at a time.

        Raises:
            ValueError: if the merged body is a string instead of a mapping.
        """
        # Warning: use self.state instead of the stream_state passed as argument!
        # NOTE(review): no `state` attribute is defined in this class - this warning may be stale.
        options = self._get_request_options(
            stream_state,
            stream_slice,
            next_page_token,
            self.get_request_body_json,
            self.get_authenticator().get_request_body_json,
            extra_body_json,
        )
        if isinstance(options, str):
            raise ValueError("Request body json cannot be a string")
        return options

    @classmethod
    def _join_url(cls, url_base: str, path: Optional[str] = None) -> str:
        """
        Joins a base URL with a given path.

        When a path is given, a trailing slash is appended to the base URL if it is missing and the
        two are combined with `urljoin`, so that a relative path is appended to the base instead of
        replacing its last segment. Because `urljoin` is used, a path that is itself an absolute URL
        (e.g. a full URL provided from an interpolation context) is returned as is.

        Args:
            url_base (str): The base URL to which the path will be appended.
            path (Optional[str]): The path to join with the base URL.

        Returns:
            str: The resulting joined URL.

        Note:
            Related issue: https://github.com/airbytehq/airbyte-internal-issues/issues/11869
            - If the path is an empty string or None, the method returns the base URL unchanged
              (a trailing slash, if any, is kept).
            - A path starting with "/" is resolved by `urljoin` against the host of the base URL,
              which drops the path component of the base URL.

        Example:
            1) _join_url("https://example.com/api/", "endpoint") >> 'https://example.com/api/endpoint'
            2) _join_url("https://example.com/api", "endpoint") >> 'https://example.com/api/endpoint'
            3) _join_url("https://example.com/api/", "") >> 'https://example.com/api/'
            4) _join_url("https://example.com/api", None) >> 'https://example.com/api'
            5) _join_url("https://example.com/api", "https://example.com/next?page=2") >> 'https://example.com/next?page=2'
        """

        # nothing to join: hand the base URL back untouched
        if path == EmptyString or path is None:
            return url_base
        else:
            # without a trailing slash urljoin would replace the last segment of url_base
            # instead of appending the path to it
            if not url_base.endswith("/"):
                url_base += "/"

        return urljoin(url_base, path)

    def send_request(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
        path: Optional[str] = None,
        request_headers: Optional[Mapping[str, Any]] = None,
        request_params: Optional[Mapping[str, Any]] = None,
        request_body_data: Optional[Union[Mapping[str, Any], str]] = None,
        request_body_json: Optional[Mapping[str, Any]] = None,
        log_formatter: Optional[Callable[[requests.Response], Any]] = None,
    ) -> Optional[requests.Response]:
        """
        Sends a request through the underlying HTTP client and returns the response.

        If path is set, the path configured on the requester itself is ignored.
        The given headers, params and bodies are merged with the ones configured on the requester
        and on the authenticator.

        The log formatter, if any, is forwarded to the HTTP client.
        """
        # The HTTP client returns a (request, response) pair; only the response is of interest here
        _, response = self._http_client.send_request(
            http_method=self.get_method().value,
            url=self._get_url(
                path=path,
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            request_kwargs={"stream": self.stream_response},
            headers=self._request_headers(
                stream_state, stream_slice, next_page_token, request_headers
            ),
            params=self._request_params(
                stream_state, stream_slice, next_page_token, request_params
            ),
            json=self._request_body_json(
                stream_state, stream_slice, next_page_token, request_body_json
            ),
            data=self._request_body_data(
                stream_state, stream_slice, next_page_token, request_body_data
            ),
            dedupe_query_params=True,
            log_formatter=log_formatter,
            exit_on_rate_limit=self._exit_on_rate_limit,
        )

        return response
Default implementation of a Requester
Attributes:
- name (str): Name of the stream. Only used for request/response caching
- url_base (Union[InterpolatedString, str]): Base url to send requests to
- path (Union[InterpolatedString, str]): Path to send requests to
- http_method (Union[str, HttpMethod]): HTTP method to use when sending requests
- request_options_provider (Optional[InterpolatedRequestOptionsProvider]): request option provider defining the options to set on outgoing requests
- authenticator (DeclarativeAuthenticator): Authenticator defining how to authenticate to the source
- error_handler (Optional[ErrorHandler]): Error handler defining how to detect and handle errors
- backoff_strategies (Optional[List[BackoffStrategy]]): List of backoff strategies to use when retrying requests
- config (Config): The user-provided configuration as specified by the source's spec
- use_cache (bool): Indicates that data should be cached for this stream
def get_url(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> str:
    """
    Evaluate the requester's url against the config and the interpolation context
    built from the stream state, the stream slice and the next page token.

    :return: the evaluated url, converted to a string
    """
    context = get_interpolation_context(
        stream_state=stream_state,
        stream_slice=stream_slice,
        next_page_token=next_page_token,
    )
    evaluated_url = self._url.eval(self.config, **context)
    return str(evaluated_url)
Returns
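The URL for the API endpoint, evaluated from the requester's `url` field (the deprecated `url_base` and `path` fields are handled by `get_url_base` and `get_path`), e.g: if you wanted to hit https://myapi.com/v1/some_entity then `url` could be "https://myapi.com/v1/some_entity"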
def get_url_base(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> str:
    """
    Evaluate the requester's url_base against the config and the interpolation context
    built from the stream state, the stream slice and the next page token.

    :return: the evaluated base url, converted to a string
    """
    context = get_interpolation_context(
        stream_state=stream_state,
        stream_slice=stream_slice,
        next_page_token=next_page_token,
    )
    evaluated_base = self._url_base.eval(self.config, **context)
    return str(evaluated_base)
Returns
URL base for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/"
def get_path(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> str:
    """
    Evaluate the requester's path against the config and the interpolation context.

    :return: the evaluated path as a string, with every leading "/" removed
    """
    context = get_interpolation_context(
        stream_state=stream_state,
        stream_slice=stream_slice,
        next_page_token=next_page_token,
    )
    evaluated_path = self._path.eval(self.config, **context)
    # Leading slashes are dropped from the evaluated path
    return str(evaluated_path).lstrip("/")
Returns the URL path for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "some_entity"
def get_request_params(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
    """
    Return the query parameters computed by the request options provider for the given
    stream state, stream slice and next page token.
    """
    options_provider = self._request_options_provider
    return options_provider.get_request_params(
        stream_state=stream_state,
        stream_slice=stream_slice,
        next_page_token=next_page_token,
    )
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g: you might want to define query parameters for paging if next_page_token is not None.
def get_request_headers(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """
    Return the headers computed by the request options provider for the given
    stream state, stream slice and next page token.
    """
    options_provider = self._request_options_provider
    return options_provider.get_request_headers(
        stream_state=stream_state,
        stream_slice=stream_slice,
        next_page_token=next_page_token,
    )
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
def get_request_body_data(  # type: ignore
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Union[Mapping[str, Any], str]:
    """
    Return the non-JSON request body computed by the request options provider.

    A falsy result from the provider (None, empty mapping, empty string) is replaced
    by an empty dict.
    """
    body_data = self._request_options_provider.get_request_body_data(
        stream_state=stream_state,
        stream_slice=stream_slice,
        next_page_token=next_page_token,
    )
    return body_data or {}
Specifies how to populate the body of the request with a non-JSON payload.
If this returns ready text, it will be sent as is. If this returns a dict, it will be converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
def get_request_body_json(  # type: ignore
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Optional[Mapping[str, Any]]:
    """
    Return the JSON request body computed by the request options provider for the given
    stream state, stream slice and next page token.
    """
    options_provider = self._request_options_provider
    return options_provider.get_request_body_json(
        stream_state=stream_state,
        stream_slice=stream_slice,
        next_page_token=next_page_token,
    )
Specifies how to populate the body of the request with a JSON payload.
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
def send_request(
    self,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
    path: Optional[str] = None,
    request_headers: Optional[Mapping[str, Any]] = None,
    request_params: Optional[Mapping[str, Any]] = None,
    request_body_data: Optional[Union[Mapping[str, Any], str]] = None,
    request_body_json: Optional[Mapping[str, Any]] = None,
    log_formatter: Optional[Callable[[requests.Response], Any]] = None,
) -> Optional[requests.Response]:
    """
    Sends a request through the underlying HTTP client and returns the response.

    If path is set, the path configured on the requester itself is ignored.
    The given headers, params and bodies are merged with the ones configured on the requester
    and on the authenticator.

    The log formatter, if any, is forwarded to the HTTP client.
    """
    # The HTTP client returns a (request, response) pair; only the response is of interest here
    _, response = self._http_client.send_request(
        http_method=self.get_method().value,
        url=self._get_url(
            path=path,
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
        ),
        request_kwargs={"stream": self.stream_response},
        headers=self._request_headers(
            stream_state, stream_slice, next_page_token, request_headers
        ),
        params=self._request_params(
            stream_state, stream_slice, next_page_token, request_params
        ),
        json=self._request_body_json(
            stream_state, stream_slice, next_page_token, request_body_json
        ),
        data=self._request_body_data(
            stream_state, stream_slice, next_page_token, request_body_data
        ),
        dedupe_query_params=True,
        log_formatter=log_formatter,
        exit_on_rate_limit=self._exit_on_rate_limit,
    )

    return response
Sends a request and returns the response. Might return no response if the error handler chooses to ignore the response, or might raise an exception in case of an error. If path is set, the path configured on the requester itself is ignored. If headers, params and body are set, they are merged with the ones configured on the requester itself.
If a log formatter is provided, it's used to log the performed request and response. If it's not provided, no logging is performed.
@dataclass
class RequestOption:
    """
    A single option to inject into an outgoing request.

    Attributes:
        field_name (str): Name of the parameter to inject. Mutually exclusive with field_path.
        field_path (list(str)): Path to a nested field, given as a list of field names.
            Only valid for the body_json injection type, and mutually exclusive with field_name.
        inject_into (RequestOptionType): Where in the HTTP request the parameter is injected
    """

    inject_into: RequestOptionType
    parameters: InitVar[Mapping[str, Any]]
    field_name: Optional[Union[InterpolatedString, str]] = None
    field_path: Optional[List[Union[InterpolatedString, str]]] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """Validate the field_name/field_path combination and wrap them in InterpolatedString objects."""
        has_name = self.field_name is not None
        has_path = self.field_path is not None

        # Exactly one of field_name and field_path must be given
        if not has_name and not has_path:
            raise ValueError("RequestOption requires either a field_name or field_path")
        if has_name and has_path:
            raise ValueError(
                "Only one of field_name or field_path can be provided to RequestOption"
            )

        # Only a JSON body can hold nested fields
        if has_path and self.inject_into != RequestOptionType.body_json:
            raise ValueError(
                "Nested field injection is only supported for body JSON injection. Please use a top-level field_name for other injection types."
            )

        # Turn plain strings into InterpolatedString objects
        if self.field_name is not None:
            self.field_name = InterpolatedString.create(self.field_name, parameters=parameters)
            return
        if self.field_path is not None:
            interpolated_segments = []
            for segment in self.field_path:
                interpolated_segments.append(
                    InterpolatedString.create(segment, parameters=parameters)
                )
            self.field_path = interpolated_segments

    @property
    def _is_field_path(self) -> bool:
        """True when this option targets a nested field (field_path is set)."""
        return self.field_path is not None

    def inject_into_request(
        self,
        target: MutableMapping[str, Any],
        value: Any,
        config: Config,
    ) -> None:
        """
        Write `value` into `target` under the key given by field_name, or under the nested
        keys given by field_path.
        Nested paths are only accepted for body-json injection; every other injection type
        requires a top-level field name.

        Args:
            target: The request structure to inject the value into (mutated in place)
            value: The value to inject
            config: The config object to use for interpolation
        """
        if not self._is_field_path:
            # Top-level field: evaluate the name if it is interpolated, then set it directly
            field_name = self.field_name
            if isinstance(field_name, InterpolatedString):
                field_name = field_name.eval(config=config)
            target[str(field_name)] = value
            return

        if self.inject_into != RequestOptionType.body_json:
            raise ValueError(
                "Nested field injection is only supported for body JSON injection. Please use a top-level field_name for other injection types."
            )

        assert self.field_path is not None  # for type checker

        # Resolve every path segment to a string, evaluating interpolated segments
        # Example: ["data", "{{ config[user_type] }}", "id"] -> ["data", "admin", "id"]
        resolved_segments = []
        for segment in self.field_path:
            if isinstance(segment, InterpolatedString):
                segment = segment.eval(config=config)
            resolved_segments.append(str(segment))
        *parent_keys, leaf_key = resolved_segments

        # Walk down the target, creating intermediate dicts where needed, then set the leaf
        node = target
        for parent_key in parent_keys:
            node = node.setdefault(parent_key, {})
        node[leaf_key] = value
Describes an option to set on a request
Attributes:
- field_name (str): Describes the name of the parameter to inject. Mutually exclusive with field_path.
- field_path (list(str)): Describes the path to a nested field as a list of field names. Only valid for body_json injection type, and mutually exclusive with field_name.
- inject_into (RequestOptionType): Describes where in the HTTP request to inject the parameter
def inject_into_request(
    self,
    target: MutableMapping[str, Any],
    value: Any,
    config: Config,
) -> None:
    """
    Write `value` into `target` under the key given by field_name, or under the nested
    keys given by field_path.
    Nested paths are only accepted for body-json injection; every other injection type
    requires a top-level field name.

    Args:
        target: The request structure to inject the value into (mutated in place)
        value: The value to inject
        config: The config object to use for interpolation
    """
    if not self._is_field_path:
        # Top-level field: evaluate the name if it is interpolated, then set it directly
        field_name = self.field_name
        if isinstance(field_name, InterpolatedString):
            field_name = field_name.eval(config=config)
        target[str(field_name)] = value
        return

    if self.inject_into != RequestOptionType.body_json:
        raise ValueError(
            "Nested field injection is only supported for body JSON injection. Please use a top-level field_name for other injection types."
        )

    assert self.field_path is not None  # for type checker

    # Resolve every path segment to a string, evaluating interpolated segments
    # Example: ["data", "{{ config[user_type] }}", "id"] -> ["data", "admin", "id"]
    resolved_segments = []
    for segment in self.field_path:
        if isinstance(segment, InterpolatedString):
            segment = segment.eval(config=config)
        resolved_segments.append(str(segment))
    *parent_keys, leaf_key = resolved_segments

    # Walk down the target, creating intermediate dicts where needed, then set the leaf
    node = target
    for parent_key in parent_keys:
        node = node.setdefault(parent_key, {})
    node[leaf_key] = value
Inject a request option value into a target request structure using either field_name or field_path. For non-body-json injection, only top-level field names are supported. For body-json injection, both field names and nested field paths are supported.
Arguments:
- target: The request structure to inject the value into
- value: The value to inject
- config: The config object to use for interpolation
class Requester(RequestOptionsProvider):
    """
    Abstract interface of a requester: it exposes the authenticator, the URL parts, the HTTP
    method and the request options of an endpoint, and sends the requests.
    """

    @abstractmethod
    def get_authenticator(self) -> DeclarativeAuthenticator:
        """
        Specifies the authenticator to use when submitting requests
        """

    @abstractmethod
    def get_url(
        self,
        *,
        stream_state: Optional[StreamState],
        stream_slice: Optional[StreamSlice],
        next_page_token: Optional[Mapping[str, Any]],
    ) -> str:
        """
        :return: URL for the API endpoint, as opposed to `get_url_base` and `get_path` which return its two parts separately
        """

    @abstractmethod
    def get_url_base(
        self,
        *,
        stream_state: Optional[StreamState],
        stream_slice: Optional[StreamSlice],
        next_page_token: Optional[Mapping[str, Any]],
    ) -> str:
        """
        :return: URL base for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/"
        """

    @abstractmethod
    def get_path(
        self,
        *,
        stream_state: Optional[StreamState],
        stream_slice: Optional[StreamSlice],
        next_page_token: Optional[Mapping[str, Any]],
    ) -> str:
        """
        Returns the URL path for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "some_entity"
        """

    @abstractmethod
    def get_method(self) -> HttpMethod:
        """
        Specifies the HTTP method to use
        """

    @abstractmethod
    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> MutableMapping[str, Any]:
        """
        Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

        E.g: you might want to define query parameters for paging if next_page_token is not None.
        """

    @abstractmethod
    def get_request_headers(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """
        Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
        """

    @abstractmethod
    def get_request_body_data(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Union[Mapping[str, Any], str]:
        """
        Specifies how to populate the body of the request with a non-JSON payload.

        If this returns ready text, it will be sent as is.
        If this returns a dict, it will be converted to a urlencoded form.
        E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

        Only one of the 'request_body_data' and 'request_body_json' functions can be overridden at a time.
        """

    @abstractmethod
    def get_request_body_json(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """
        Specifies how to populate the body of the request with a JSON payload.

        Only one of the 'request_body_data' and 'request_body_json' functions can be overridden at a time.
        """

    @abstractmethod
    def send_request(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
        path: Optional[str] = None,
        request_headers: Optional[Mapping[str, Any]] = None,
        request_params: Optional[Mapping[str, Any]] = None,
        request_body_data: Optional[Union[Mapping[str, Any], str]] = None,
        request_body_json: Optional[Mapping[str, Any]] = None,
        log_formatter: Optional[Callable[[requests.Response], Any]] = None,
    ) -> Optional[requests.Response]:
        """
        Sends a request and returns the response. Might return no response if the error handler chooses to ignore the response, or might raise an exception in case of an error.
        If path is set, the path configured on the requester itself is ignored.
        If headers, params and body are set, they are merged with the ones configured on the requester itself.

        If a log formatter is provided, it's used to log the performed request and response. If it's not provided, no logging is performed.
        """
Defines the request options to set on an outgoing HTTP request
Options can be passed by
- request parameter
- request headers
- body data
- json content
@abstractmethod
def get_authenticator(self) -> DeclarativeAuthenticator:
    """
    Returns the authenticator to use when submitting requests
    """
Specifies the authenticator to use when submitting requests
@abstractmethod
def get_url(
    self,
    *,
    stream_state: Optional[StreamState],
    stream_slice: Optional[StreamSlice],
    next_page_token: Optional[Mapping[str, Any]],
) -> str:
    """
    :return: URL of the API endpoint to send requests to

    NOTE(review): this docstring used to be a verbatim copy of get_url_base's ("URL base for the API endpoint ...").
    Confirm against the implementation whether the returned URL already includes the path component.
    """
Returns
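URL of the API endpoint to send requests to. (The text previously shown here duplicated the get_url_base description: URL base for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/". Whether the URL returned by get_url already includes the path should be confirmed against the implementation.)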
@abstractmethod
def get_url_base(
    self,
    *,
    stream_state: Optional[StreamState],
    stream_slice: Optional[StreamSlice],
    next_page_token: Optional[Mapping[str, Any]],
) -> str:
    """
    :return: Base URL of the API endpoint. E.g. to hit https://myapi.com/v1/some_entity, this should return "https://myapi.com/v1/"
    """
Returns
URL base for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/"
@abstractmethod
def get_path(
    self,
    *,
    stream_state: Optional[StreamState],
    stream_slice: Optional[StreamSlice],
    next_page_token: Optional[Mapping[str, Any]],
) -> str:
    """
    :return: URL path of the API endpoint. E.g. to hit https://myapi.com/v1/some_entity, this should return "some_entity"
    """
Returns the URL path for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "some_entity"
@abstractmethod
def get_method(self) -> HttpMethod:
    """
    Returns the HTTP method to use
    """
Specifies the HTTP method to use
@abstractmethod
def get_request_params(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
    """
    Returns the query parameters to set on an outgoing HTTP request for the given inputs.

    For example, query parameters for paging might be defined when next_page_token is not None.
    """
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g: you might want to define query parameters for paging if next_page_token is not None.
@abstractmethod
def get_request_headers(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """
    Returns the non-auth headers of the request.

    Where they overlap, authentication headers overwrite the headers returned from this method.
    """
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
@abstractmethod
def get_request_body_data(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Union[Mapping[str, Any], str]:
    """
    Returns the non-JSON payload used to populate the body of the request.

    If this returns a string, it is sent as is.
    If this returns a dict, it is converted to a urlencoded form.
    E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

    Only one of the 'request_body_data' and 'request_body_json' functions can be overridden at a time.
    """
Specifies how to populate the body of the request with a non-JSON payload.
If it returns a string, it is sent as is. If it returns a dict, it is converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
@abstractmethod
def get_request_body_json(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """
    Returns the JSON payload used to populate the body of the request.

    Only one of the 'request_body_data' and 'request_body_json' functions can be overridden at a time.
    """
Specifies how to populate the body of the request with a JSON payload.
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
@abstractmethod
def send_request(
    self,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
    path: Optional[str] = None,
    request_headers: Optional[Mapping[str, Any]] = None,
    request_params: Optional[Mapping[str, Any]] = None,
    request_body_data: Optional[Union[Mapping[str, Any], str]] = None,
    request_body_json: Optional[Mapping[str, Any]] = None,
    log_formatter: Optional[Callable[[requests.Response], Any]] = None,
) -> Optional[requests.Response]:
    """
    Sends a request and returns the response.

    No response might be returned if the error handler chooses to ignore the response, and an exception might be thrown in case of an error.
    When path is set, the path configured on the requester itself is ignored.
    When headers, params and body are set, they are merged with the ones configured on the requester itself.

    The log formatter, when provided, is used to log the performed request and response. Without one, no logging is performed.
    """
Sends a request and returns the response. Might return no response if the error handler chooses to ignore the response, or throw an exception in case of an error. If path is set, the path configured on the requester itself is ignored. If headers, params and body are set, they are merged with the ones configured on the requester itself.
If a log formatter is provided, it's used to log the performed request and response. If it's not provided, no logging is performed.