airbyte_cdk.sources.declarative.requesters.request_options
1# 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 3# 4 5from airbyte_cdk.sources.declarative.requesters.request_options.datetime_based_request_options_provider import ( 6 DatetimeBasedRequestOptionsProvider, 7) 8from airbyte_cdk.sources.declarative.requesters.request_options.default_request_options_provider import ( 9 DefaultRequestOptionsProvider, 10) 11from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_options_provider import ( 12 InterpolatedRequestOptionsProvider, 13) 14from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import ( 15 RequestOptionsProvider, 16) 17 18__all__ = [ 19 "DatetimeBasedRequestOptionsProvider", 20 "DefaultRequestOptionsProvider", 21 "InterpolatedRequestOptionsProvider", 22 "RequestOptionsProvider", 23]
@dataclass
class DatetimeBasedRequestOptionsProvider(RequestOptionsProvider):
    """
    Request options provider that copies the start and end values of a stream slice into the outbound request

    Attributes:
        config (Config): Passed to the interpolated field names and to the request options when they are evaluated
        parameters (Mapping[str, Any]): Init-only parameters forwarded to the field-name interpolation
        start_time_option (Optional[RequestOption]): Where to inject the slice's start value. Nothing is injected if unset
        end_time_option (Optional[RequestOption]): Where to inject the slice's end value. Nothing is injected if unset
        partition_field_start (Optional[str]): Key of the start value in the stream slice. Defaults to "start_time"
        partition_field_end (Optional[str]): Key of the end value in the stream slice. Defaults to "end_time"
    """

    config: Config
    parameters: InitVar[Mapping[str, Any]]
    start_time_option: Optional[RequestOption] = None
    end_time_option: Optional[RequestOption] = None
    partition_field_start: Optional[str] = None
    partition_field_end: Optional[str] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Fall back to the default slice keys when no field name (or an empty one) is configured.
        start_field = self.partition_field_start or "start_time"
        end_field = self.partition_field_end or "end_time"
        self._partition_field_start = InterpolatedString.create(start_field, parameters=parameters)
        self._partition_field_end = InterpolatedString.create(end_field, parameters=parameters)

    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return the slice values whose option is injected as a request parameter."""
        target = RequestOptionType.request_parameter
        return self._get_request_options(target, stream_slice)

    def get_request_headers(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return the slice values whose option is injected as a header."""
        target = RequestOptionType.header
        return self._get_request_options(target, stream_slice)

    def get_request_body_data(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Union[Mapping[str, Any], str]:
        """Return the slice values whose option is injected into the body data."""
        target = RequestOptionType.body_data
        return self._get_request_options(target, stream_slice)

    def get_request_body_json(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return the slice values whose option is injected into the JSON body."""
        target = RequestOptionType.body_json
        return self._get_request_options(target, stream_slice)

    def _get_request_options(
        self, option_type: RequestOptionType, stream_slice: Optional[StreamSlice]
    ) -> Mapping[str, Any]:
        """
        Collect the start/end options whose `inject_into` equals `option_type`.

        Returns an empty mapping when no stream slice is given (or the slice is falsy).
        """
        options: MutableMapping[str, Any] = {}
        if stream_slice:
            # Start is handled before end, each paired with the interpolated key used to read the slice.
            boundaries = (
                (self.start_time_option, self._partition_field_start),
                (self.end_time_option, self._partition_field_end),
            )
            for option, field_name in boundaries:
                if option and option.inject_into == option_type:
                    slice_value = stream_slice.get(field_name.eval(self.config))
                    option.inject_into_request(options, slice_value, self.config)
        return options
Request options provider that extracts fields from the stream_slice and injects them into the respective location in the outbound request being made
def get_request_params(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """
    Return the request options of type `request_parameter` built from the stream slice.

    Only `stream_slice` is forwarded; `stream_state` and `next_page_token` are accepted but not read here.
    """
    target = RequestOptionType.request_parameter
    return self._get_request_options(target, stream_slice)
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g: you might want to define query parameters for paging if next_page_token is not None.
def get_request_headers(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """
    Return the request options of type `header` built from the stream slice.

    Only `stream_slice` is forwarded; `stream_state` and `next_page_token` are accepted but not read here.
    """
    target = RequestOptionType.header
    return self._get_request_options(target, stream_slice)
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
def get_request_body_data(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Union[Mapping[str, Any], str]:
    """
    Return the request options of type `body_data` built from the stream slice.

    Only `stream_slice` is forwarded; `stream_state` and `next_page_token` are accepted but not read here.
    """
    target = RequestOptionType.body_data
    return self._get_request_options(target, stream_slice)
Specifies how to populate the body of the request with a non-JSON payload.
If it returns ready text, it will be sent as is. If it returns a dict, it will be converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
def get_request_body_json(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """
    Return the request options of type `body_json` built from the stream slice.

    Only `stream_slice` is forwarded; `stream_state` and `next_page_token` are accepted but not read here.
    """
    target = RequestOptionType.body_json
    return self._get_request_options(target, stream_slice)
Specifies how to populate the body of the request with a JSON payload.
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
@dataclass
class DefaultRequestOptionsProvider(RequestOptionsProvider):
    """
    Request options provider that adds nothing to the outbound request: every getter returns an empty mapping
    """

    parameters: InitVar[Mapping[str, Any]]

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Nothing to set up: `parameters` is accepted but not used.
        return None

    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return no query parameters, whatever the inputs."""
        no_params: Mapping[str, Any] = {}
        return no_params

    def get_request_headers(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return no headers, whatever the inputs."""
        no_headers: Mapping[str, Any] = {}
        return no_headers

    def get_request_body_data(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Union[Mapping[str, Any], str]:
        """Return no body data, whatever the inputs."""
        no_body_data: Mapping[str, Any] = {}
        return no_body_data

    def get_request_body_json(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return no JSON body, whatever the inputs."""
        no_body_json: Mapping[str, Any] = {}
        return no_body_json
Request options provider that adds nothing to the outbound request: every getter (request parameters, headers, body data and JSON body) returns an empty mapping
def get_request_params(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """Return no query parameters; none of the inputs are read."""
    no_params: Mapping[str, Any] = {}
    return no_params
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g: you might want to define query parameters for paging if next_page_token is not None.
def get_request_headers(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """Return no headers; none of the inputs are read."""
    no_headers: Mapping[str, Any] = {}
    return no_headers
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
def get_request_body_data(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Union[Mapping[str, Any], str]:
    """Return no body data; none of the inputs are read."""
    no_body_data: Mapping[str, Any] = {}
    return no_body_data
Specifies how to populate the body of the request with a non-JSON payload.
If it returns ready text, it will be sent as is. If it returns a dict, it will be converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
def get_request_body_json(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """Return no JSON body; none of the inputs are read."""
    no_body_json: Mapping[str, Any] = {}
    return no_body_json
Specifies how to populate the body of the request with a JSON payload.
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
@dataclass
class InterpolatedRequestOptionsProvider(RequestOptionsProvider):
    """
    Defines the request options to set on an outgoing HTTP request by evaluating `InterpolatedMapping`s

    Attributes:
        parameters (Mapping[str, Any]): Init-only parameters forwarded to every interpolator
        config (Config): The user-provided configuration as specified by the source's spec
        request_parameters (Optional[RequestInput]): The request parameters to set on an outgoing HTTP request
        request_headers (Optional[RequestInput]): The request headers to set on an outgoing HTTP request
        request_body_data (Optional[RequestInput]): The body data to set on an outgoing HTTP request
        request_body_json (Optional[NestedMapping]): The json content to set on an outgoing HTTP request
    """

    parameters: InitVar[Mapping[str, Any]]
    config: Config = field(default_factory=dict)
    request_parameters: Optional[RequestInput] = None
    request_headers: Optional[RequestInput] = None
    request_body_data: Optional[RequestInput] = None
    request_body_json: Optional[NestedMapping] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Only `None` is replaced; any other value (including an empty one) is kept as provided.
        self.request_parameters = {} if self.request_parameters is None else self.request_parameters
        self.request_headers = {} if self.request_headers is None else self.request_headers
        self.request_body_data = {} if self.request_body_data is None else self.request_body_data
        self.request_body_json = {} if self.request_body_json is None else self.request_body_json

        # Reject configurations that define both a non-JSON body and a JSON body.
        has_body_json = bool(self.request_body_json)
        has_body_data = bool(self.request_body_data)
        if has_body_json and has_body_data:
            raise ValueError(
                "RequestOptionsProvider should only contain either 'request_body_data' or 'request_body_json' not both"
            )

        # One interpolator per request location, all sharing the same config and parameters.
        self._parameter_interpolator = InterpolatedRequestInputProvider(
            request_inputs=self.request_parameters, config=self.config, parameters=parameters
        )
        self._headers_interpolator = InterpolatedRequestInputProvider(
            request_inputs=self.request_headers, config=self.config, parameters=parameters
        )
        self._body_data_interpolator = InterpolatedRequestInputProvider(
            request_inputs=self.request_body_data, config=self.config, parameters=parameters
        )
        self._body_json_interpolator = InterpolatedNestedRequestInputProvider(
            request_inputs=self.request_body_json, config=self.config, parameters=parameters
        )

    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> MutableMapping[str, Any]:
        """
        Evaluate the configured request parameters for the given slice and page token.

        Returns an empty dict when the evaluated value is not a dict. `stream_state` is not read.
        """
        evaluated = self._parameter_interpolator.eval_request_inputs(
            stream_slice,
            next_page_token,
            valid_key_types=(str,),
            valid_value_types=ValidRequestTypes,
        )
        return evaluated if isinstance(evaluated, dict) else {}

    def get_request_headers(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Evaluate the configured request headers for the given slice and page token. `stream_state` is not read."""
        headers = self._headers_interpolator.eval_request_inputs(stream_slice, next_page_token)
        return headers

    def get_request_body_data(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Union[Mapping[str, Any], str]:
        """Evaluate the configured body data for the given slice and page token. `stream_state` is not read."""
        body_data = self._body_data_interpolator.eval_request_inputs(
            stream_slice,
            next_page_token,
            valid_key_types=(str,),
            valid_value_types=ValidRequestTypes,
        )
        return body_data

    def get_request_body_json(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Evaluate the configured JSON body for the given slice and page token. `stream_state` is not read."""
        body_json = self._body_json_interpolator.eval_request_inputs(stream_slice, next_page_token)
        return body_json
Defines the request options to set on an outgoing HTTP request by evaluating `InterpolatedMapping`s
Attributes:
- config (Config): The user-provided configuration as specified by the source's spec
- request_parameters (Union[str, Mapping[str, str]]): The request parameters to set on an outgoing HTTP request
- request_headers (Union[str, Mapping[str, str]]): The request headers to set on an outgoing HTTP request
- request_body_data (Union[str, Mapping[str, str]]): The body data to set on an outgoing HTTP request
- request_body_json (Optional[NestedMapping]): The json content to set on an outgoing HTTP request
def get_request_params(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
    """
    Evaluate the configured request parameters for the given slice and page token.

    Returns an empty dict when the evaluated value is not a dict. `stream_state` is not read.
    """
    evaluated = self._parameter_interpolator.eval_request_inputs(
        stream_slice,
        next_page_token,
        valid_key_types=(str,),
        valid_value_types=ValidRequestTypes,
    )
    return evaluated if isinstance(evaluated, dict) else {}
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g: you might want to define query parameters for paging if next_page_token is not None.
def get_request_headers(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """
    Evaluate the configured request headers for the given slice and page token.

    The interpolator's result is returned unchanged. `stream_state` is not read.
    """
    headers = self._headers_interpolator.eval_request_inputs(stream_slice, next_page_token)
    return headers
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
def get_request_body_data(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Union[Mapping[str, Any], str]:
    """
    Evaluate the configured body data for the given slice and page token.

    The interpolator's result is returned unchanged. `stream_state` is not read.
    """
    body_data = self._body_data_interpolator.eval_request_inputs(
        stream_slice,
        next_page_token,
        valid_key_types=(str,),
        valid_value_types=ValidRequestTypes,
    )
    return body_data
Specifies how to populate the body of the request with a non-JSON payload.
If it returns ready text, it will be sent as is. If it returns a dict, it will be converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
def get_request_body_json(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """
    Evaluate the configured JSON body for the given slice and page token.

    The interpolator's result is returned unchanged. `stream_state` is not read.
    """
    body_json = self._body_json_interpolator.eval_request_inputs(stream_slice, next_page_token)
    return body_json
Specifies how to populate the body of the request with a JSON payload.
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
# NOTE(review): the class declares no ABC base or ABCMeta metaclass, so the @abstractmethod markers below
# do not prevent instantiation; an unimplemented method simply returns None.
@dataclass
class RequestOptionsProvider:
    """
    Interface for the options that can be set on an outgoing HTTP request

    An option can travel in one of four places:
    - request parameters
    - request headers
    - body data
    - json content
    """

    @abstractmethod
    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """
        Specifies the query parameters to set on the outgoing HTTP request for the given inputs.

        For example, paging query parameters can be defined when next_page_token is not None.
        """

    @abstractmethod
    def get_request_headers(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """
        Specifies the non-authentication headers to set on the outgoing HTTP request.

        Authentication headers overwrite any overlapping header returned from this method.
        """

    @abstractmethod
    def get_request_body_data(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Union[Mapping[str, Any], str]:
        """
        Specifies the non-JSON payload of the outgoing HTTP request.

        A returned string is sent as is.
        A returned dict is converted to a urlencoded form.
        E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

        Only one of the 'request_body_data' and 'request_body_json' functions can be overridden at the same time.
        """

    @abstractmethod
    def get_request_body_json(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """
        Specifies the JSON payload of the outgoing HTTP request.

        Only one of the 'request_body_data' and 'request_body_json' functions can be overridden at the same time.
        """
Defines the request options to set on an outgoing HTTP request
Options can be passed by
- request parameter
- request headers
- body data
- json content
@abstractmethod
def get_request_params(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """
    Specifies the query parameters to set on the outgoing HTTP request for the given inputs.

    For example, paging query parameters can be defined when next_page_token is not None.
    """
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g: you might want to define query parameters for paging if next_page_token is not None.
@abstractmethod
def get_request_headers(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """
    Specifies the non-authentication headers to set on the outgoing HTTP request.

    Authentication headers overwrite any overlapping header returned from this method.
    """
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
@abstractmethod
def get_request_body_data(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Union[Mapping[str, Any], str]:
    """
    Specifies the non-JSON payload of the outgoing HTTP request.

    A returned string is sent as is.
    A returned dict is converted to a urlencoded form.
    E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

    Only one of the 'request_body_data' and 'request_body_json' functions can be overridden at the same time.
    """
Specifies how to populate the body of the request with a non-JSON payload.
If it returns ready text, it will be sent as is. If it returns a dict, it will be converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
@abstractmethod
def get_request_body_json(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """
    Specifies the JSON payload of the outgoing HTTP request.

    Only one of the 'request_body_data' and 'request_body_json' functions can be overridden at the same time.
    """
Specifies how to populate the body of the request with a JSON payload.
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.