airbyte_cdk.sources.declarative.requesters.request_options
1# 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 3# 4 5from airbyte_cdk.sources.declarative.requesters.request_options.datetime_based_request_options_provider import ( 6 DatetimeBasedRequestOptionsProvider, 7) 8from airbyte_cdk.sources.declarative.requesters.request_options.default_request_options_provider import ( 9 DefaultRequestOptionsProvider, 10) 11from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_options_provider import ( 12 InterpolatedRequestOptionsProvider, 13) 14from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import ( 15 RequestOptionsProvider, 16) 17 18__all__ = [ 19 "DatetimeBasedRequestOptionsProvider", 20 "DefaultRequestOptionsProvider", 21 "InterpolatedRequestOptionsProvider", 22 "RequestOptionsProvider", 23]
20@dataclass 21class DatetimeBasedRequestOptionsProvider(RequestOptionsProvider): 22 """ 23 Request options provider that extracts fields from the stream_slice and injects them into the respective location in the 24 outbound request being made 25 """ 26 27 config: Config 28 parameters: InitVar[Mapping[str, Any]] 29 start_time_option: Optional[RequestOption] = None 30 end_time_option: Optional[RequestOption] = None 31 partition_field_start: Optional[str] = None 32 partition_field_end: Optional[str] = None 33 34 def __post_init__(self, parameters: Mapping[str, Any]) -> None: 35 self._partition_field_start = InterpolatedString.create( 36 self.partition_field_start or "start_time", parameters=parameters 37 ) 38 self._partition_field_end = InterpolatedString.create( 39 self.partition_field_end or "end_time", parameters=parameters 40 ) 41 42 def get_request_params( 43 self, 44 *, 45 stream_state: Optional[StreamState] = None, 46 stream_slice: Optional[StreamSlice] = None, 47 next_page_token: Optional[Mapping[str, Any]] = None, 48 ) -> Mapping[str, Any]: 49 return self._get_request_options(RequestOptionType.request_parameter, stream_slice) 50 51 def get_request_headers( 52 self, 53 *, 54 stream_state: Optional[StreamState] = None, 55 stream_slice: Optional[StreamSlice] = None, 56 next_page_token: Optional[Mapping[str, Any]] = None, 57 ) -> Mapping[str, Any]: 58 return self._get_request_options(RequestOptionType.header, stream_slice) 59 60 def get_request_body_data( 61 self, 62 *, 63 stream_state: Optional[StreamState] = None, 64 stream_slice: Optional[StreamSlice] = None, 65 next_page_token: Optional[Mapping[str, Any]] = None, 66 ) -> Union[Mapping[str, Any], str]: 67 return self._get_request_options(RequestOptionType.body_data, stream_slice) 68 69 def get_request_body_json( 70 self, 71 *, 72 stream_state: Optional[StreamState] = None, 73 stream_slice: Optional[StreamSlice] = None, 74 next_page_token: Optional[Mapping[str, Any]] = None, 75 ) -> Mapping[str, Any]: 76 return self._get_request_options(RequestOptionType.body_json, stream_slice) 77 78 def _get_request_options( 79 self, option_type: RequestOptionType, stream_slice: Optional[StreamSlice] 80 ) -> Mapping[str, Any]: 81 options: MutableMapping[str, Any] = {} 82 if not stream_slice: 83 return options 84 85 if self.start_time_option and self.start_time_option.inject_into == option_type: 86 start_time_value = stream_slice.get(self._partition_field_start.eval(self.config)) 87 self.start_time_option.inject_into_request(options, start_time_value, self.config) 88 89 if self.end_time_option and self.end_time_option.inject_into == option_type: 90 end_time_value = stream_slice.get(self._partition_field_end.eval(self.config)) 91 self.end_time_option.inject_into_request(options, end_time_value, self.config) 92 93 return options
Request options provider that extracts fields from the stream_slice and injects them into the respective location in the outbound request being made
42 def get_request_params( 43 self, 44 *, 45 stream_state: Optional[StreamState] = None, 46 stream_slice: Optional[StreamSlice] = None, 47 next_page_token: Optional[Mapping[str, Any]] = None, 48 ) -> Mapping[str, Any]: 49 return self._get_request_options(RequestOptionType.request_parameter, stream_slice)
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g: you might want to define query parameters for paging if next_page_token is not None.
51 def get_request_headers( 52 self, 53 *, 54 stream_state: Optional[StreamState] = None, 55 stream_slice: Optional[StreamSlice] = None, 56 next_page_token: Optional[Mapping[str, Any]] = None, 57 ) -> Mapping[str, Any]: 58 return self._get_request_options(RequestOptionType.header, stream_slice)
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
60 def get_request_body_data( 61 self, 62 *, 63 stream_state: Optional[StreamState] = None, 64 stream_slice: Optional[StreamSlice] = None, 65 next_page_token: Optional[Mapping[str, Any]] = None, 66 ) -> Union[Mapping[str, Any], str]: 67 return self._get_request_options(RequestOptionType.body_data, stream_slice)
Specifies how to populate the body of the request with a non-JSON payload.
If returns a ready text that it will be sent as is. If returns a dict that it will be converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
69 def get_request_body_json( 70 self, 71 *, 72 stream_state: Optional[StreamState] = None, 73 stream_slice: Optional[StreamSlice] = None, 74 next_page_token: Optional[Mapping[str, Any]] = None, 75 ) -> Mapping[str, Any]: 76 return self._get_request_options(RequestOptionType.body_json, stream_slice)
Specifies how to populate the body of the request with a JSON payload.
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
15@dataclass 16class DefaultRequestOptionsProvider(RequestOptionsProvider): 17 """ 18 Request options provider that extracts fields from the stream_slice and injects them into the respective location in the 19 outbound request being made 20 """ 21 22 parameters: InitVar[Mapping[str, Any]] 23 24 def __post_init__(self, parameters: Mapping[str, Any]) -> None: 25 pass 26 27 def get_request_params( 28 self, 29 *, 30 stream_state: Optional[StreamState] = None, 31 stream_slice: Optional[StreamSlice] = None, 32 next_page_token: Optional[Mapping[str, Any]] = None, 33 ) -> Mapping[str, Any]: 34 return {} 35 36 def get_request_headers( 37 self, 38 *, 39 stream_state: Optional[StreamState] = None, 40 stream_slice: Optional[StreamSlice] = None, 41 next_page_token: Optional[Mapping[str, Any]] = None, 42 ) -> Mapping[str, Any]: 43 return {} 44 45 def get_request_body_data( 46 self, 47 *, 48 stream_state: Optional[StreamState] = None, 49 stream_slice: Optional[StreamSlice] = None, 50 next_page_token: Optional[Mapping[str, Any]] = None, 51 ) -> Union[Mapping[str, Any], str]: 52 return {} 53 54 def get_request_body_json( 55 self, 56 *, 57 stream_state: Optional[StreamState] = None, 58 stream_slice: Optional[StreamSlice] = None, 59 next_page_token: Optional[Mapping[str, Any]] = None, 60 ) -> Mapping[str, Any]: 61 return {}
Request options provider that extracts fields from the stream_slice and injects them into the respective location in the outbound request being made
27 def get_request_params( 28 self, 29 *, 30 stream_state: Optional[StreamState] = None, 31 stream_slice: Optional[StreamSlice] = None, 32 next_page_token: Optional[Mapping[str, Any]] = None, 33 ) -> Mapping[str, Any]: 34 return {}
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g: you might want to define query parameters for paging if next_page_token is not None.
36 def get_request_headers( 37 self, 38 *, 39 stream_state: Optional[StreamState] = None, 40 stream_slice: Optional[StreamSlice] = None, 41 next_page_token: Optional[Mapping[str, Any]] = None, 42 ) -> Mapping[str, Any]: 43 return {}
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
45 def get_request_body_data( 46 self, 47 *, 48 stream_state: Optional[StreamState] = None, 49 stream_slice: Optional[StreamSlice] = None, 50 next_page_token: Optional[Mapping[str, Any]] = None, 51 ) -> Union[Mapping[str, Any], str]: 52 return {}
Specifies how to populate the body of the request with a non-JSON payload.
If returns a ready text that it will be sent as is. If returns a dict that it will be converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
54 def get_request_body_json( 55 self, 56 *, 57 stream_state: Optional[StreamState] = None, 58 stream_slice: Optional[StreamSlice] = None, 59 next_page_token: Optional[Mapping[str, Any]] = None, 60 ) -> Mapping[str, Any]: 61 return {}
Specifies how to populate the body of the request with a JSON payload.
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
31@dataclass 32class InterpolatedRequestOptionsProvider(RequestOptionsProvider): 33 """ 34 Defines the request options to set on an outgoing HTTP request by evaluating `InterpolatedMapping`s 35 36 Attributes: 37 config (Config): The user-provided configuration as specified by the source's spec 38 request_parameters (Union[str, Mapping[str, str]]): The request parameters to set on an outgoing HTTP request 39 request_headers (Union[str, Mapping[str, str]]): The request headers to set on an outgoing HTTP request 40 request_body_data (Union[str, Mapping[str, str]]): The body data to set on an outgoing HTTP request 41 request_body_json (Union[str, Mapping[str, str]]): The json content to set on an outgoing HTTP request 42 """ 43 44 parameters: InitVar[Mapping[str, Any]] 45 config: Config = field(default_factory=dict) 46 request_parameters: Optional[RequestInput] = None 47 request_headers: Optional[RequestInput] = None 48 request_body: Optional[ 49 Union[ 50 RequestBodyGraphQL, 51 RequestBodyJsonObject, 52 RequestBodyPlainText, 53 RequestBodyUrlEncodedForm, 54 ] 55 ] = None 56 request_body_data: Optional[RequestInput] = None 57 request_body_json: Optional[NestedMapping] = None 58 query_properties_key: Optional[str] = None 59 60 def __post_init__(self, parameters: Mapping[str, Any]) -> None: 61 if self.request_parameters is None: 62 self.request_parameters = {} 63 if self.request_headers is None: 64 self.request_headers = {} 65 # resolve the request body to either data or json 66 self._resolve_request_body() 67 # If request_body is not provided, set request_body_data and request_body_json to empty dicts 68 if self.request_body_data is None: 69 self.request_body_data = {} 70 if self.request_body_json is None: 71 self.request_body_json = {} 72 # If both request_body_data and request_body_json are provided, raise an error 73 if self.request_body_json and self.request_body_data: 74 raise ValueError( 75 "RequestOptionsProvider should only contain either 'request_body_data' or 'request_body_json' not both" 76 ) 77 # set interpolators 78 self._parameter_interpolator = InterpolatedRequestInputProvider( 79 config=self.config, request_inputs=self.request_parameters, parameters=parameters 80 ) 81 self._headers_interpolator = InterpolatedRequestInputProvider( 82 config=self.config, request_inputs=self.request_headers, parameters=parameters 83 ) 84 self._body_data_interpolator = InterpolatedRequestInputProvider( 85 config=self.config, request_inputs=self.request_body_data, parameters=parameters 86 ) 87 self._body_json_interpolator = InterpolatedNestedRequestInputProvider( 88 config=self.config, request_inputs=self.request_body_json, parameters=parameters 89 ) 90 91 def _resolve_request_body(self) -> None: 92 """ 93 Resolves the request body configuration by setting either `request_body_data` or `request_body_json` 94 based on the type specified in `self.request_body`. If neither is provided, both are initialized as empty 95 dictionaries. Raises a ValueError if both `request_body_data` and `request_body_json` are set simultaneously. 96 Raises: 97 ValueError: if an unsupported request body type is provided. 98 """ 99 # Resolve the request body to either data or json 100 if self.request_body is not None and self.request_body.type is not None: 101 if self.request_body.type == "RequestBodyUrlEncodedForm": 102 self.request_body_data = self.request_body.value 103 elif self.request_body.type == "RequestBodyGraphQL": 104 self.request_body_json = {"query": self.request_body.value.query} 105 elif self.request_body.type in ("RequestBodyJsonObject", "RequestBodyPlainText"): 106 self.request_body_json = self.request_body.value 107 else: 108 raise ValueError(f"Unsupported request body type: {self.request_body.type}") 109 110 def get_request_params( 111 self, 112 *, 113 stream_state: Optional[StreamState] = None, 114 stream_slice: Optional[StreamSlice] = None, 115 next_page_token: Optional[Mapping[str, Any]] = None, 116 ) -> MutableMapping[str, Any]: 117 interpolated_value = self._parameter_interpolator.eval_request_inputs( 118 stream_slice, 119 next_page_token, 120 valid_key_types=(str,), 121 valid_value_types=ValidRequestTypes, 122 ) 123 if isinstance(interpolated_value, dict): 124 if self.query_properties_key: 125 if not stream_slice: 126 raise ValueError( 127 "stream_slice should not be None if query properties in requests is enabled. Please contact Airbyte Support" 128 ) 129 elif ( 130 "query_properties" not in stream_slice.extra_fields 131 or stream_slice.extra_fields.get("query_properties") is None 132 ): 133 raise ValueError( 134 "QueryProperties component is defined but stream_partition does not contain query_properties. Please contact Airbyte Support" 135 ) 136 elif not isinstance(stream_slice.extra_fields.get("query_properties"), List): 137 raise ValueError( 138 "QueryProperties component is defined but stream_slice.extra_fields.query_properties is not a List. Please contact Airbyte Support" 139 ) 140 interpolated_value = { 141 **interpolated_value, 142 self.query_properties_key: ",".join( 143 stream_slice.extra_fields.get("query_properties") # type: ignore # Earlier type checks validate query_properties type 144 ), 145 } 146 return interpolated_value 147 return {} 148 149 def get_request_headers( 150 self, 151 *, 152 stream_state: Optional[StreamState] = None, 153 stream_slice: Optional[StreamSlice] = None, 154 next_page_token: Optional[Mapping[str, Any]] = None, 155 ) -> Mapping[str, Any]: 156 return self._headers_interpolator.eval_request_inputs(stream_slice, next_page_token) 157 158 def get_request_body_data( 159 self, 160 *, 161 stream_state: Optional[StreamState] = None, 162 stream_slice: Optional[StreamSlice] = None, 163 next_page_token: Optional[Mapping[str, Any]] = None, 164 ) -> Union[Mapping[str, Any], str]: 165 return self._body_data_interpolator.eval_request_inputs( 166 stream_slice, 167 next_page_token, 168 valid_key_types=(str,), 169 valid_value_types=ValidRequestTypes, 170 ) 171 172 def get_request_body_json( 173 self, 174 *, 175 stream_state: Optional[StreamState] = None, 176 stream_slice: Optional[StreamSlice] = None, 177 next_page_token: Optional[Mapping[str, Any]] = None, 178 ) -> Mapping[str, Any]: 179 return self._body_json_interpolator.eval_request_inputs(stream_slice, next_page_token)
Defines the request options to set on an outgoing HTTP request by evaluating InterpolatedMapping
s
Attributes:
- config (Config): The user-provided configuration as specified by the source's spec
- request_parameters (Union[str, Mapping[str, str]]): The request parameters to set on an outgoing HTTP request
- request_headers (Union[str, Mapping[str, str]]): The request headers to set on an outgoing HTTP request
- request_body_data (Union[str, Mapping[str, str]]): The body data to set on an outgoing HTTP request
- request_body_json (Union[str, Mapping[str, str]]): The json content to set on an outgoing HTTP request
110 def get_request_params( 111 self, 112 *, 113 stream_state: Optional[StreamState] = None, 114 stream_slice: Optional[StreamSlice] = None, 115 next_page_token: Optional[Mapping[str, Any]] = None, 116 ) -> MutableMapping[str, Any]: 117 interpolated_value = self._parameter_interpolator.eval_request_inputs( 118 stream_slice, 119 next_page_token, 120 valid_key_types=(str,), 121 valid_value_types=ValidRequestTypes, 122 ) 123 if isinstance(interpolated_value, dict): 124 if self.query_properties_key: 125 if not stream_slice: 126 raise ValueError( 127 "stream_slice should not be None if query properties in requests is enabled. Please contact Airbyte Support" 128 ) 129 elif ( 130 "query_properties" not in stream_slice.extra_fields 131 or stream_slice.extra_fields.get("query_properties") is None 132 ): 133 raise ValueError( 134 "QueryProperties component is defined but stream_partition does not contain query_properties. Please contact Airbyte Support" 135 ) 136 elif not isinstance(stream_slice.extra_fields.get("query_properties"), List): 137 raise ValueError( 138 "QueryProperties component is defined but stream_slice.extra_fields.query_properties is not a List. Please contact Airbyte Support" 139 ) 140 interpolated_value = { 141 **interpolated_value, 142 self.query_properties_key: ",".join( 143 stream_slice.extra_fields.get("query_properties") # type: ignore # Earlier type checks validate query_properties type 144 ), 145 } 146 return interpolated_value 147 return {}
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g: you might want to define query parameters for paging if next_page_token is not None.
149 def get_request_headers( 150 self, 151 *, 152 stream_state: Optional[StreamState] = None, 153 stream_slice: Optional[StreamSlice] = None, 154 next_page_token: Optional[Mapping[str, Any]] = None, 155 ) -> Mapping[str, Any]: 156 return self._headers_interpolator.eval_request_inputs(stream_slice, next_page_token)
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
158 def get_request_body_data( 159 self, 160 *, 161 stream_state: Optional[StreamState] = None, 162 stream_slice: Optional[StreamSlice] = None, 163 next_page_token: Optional[Mapping[str, Any]] = None, 164 ) -> Union[Mapping[str, Any], str]: 165 return self._body_data_interpolator.eval_request_inputs( 166 stream_slice, 167 next_page_token, 168 valid_key_types=(str,), 169 valid_value_types=ValidRequestTypes, 170 )
Specifies how to populate the body of the request with a non-JSON payload.
If returns a ready text that it will be sent as is. If returns a dict that it will be converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
172 def get_request_body_json( 173 self, 174 *, 175 stream_state: Optional[StreamState] = None, 176 stream_slice: Optional[StreamSlice] = None, 177 next_page_token: Optional[Mapping[str, Any]] = None, 178 ) -> Mapping[str, Any]: 179 return self._body_json_interpolator.eval_request_inputs(stream_slice, next_page_token)
Specifies how to populate the body of the request with a JSON payload.
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
13@dataclass 14class RequestOptionsProvider: 15 """ 16 Defines the request options to set on an outgoing HTTP request 17 18 Options can be passed by 19 - request parameter 20 - request headers 21 - body data 22 - json content 23 """ 24 25 @abstractmethod 26 def get_request_params( 27 self, 28 *, 29 stream_state: Optional[StreamState] = None, 30 stream_slice: Optional[StreamSlice] = None, 31 next_page_token: Optional[Mapping[str, Any]] = None, 32 ) -> Mapping[str, Any]: 33 """ 34 Specifies the query parameters that should be set on an outgoing HTTP request given the inputs. 35 36 E.g: you might want to define query parameters for paging if next_page_token is not None. 37 """ 38 pass 39 40 @abstractmethod 41 def get_request_headers( 42 self, 43 *, 44 stream_state: Optional[StreamState] = None, 45 stream_slice: Optional[StreamSlice] = None, 46 next_page_token: Optional[Mapping[str, Any]] = None, 47 ) -> Mapping[str, Any]: 48 """Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.""" 49 50 @abstractmethod 51 def get_request_body_data( 52 self, 53 *, 54 stream_state: Optional[StreamState] = None, 55 stream_slice: Optional[StreamSlice] = None, 56 next_page_token: Optional[Mapping[str, Any]] = None, 57 ) -> Union[Mapping[str, Any], str]: 58 """ 59 Specifies how to populate the body of the request with a non-JSON payload. 60 61 If returns a ready text that it will be sent as is. 62 If returns a dict that it will be converted to a urlencoded form. 63 E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2" 64 65 At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden. 66 """ 67 68 @abstractmethod 69 def get_request_body_json( 70 self, 71 *, 72 stream_state: Optional[StreamState] = None, 73 stream_slice: Optional[StreamSlice] = None, 74 next_page_token: Optional[Mapping[str, Any]] = None, 75 ) -> Mapping[str, Any]: 76 """ 77 Specifies how to populate the body of the request with a JSON payload. 78 79 At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden. 80 """
Defines the request options to set on an outgoing HTTP request
Options can be passed by
- request parameter
- request headers
- body data
- json content
25 @abstractmethod 26 def get_request_params( 27 self, 28 *, 29 stream_state: Optional[StreamState] = None, 30 stream_slice: Optional[StreamSlice] = None, 31 next_page_token: Optional[Mapping[str, Any]] = None, 32 ) -> Mapping[str, Any]: 33 """ 34 Specifies the query parameters that should be set on an outgoing HTTP request given the inputs. 35 36 E.g: you might want to define query parameters for paging if next_page_token is not None. 37 """ 38 pass
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g: you might want to define query parameters for paging if next_page_token is not None.
40 @abstractmethod 41 def get_request_headers( 42 self, 43 *, 44 stream_state: Optional[StreamState] = None, 45 stream_slice: Optional[StreamSlice] = None, 46 next_page_token: Optional[Mapping[str, Any]] = None, 47 ) -> Mapping[str, Any]: 48 """Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method."""
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
50 @abstractmethod 51 def get_request_body_data( 52 self, 53 *, 54 stream_state: Optional[StreamState] = None, 55 stream_slice: Optional[StreamSlice] = None, 56 next_page_token: Optional[Mapping[str, Any]] = None, 57 ) -> Union[Mapping[str, Any], str]: 58 """ 59 Specifies how to populate the body of the request with a non-JSON payload. 60 61 If returns a ready text that it will be sent as is. 62 If returns a dict that it will be converted to a urlencoded form. 63 E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2" 64 65 At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden. 66 """
Specifies how to populate the body of the request with a non-JSON payload.
If returns a ready text that it will be sent as is. If returns a dict that it will be converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
68 @abstractmethod 69 def get_request_body_json( 70 self, 71 *, 72 stream_state: Optional[StreamState] = None, 73 stream_slice: Optional[StreamSlice] = None, 74 next_page_token: Optional[Mapping[str, Any]] = None, 75 ) -> Mapping[str, Any]: 76 """ 77 Specifies how to populate the body of the request with a JSON payload. 78 79 At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden. 80 """
Specifies how to populate the body of the request with a JSON payload.
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.