airbyte_cdk.sources.declarative.requesters.request_options
1# 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 3# 4 5from airbyte_cdk.sources.declarative.requesters.request_options.datetime_based_request_options_provider import ( 6 DatetimeBasedRequestOptionsProvider, 7) 8from airbyte_cdk.sources.declarative.requesters.request_options.default_request_options_provider import ( 9 DefaultRequestOptionsProvider, 10) 11from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_options_provider import ( 12 InterpolatedRequestOptionsProvider, 13) 14from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import ( 15 RequestOptionsProvider, 16) 17 18__all__ = [ 19 "DatetimeBasedRequestOptionsProvider", 20 "DefaultRequestOptionsProvider", 21 "InterpolatedRequestOptionsProvider", 22 "RequestOptionsProvider", 23]
@dataclass
class DatetimeBasedRequestOptionsProvider(RequestOptionsProvider):
    """
    Copies the start and end values of a stream slice into the outbound request.

    Attributes:
        config (Config): Connector configuration, used when evaluating the slice field names and when injecting values
        parameters (Mapping[str, Any]): Init-only values handed to the interpolated slice field names
        start_time_option (Optional[RequestOption]): Where the slice start value is injected; nothing is injected when unset
        end_time_option (Optional[RequestOption]): Where the slice end value is injected; nothing is injected when unset
        partition_field_start (Optional[str]): Slice key holding the start value; falls back to "start_time"
        partition_field_end (Optional[str]): Slice key holding the end value; falls back to "end_time"
    """

    config: Config
    parameters: InitVar[Mapping[str, Any]]
    start_time_option: Optional[RequestOption] = None
    end_time_option: Optional[RequestOption] = None
    partition_field_start: Optional[str] = None
    partition_field_end: Optional[str] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """Build the interpolated slice field names, applying the default keys when none are configured."""
        start_field = self.partition_field_start or "start_time"
        end_field = self.partition_field_end or "end_time"
        self._partition_field_start = InterpolatedString.create(start_field, parameters=parameters)
        self._partition_field_end = InterpolatedString.create(end_field, parameters=parameters)

    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return the slice boundaries configured to be sent as request parameters."""
        target = RequestOptionType.request_parameter
        return self._get_request_options(target, stream_slice)

    def get_request_headers(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return the slice boundaries configured to be sent as headers."""
        target = RequestOptionType.header
        return self._get_request_options(target, stream_slice)

    def get_request_body_data(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Union[Mapping[str, Any], str]:
        """Return the slice boundaries configured to be sent as body data."""
        target = RequestOptionType.body_data
        return self._get_request_options(target, stream_slice)

    def get_request_body_json(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return the slice boundaries configured to be sent in the JSON body."""
        target = RequestOptionType.body_json
        return self._get_request_options(target, stream_slice)

    def _get_request_options(
        self, option_type: RequestOptionType, stream_slice: Optional[StreamSlice]
    ) -> Mapping[str, Any]:
        """
        Collect the start/end slice values whose request option targets `option_type`.

        Returns an empty mapping when `stream_slice` is missing or empty.
        """
        injected: MutableMapping[str, Any] = {}
        if not stream_slice:
            return injected

        # Start is handled before end, each paired with the slice key it is read from.
        boundaries = (
            (self.start_time_option, self._partition_field_start),
            (self.end_time_option, self._partition_field_end),
        )
        for request_option, slice_field in boundaries:
            if request_option and request_option.inject_into == option_type:
                slice_value = stream_slice.get(slice_field.eval(self.config))
                request_option.inject_into_request(injected, slice_value, self.config)

        return injected
Request options provider that extracts fields from the stream_slice and injects them into the respective location in the outbound request being made
def get_request_params(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """Return the options that target request parameters, built from `stream_slice`."""
    target = RequestOptionType.request_parameter
    return self._get_request_options(target, stream_slice)
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g: you might want to define query parameters for paging if next_page_token is not None.
def get_request_headers(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """Return the options that target request headers, built from `stream_slice`."""
    target = RequestOptionType.header
    return self._get_request_options(target, stream_slice)
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
def get_request_body_data(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Union[Mapping[str, Any], str]:
    """Return the options that target the request body data, built from `stream_slice`."""
    target = RequestOptionType.body_data
    return self._get_request_options(target, stream_slice)
Specifies how to populate the body of the request with a non-JSON payload.
If this returns a string, it is sent as is. If it returns a dict, it is converted to a URL-encoded form, e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
def get_request_body_json(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """Return the options that target the JSON request body, built from `stream_slice`."""
    target = RequestOptionType.body_json
    return self._get_request_options(target, stream_slice)
Specifies how to populate the body of the request with a JSON payload.
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
@dataclass
class DefaultRequestOptionsProvider(RequestOptionsProvider):
    """
    Request options provider that contributes nothing to the outbound request.

    Every getter ignores its arguments and returns an empty mapping.

    Attributes:
        parameters (Mapping[str, Any]): Init-only value accepted by the constructor and not used
    """

    parameters: InitVar[Mapping[str, Any]]

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Nothing to initialise: `parameters` is accepted and deliberately left unused.
        pass

    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return an empty mapping: no request parameters are added."""
        return {}

    def get_request_headers(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return an empty mapping: no headers are added."""
        return {}

    def get_request_body_data(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Union[Mapping[str, Any], str]:
        """Return an empty mapping: no body data is added."""
        return {}

    def get_request_body_json(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return an empty mapping: no JSON body content is added."""
        return {}
Request options provider that contributes nothing to the outbound request: every getter returns an empty mapping.
def get_request_params(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """Return an empty mapping: no request parameters are added."""
    no_params: Mapping[str, Any] = {}
    return no_params
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g: you might want to define query parameters for paging if next_page_token is not None.
def get_request_headers(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """Return an empty mapping: no headers are added."""
    no_headers: Mapping[str, Any] = {}
    return no_headers
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
def get_request_body_data(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Union[Mapping[str, Any], str]:
    """Return an empty mapping: no body data is added."""
    no_body_data: Mapping[str, Any] = {}
    return no_body_data
Specifies how to populate the body of the request with a non-JSON payload.
If this returns a string, it is sent as is. If it returns a dict, it is converted to a URL-encoded form, e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
def get_request_body_json(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """Return an empty mapping: no JSON body content is added."""
    no_body_json: Mapping[str, Any] = {}
    return no_body_json
Specifies how to populate the body of the request with a JSON payload.
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
@dataclass
class InterpolatedRequestOptionsProvider(RequestOptionsProvider):
    """
    Builds the options of an outgoing HTTP request by evaluating interpolated request inputs.

    Attributes:
        parameters (Mapping[str, Any]): Init-only values forwarded to every interpolator
        config (Config): The user-provided configuration as specified by the source's spec
        request_parameters (Optional[RequestInput]): The request parameters to set on an outgoing HTTP request
        request_headers (Optional[RequestInput]): The request headers to set on an outgoing HTTP request
        request_body_data (Optional[RequestInput]): The body data to set on an outgoing HTTP request
        request_body_json (Optional[NestedMapping]): The json content to set on an outgoing HTTP request
        query_properties_key (Optional[str]): When set, the request parameter under which the slice's
            "query_properties" list is sent as a comma-separated string
    """

    parameters: InitVar[Mapping[str, Any]]
    config: Config = field(default_factory=dict)
    request_parameters: Optional[RequestInput] = None
    request_headers: Optional[RequestInput] = None
    request_body_data: Optional[RequestInput] = None
    request_body_json: Optional[NestedMapping] = None
    query_properties_key: Optional[str] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """
        Default unset inputs to empty mappings and create one interpolator per request location.

        Raises:
            ValueError: If both `request_body_data` and `request_body_json` are non-empty.
        """
        # Each unset input gets its own fresh empty mapping.
        for input_name in (
            "request_parameters",
            "request_headers",
            "request_body_data",
            "request_body_json",
        ):
            if getattr(self, input_name) is None:
                setattr(self, input_name, {})

        if self.request_body_json and self.request_body_data:
            raise ValueError(
                "RequestOptionsProvider should only contain either 'request_body_data' or 'request_body_json' not both"
            )

        self._parameter_interpolator = InterpolatedRequestInputProvider(
            config=self.config, request_inputs=self.request_parameters, parameters=parameters
        )
        self._headers_interpolator = InterpolatedRequestInputProvider(
            config=self.config, request_inputs=self.request_headers, parameters=parameters
        )
        self._body_data_interpolator = InterpolatedRequestInputProvider(
            config=self.config, request_inputs=self.request_body_data, parameters=parameters
        )
        self._body_json_interpolator = InterpolatedNestedRequestInputProvider(
            config=self.config, request_inputs=self.request_body_json, parameters=parameters
        )

    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> MutableMapping[str, Any]:
        """
        Evaluate the configured request parameters, adding the query properties when enabled.

        Returns an empty mapping when the evaluated parameters are not a dict.

        Raises:
            ValueError: If `query_properties_key` is set and the slice is missing, carries no
                "query_properties" extra field, or that field is not a list.
        """
        params = self._parameter_interpolator.eval_request_inputs(
            stream_slice,
            next_page_token,
            valid_key_types=(str,),
            valid_value_types=ValidRequestTypes,
        )
        if not isinstance(params, dict):
            return {}
        if not self.query_properties_key:
            return params

        if not stream_slice:
            raise ValueError(
                "stream_slice should not be None if query properties in requests is enabled. Please contact Airbyte Support"
            )

        # A missing key and an explicit None are rejected with the same error.
        query_properties = stream_slice.extra_fields.get("query_properties")
        if query_properties is None:
            raise ValueError(
                "QueryProperties component is defined but stream_partition does not contain query_properties. Please contact Airbyte Support"
            )
        if not isinstance(query_properties, List):
            raise ValueError(
                "QueryProperties component is defined but stream_slice.extra_fields.query_properties is not a List. Please contact Airbyte Support"
            )

        return {**params, self.query_properties_key: ",".join(query_properties)}

    def get_request_headers(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Evaluate the configured request headers for the given slice and page token."""
        headers = self._headers_interpolator.eval_request_inputs(stream_slice, next_page_token)
        return headers

    def get_request_body_data(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Union[Mapping[str, Any], str]:
        """Evaluate the configured body data for the given slice and page token."""
        body_data = self._body_data_interpolator.eval_request_inputs(
            stream_slice,
            next_page_token,
            valid_key_types=(str,),
            valid_value_types=ValidRequestTypes,
        )
        return body_data

    def get_request_body_json(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Evaluate the configured JSON body for the given slice and page token."""
        body_json = self._body_json_interpolator.eval_request_inputs(stream_slice, next_page_token)
        return body_json
Defines the request options to set on an outgoing HTTP request by evaluating `InterpolatedMapping`s
Attributes:
- config (Config): The user-provided configuration as specified by the source's spec
- request_parameters (Union[str, Mapping[str, str]]): The request parameters to set on an outgoing HTTP request
- request_headers (Union[str, Mapping[str, str]]): The request headers to set on an outgoing HTTP request
- request_body_data (Union[str, Mapping[str, str]]): The body data to set on an outgoing HTTP request
- request_body_json (Union[str, Mapping[str, str]]): The json content to set on an outgoing HTTP request
def get_request_params(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
    """
    Evaluate the configured request parameters, adding the query properties when enabled.

    Returns an empty mapping when the evaluated parameters are not a dict.

    Raises:
        ValueError: If `query_properties_key` is set and the slice is missing, carries no
            "query_properties" extra field, or that field is not a list.
    """
    params = self._parameter_interpolator.eval_request_inputs(
        stream_slice,
        next_page_token,
        valid_key_types=(str,),
        valid_value_types=ValidRequestTypes,
    )
    if not isinstance(params, dict):
        return {}
    if not self.query_properties_key:
        return params

    if not stream_slice:
        raise ValueError(
            "stream_slice should not be None if query properties in requests is enabled. Please contact Airbyte Support"
        )

    # A missing key and an explicit None are rejected with the same error.
    query_properties = stream_slice.extra_fields.get("query_properties")
    if query_properties is None:
        raise ValueError(
            "QueryProperties component is defined but stream_partition does not contain query_properties. Please contact Airbyte Support"
        )
    if not isinstance(query_properties, List):
        raise ValueError(
            "QueryProperties component is defined but stream_slice.extra_fields.query_properties is not a List. Please contact Airbyte Support"
        )

    return {**params, self.query_properties_key: ",".join(query_properties)}
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g: you might want to define query parameters for paging if next_page_token is not None.
def get_request_headers(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """Evaluate the configured request headers for the given slice and page token."""
    headers = self._headers_interpolator.eval_request_inputs(stream_slice, next_page_token)
    return headers
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
def get_request_body_data(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Union[Mapping[str, Any], str]:
    """Evaluate the configured body data for the given slice and page token."""
    body_data = self._body_data_interpolator.eval_request_inputs(
        stream_slice,
        next_page_token,
        valid_key_types=(str,),
        valid_value_types=ValidRequestTypes,
    )
    return body_data
Specifies how to populate the body of the request with a non-JSON payload.
If this returns a string, it is sent as is. If it returns a dict, it is converted to a URL-encoded form, e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
def get_request_body_json(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """Evaluate the configured JSON body for the given slice and page token."""
    body_json = self._body_json_interpolator.eval_request_inputs(stream_slice, next_page_token)
    return body_json
Specifies how to populate the body of the request with a JSON payload.
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
@dataclass
class RequestOptionsProvider:
    """
    Interface for the options to set on an outgoing HTTP request.

    Options can be passed by
    - request parameter
    - request headers
    - body data
    - json content

    NOTE(review): the class declares no ABC base, so the `@abstractmethod` markers
    below are not enforced when the class is instantiated.
    """

    @abstractmethod
    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """
        Return the query parameters to set on an outgoing HTTP request for the given inputs.

        For example, paging parameters can be defined when next_page_token is not None.
        """

    @abstractmethod
    def get_request_headers(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """
        Return any non-auth headers.

        Authentication headers overwrite any overlapping headers returned from this method.
        """

    @abstractmethod
    def get_request_body_data(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Union[Mapping[str, Any], str]:
        """
        Return the non-JSON payload used to populate the body of the request.

        A string is sent as is.
        A dict is converted to a URL-encoded form,
        e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

        Only one of 'request_body_data' and 'request_body_json' may be overridden at a time.
        """

    @abstractmethod
    def get_request_body_json(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """
        Return the JSON payload used to populate the body of the request.

        Only one of 'request_body_data' and 'request_body_json' may be overridden at a time.
        """
Defines the request options to set on an outgoing HTTP request
Options can be passed by
- request parameter
- request headers
- body data
- json content
@abstractmethod
def get_request_params(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """
    Return the query parameters to set on an outgoing HTTP request for the given inputs.

    For example, paging parameters can be defined when next_page_token is not None.
    """
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g: you might want to define query parameters for paging if next_page_token is not None.
@abstractmethod
def get_request_headers(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """
    Return any non-auth headers.

    Authentication headers overwrite any overlapping headers returned from this method.
    """
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
@abstractmethod
def get_request_body_data(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Union[Mapping[str, Any], str]:
    """
    Return the non-JSON payload used to populate the body of the request.

    A string is sent as is.
    A dict is converted to a URL-encoded form,
    e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

    Only one of 'request_body_data' and 'request_body_json' may be overridden at a time.
    """
Specifies how to populate the body of the request with a non-JSON payload.
If this returns a string, it is sent as is. If it returns a dict, it is converted to a URL-encoded form, e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
@abstractmethod
def get_request_body_json(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """
    Return the JSON payload used to populate the body of the request.

    Only one of 'request_body_data' and 'request_body_json' may be overridden at a time.
    """
Specifies how to populate the body of the request with a JSON payload.
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.