airbyte_cdk.sources.declarative.requesters.request_options

 1#
 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 3#
 4
 5from airbyte_cdk.sources.declarative.requesters.request_options.datetime_based_request_options_provider import (
 6    DatetimeBasedRequestOptionsProvider,
 7)
 8from airbyte_cdk.sources.declarative.requesters.request_options.default_request_options_provider import (
 9    DefaultRequestOptionsProvider,
10)
11from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_options_provider import (
12    InterpolatedRequestOptionsProvider,
13)
14from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import (
15    RequestOptionsProvider,
16)
17
18__all__ = [
19    "DatetimeBasedRequestOptionsProvider",
20    "DefaultRequestOptionsProvider",
21    "InterpolatedRequestOptionsProvider",
22    "RequestOptionsProvider",
23]
@dataclass
class DatetimeBasedRequestOptionsProvider(airbyte_cdk.sources.declarative.requesters.request_options.RequestOptionsProvider):
20@dataclass
21class DatetimeBasedRequestOptionsProvider(RequestOptionsProvider):
22    """
23    Request options provider that extracts fields from the stream_slice and injects them into the respective location in the
24    outbound request being made
25    """
26
27    config: Config
28    parameters: InitVar[Mapping[str, Any]]
29    start_time_option: Optional[RequestOption] = None
30    end_time_option: Optional[RequestOption] = None
31    partition_field_start: Optional[str] = None
32    partition_field_end: Optional[str] = None
33
34    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
35        self._partition_field_start = InterpolatedString.create(
36            self.partition_field_start or "start_time", parameters=parameters
37        )
38        self._partition_field_end = InterpolatedString.create(
39            self.partition_field_end or "end_time", parameters=parameters
40        )
41
42    def get_request_params(
43        self,
44        *,
45        stream_state: Optional[StreamState] = None,
46        stream_slice: Optional[StreamSlice] = None,
47        next_page_token: Optional[Mapping[str, Any]] = None,
48    ) -> Mapping[str, Any]:
49        return self._get_request_options(RequestOptionType.request_parameter, stream_slice)
50
51    def get_request_headers(
52        self,
53        *,
54        stream_state: Optional[StreamState] = None,
55        stream_slice: Optional[StreamSlice] = None,
56        next_page_token: Optional[Mapping[str, Any]] = None,
57    ) -> Mapping[str, Any]:
58        return self._get_request_options(RequestOptionType.header, stream_slice)
59
60    def get_request_body_data(
61        self,
62        *,
63        stream_state: Optional[StreamState] = None,
64        stream_slice: Optional[StreamSlice] = None,
65        next_page_token: Optional[Mapping[str, Any]] = None,
66    ) -> Union[Mapping[str, Any], str]:
67        return self._get_request_options(RequestOptionType.body_data, stream_slice)
68
69    def get_request_body_json(
70        self,
71        *,
72        stream_state: Optional[StreamState] = None,
73        stream_slice: Optional[StreamSlice] = None,
74        next_page_token: Optional[Mapping[str, Any]] = None,
75    ) -> Mapping[str, Any]:
76        return self._get_request_options(RequestOptionType.body_json, stream_slice)
77
78    def _get_request_options(
79        self, option_type: RequestOptionType, stream_slice: Optional[StreamSlice]
80    ) -> Mapping[str, Any]:
81        options: MutableMapping[str, Any] = {}
82        if not stream_slice:
83            return options
84
85        if self.start_time_option and self.start_time_option.inject_into == option_type:
86            start_time_value = stream_slice.get(self._partition_field_start.eval(self.config))
87            self.start_time_option.inject_into_request(options, start_time_value, self.config)
88
89        if self.end_time_option and self.end_time_option.inject_into == option_type:
90            end_time_value = stream_slice.get(self._partition_field_end.eval(self.config))
91            self.end_time_option.inject_into_request(options, end_time_value, self.config)
92
93        return options

Request options provider that extracts fields from the stream_slice and injects them into the respective location in the outbound request being made

DatetimeBasedRequestOptionsProvider( config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], start_time_option: Optional[airbyte_cdk.RequestOption] = None, end_time_option: Optional[airbyte_cdk.RequestOption] = None, partition_field_start: Optional[str] = None, partition_field_end: Optional[str] = None)
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
start_time_option: Optional[airbyte_cdk.RequestOption] = None
end_time_option: Optional[airbyte_cdk.RequestOption] = None
partition_field_start: Optional[str] = None
partition_field_end: Optional[str] = None
def get_request_params( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
42    def get_request_params(
43        self,
44        *,
45        stream_state: Optional[StreamState] = None,
46        stream_slice: Optional[StreamSlice] = None,
47        next_page_token: Optional[Mapping[str, Any]] = None,
48    ) -> Mapping[str, Any]:
49        return self._get_request_options(RequestOptionType.request_parameter, stream_slice)

Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

E.g: you might want to define query parameters for paging if next_page_token is not None.

def get_request_headers( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
51    def get_request_headers(
52        self,
53        *,
54        stream_state: Optional[StreamState] = None,
55        stream_slice: Optional[StreamSlice] = None,
56        next_page_token: Optional[Mapping[str, Any]] = None,
57    ) -> Mapping[str, Any]:
58        return self._get_request_options(RequestOptionType.header, stream_slice)

Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.

def get_request_body_data( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Union[Mapping[str, Any], str]:
60    def get_request_body_data(
61        self,
62        *,
63        stream_state: Optional[StreamState] = None,
64        stream_slice: Optional[StreamSlice] = None,
65        next_page_token: Optional[Mapping[str, Any]] = None,
66    ) -> Union[Mapping[str, Any], str]:
67        return self._get_request_options(RequestOptionType.body_data, stream_slice)

Specifies how to populate the body of the request with a non-JSON payload.

If returns a ready text that it will be sent as is. If returns a dict that it will be converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

def get_request_body_json( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
69    def get_request_body_json(
70        self,
71        *,
72        stream_state: Optional[StreamState] = None,
73        stream_slice: Optional[StreamSlice] = None,
74        next_page_token: Optional[Mapping[str, Any]] = None,
75    ) -> Mapping[str, Any]:
76        return self._get_request_options(RequestOptionType.body_json, stream_slice)

Specifies how to populate the body of the request with a JSON payload.

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

@dataclass
class DefaultRequestOptionsProvider(airbyte_cdk.sources.declarative.requesters.request_options.RequestOptionsProvider):
15@dataclass
16class DefaultRequestOptionsProvider(RequestOptionsProvider):
17    """
18    Request options provider that extracts fields from the stream_slice and injects them into the respective location in the
19    outbound request being made
20    """
21
22    parameters: InitVar[Mapping[str, Any]]
23
24    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
25        pass
26
27    def get_request_params(
28        self,
29        *,
30        stream_state: Optional[StreamState] = None,
31        stream_slice: Optional[StreamSlice] = None,
32        next_page_token: Optional[Mapping[str, Any]] = None,
33    ) -> Mapping[str, Any]:
34        return {}
35
36    def get_request_headers(
37        self,
38        *,
39        stream_state: Optional[StreamState] = None,
40        stream_slice: Optional[StreamSlice] = None,
41        next_page_token: Optional[Mapping[str, Any]] = None,
42    ) -> Mapping[str, Any]:
43        return {}
44
45    def get_request_body_data(
46        self,
47        *,
48        stream_state: Optional[StreamState] = None,
49        stream_slice: Optional[StreamSlice] = None,
50        next_page_token: Optional[Mapping[str, Any]] = None,
51    ) -> Union[Mapping[str, Any], str]:
52        return {}
53
54    def get_request_body_json(
55        self,
56        *,
57        stream_state: Optional[StreamState] = None,
58        stream_slice: Optional[StreamSlice] = None,
59        next_page_token: Optional[Mapping[str, Any]] = None,
60    ) -> Mapping[str, Any]:
61        return {}

Request options provider that extracts fields from the stream_slice and injects them into the respective location in the outbound request being made

DefaultRequestOptionsProvider(parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]])
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
def get_request_params( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
27    def get_request_params(
28        self,
29        *,
30        stream_state: Optional[StreamState] = None,
31        stream_slice: Optional[StreamSlice] = None,
32        next_page_token: Optional[Mapping[str, Any]] = None,
33    ) -> Mapping[str, Any]:
34        return {}

Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

E.g: you might want to define query parameters for paging if next_page_token is not None.

def get_request_headers( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
36    def get_request_headers(
37        self,
38        *,
39        stream_state: Optional[StreamState] = None,
40        stream_slice: Optional[StreamSlice] = None,
41        next_page_token: Optional[Mapping[str, Any]] = None,
42    ) -> Mapping[str, Any]:
43        return {}

Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.

def get_request_body_data( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Union[Mapping[str, Any], str]:
45    def get_request_body_data(
46        self,
47        *,
48        stream_state: Optional[StreamState] = None,
49        stream_slice: Optional[StreamSlice] = None,
50        next_page_token: Optional[Mapping[str, Any]] = None,
51    ) -> Union[Mapping[str, Any], str]:
52        return {}

Specifies how to populate the body of the request with a non-JSON payload.

If returns a ready text that it will be sent as is. If returns a dict that it will be converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

def get_request_body_json( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
54    def get_request_body_json(
55        self,
56        *,
57        stream_state: Optional[StreamState] = None,
58        stream_slice: Optional[StreamSlice] = None,
59        next_page_token: Optional[Mapping[str, Any]] = None,
60    ) -> Mapping[str, Any]:
61        return {}

Specifies how to populate the body of the request with a JSON payload.

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

@dataclass
class InterpolatedRequestOptionsProvider(airbyte_cdk.sources.declarative.requesters.request_options.RequestOptionsProvider):
 31@dataclass
 32class InterpolatedRequestOptionsProvider(RequestOptionsProvider):
 33    """
 34    Defines the request options to set on an outgoing HTTP request by evaluating `InterpolatedMapping`s
 35
 36    Attributes:
 37        config (Config): The user-provided configuration as specified by the source's spec
 38        request_parameters (Union[str, Mapping[str, str]]): The request parameters to set on an outgoing HTTP request
 39        request_headers (Union[str, Mapping[str, str]]): The request headers to set on an outgoing HTTP request
 40        request_body_data (Union[str, Mapping[str, str]]): The body data to set on an outgoing HTTP request
 41        request_body_json (Union[str, Mapping[str, str]]): The json content to set on an outgoing HTTP request
 42    """
 43
 44    parameters: InitVar[Mapping[str, Any]]
 45    config: Config = field(default_factory=dict)
 46    request_parameters: Optional[RequestInput] = None
 47    request_headers: Optional[RequestInput] = None
 48    request_body: Optional[
 49        Union[
 50            RequestBodyGraphQL,
 51            RequestBodyJsonObject,
 52            RequestBodyPlainText,
 53            RequestBodyUrlEncodedForm,
 54        ]
 55    ] = None
 56    request_body_data: Optional[RequestInput] = None
 57    request_body_json: Optional[NestedMapping] = None
 58    query_properties_key: Optional[str] = None
 59
 60    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 61        if self.request_parameters is None:
 62            self.request_parameters = {}
 63        if self.request_headers is None:
 64            self.request_headers = {}
 65        # resolve the request body to either data or json
 66        self._resolve_request_body()
 67        # If request_body is not provided, set request_body_data and request_body_json to empty dicts
 68        if self.request_body_data is None:
 69            self.request_body_data = {}
 70        if self.request_body_json is None:
 71            self.request_body_json = {}
 72        # If both request_body_data and request_body_json are provided, raise an error
 73        if self.request_body_json and self.request_body_data:
 74            raise ValueError(
 75                "RequestOptionsProvider should only contain either 'request_body_data' or 'request_body_json' not both"
 76            )
 77        # set interpolators
 78        self._parameter_interpolator = InterpolatedRequestInputProvider(
 79            config=self.config, request_inputs=self.request_parameters, parameters=parameters
 80        )
 81        self._headers_interpolator = InterpolatedRequestInputProvider(
 82            config=self.config, request_inputs=self.request_headers, parameters=parameters
 83        )
 84        self._body_data_interpolator = InterpolatedRequestInputProvider(
 85            config=self.config, request_inputs=self.request_body_data, parameters=parameters
 86        )
 87        self._body_json_interpolator = InterpolatedNestedRequestInputProvider(
 88            config=self.config, request_inputs=self.request_body_json, parameters=parameters
 89        )
 90
 91    def _resolve_request_body(self) -> None:
 92        """
 93        Resolves the request body configuration by setting either `request_body_data` or `request_body_json`
 94        based on the type specified in `self.request_body`. If neither is provided, both are initialized as empty
 95        dictionaries. Raises a ValueError if both `request_body_data` and `request_body_json` are set simultaneously.
 96        Raises:
 97            ValueError: if an unsupported request body type is provided.
 98        """
 99        # Resolve the request body to either data or json
100        if self.request_body is not None and self.request_body.type is not None:
101            if self.request_body.type == "RequestBodyUrlEncodedForm":
102                self.request_body_data = self.request_body.value
103            elif self.request_body.type == "RequestBodyGraphQL":
104                self.request_body_json = {"query": self.request_body.value.query}
105            elif self.request_body.type in ("RequestBodyJsonObject", "RequestBodyPlainText"):
106                self.request_body_json = self.request_body.value
107            else:
108                raise ValueError(f"Unsupported request body type: {self.request_body.type}")
109
110    def get_request_params(
111        self,
112        *,
113        stream_state: Optional[StreamState] = None,
114        stream_slice: Optional[StreamSlice] = None,
115        next_page_token: Optional[Mapping[str, Any]] = None,
116    ) -> MutableMapping[str, Any]:
117        interpolated_value = self._parameter_interpolator.eval_request_inputs(
118            stream_slice,
119            next_page_token,
120            valid_key_types=(str,),
121            valid_value_types=ValidRequestTypes,
122        )
123        if isinstance(interpolated_value, dict):
124            if self.query_properties_key:
125                if not stream_slice:
126                    raise ValueError(
127                        "stream_slice should not be None if query properties in requests is enabled. Please contact Airbyte Support"
128                    )
129                elif (
130                    "query_properties" not in stream_slice.extra_fields
131                    or stream_slice.extra_fields.get("query_properties") is None
132                ):
133                    raise ValueError(
134                        "QueryProperties component is defined but stream_partition does not contain query_properties. Please contact Airbyte Support"
135                    )
136                elif not isinstance(stream_slice.extra_fields.get("query_properties"), List):
137                    raise ValueError(
138                        "QueryProperties component is defined but stream_slice.extra_fields.query_properties is not a List. Please contact Airbyte Support"
139                    )
140                interpolated_value = {
141                    **interpolated_value,
142                    self.query_properties_key: ",".join(
143                        stream_slice.extra_fields.get("query_properties")  # type: ignore  # Earlier type checks validate query_properties type
144                    ),
145                }
146            return interpolated_value
147        return {}
148
149    def get_request_headers(
150        self,
151        *,
152        stream_state: Optional[StreamState] = None,
153        stream_slice: Optional[StreamSlice] = None,
154        next_page_token: Optional[Mapping[str, Any]] = None,
155    ) -> Mapping[str, Any]:
156        return self._headers_interpolator.eval_request_inputs(stream_slice, next_page_token)
157
158    def get_request_body_data(
159        self,
160        *,
161        stream_state: Optional[StreamState] = None,
162        stream_slice: Optional[StreamSlice] = None,
163        next_page_token: Optional[Mapping[str, Any]] = None,
164    ) -> Union[Mapping[str, Any], str]:
165        return self._body_data_interpolator.eval_request_inputs(
166            stream_slice,
167            next_page_token,
168            valid_key_types=(str,),
169            valid_value_types=ValidRequestTypes,
170        )
171
172    def get_request_body_json(
173        self,
174        *,
175        stream_state: Optional[StreamState] = None,
176        stream_slice: Optional[StreamSlice] = None,
177        next_page_token: Optional[Mapping[str, Any]] = None,
178    ) -> Mapping[str, Any]:
179        return self._body_json_interpolator.eval_request_inputs(stream_slice, next_page_token)

Defines the request options to set on an outgoing HTTP request by evaluating InterpolatedMappings

Attributes:
  • config (Config): The user-provided configuration as specified by the source's spec
  • request_parameters (Union[str, Mapping[str, str]]): The request parameters to set on an outgoing HTTP request
  • request_headers (Union[str, Mapping[str, str]]): The request headers to set on an outgoing HTTP request
  • request_body_data (Union[str, Mapping[str, str]]): The body data to set on an outgoing HTTP request
  • request_body_json (Union[str, Mapping[str, str]]): The json content to set on an outgoing HTTP request
InterpolatedRequestOptionsProvider( parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], config: Mapping[str, Any] = <factory>, request_parameters: Union[str, Mapping[str, str], NoneType] = None, request_headers: Union[str, Mapping[str, str], NoneType] = None, request_body: Union[airbyte_cdk.sources.declarative.models.declarative_component_schema.RequestBodyPlainText, airbyte_cdk.sources.declarative.models.declarative_component_schema.RequestBodyUrlEncodedForm, airbyte_cdk.sources.declarative.models.declarative_component_schema.RequestBodyJsonObject, airbyte_cdk.sources.declarative.models.declarative_component_schema.RequestBodyGraphQL, NoneType] = None, request_body_data: Union[str, Mapping[str, str], NoneType] = None, request_body_json: Union[str, dict[str, Union[dict[str, Union[dict[str, Union[dict[str, ForwardRef('NestedMapping')], list[ForwardRef('NestedMapping')], str, int, float, bool, NoneType]], str, dict[str, Any]]], list[Union[dict[str, Union[dict[str, ForwardRef('NestedMapping')], list[ForwardRef('NestedMapping')], str, int, float, bool, NoneType]], str, dict[str, Any]]], str, int, float, bool, NoneType]], dict[str, Any], NoneType] = None, query_properties_key: Optional[str] = None)
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
config: Mapping[str, Any]
request_parameters: Union[str, Mapping[str, str], NoneType] = None
request_headers: Union[str, Mapping[str, str], NoneType] = None
request_body_data: Union[str, Mapping[str, str], NoneType] = None
request_body_json: Union[str, dict[str, Union[dict[str, Union[dict[str, Union[dict[str, ForwardRef('NestedMapping')], list[ForwardRef('NestedMapping')], str, int, float, bool, NoneType]], str, dict[str, Any]]], list[Union[dict[str, Union[dict[str, ForwardRef('NestedMapping')], list[ForwardRef('NestedMapping')], str, int, float, bool, NoneType]], str, dict[str, Any]]], str, int, float, bool, NoneType]], dict[str, Any], NoneType] = None
query_properties_key: Optional[str] = None
def get_request_params( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> MutableMapping[str, Any]:
110    def get_request_params(
111        self,
112        *,
113        stream_state: Optional[StreamState] = None,
114        stream_slice: Optional[StreamSlice] = None,
115        next_page_token: Optional[Mapping[str, Any]] = None,
116    ) -> MutableMapping[str, Any]:
117        interpolated_value = self._parameter_interpolator.eval_request_inputs(
118            stream_slice,
119            next_page_token,
120            valid_key_types=(str,),
121            valid_value_types=ValidRequestTypes,
122        )
123        if isinstance(interpolated_value, dict):
124            if self.query_properties_key:
125                if not stream_slice:
126                    raise ValueError(
127                        "stream_slice should not be None if query properties in requests is enabled. Please contact Airbyte Support"
128                    )
129                elif (
130                    "query_properties" not in stream_slice.extra_fields
131                    or stream_slice.extra_fields.get("query_properties") is None
132                ):
133                    raise ValueError(
134                        "QueryProperties component is defined but stream_partition does not contain query_properties. Please contact Airbyte Support"
135                    )
136                elif not isinstance(stream_slice.extra_fields.get("query_properties"), List):
137                    raise ValueError(
138                        "QueryProperties component is defined but stream_slice.extra_fields.query_properties is not a List. Please contact Airbyte Support"
139                    )
140                interpolated_value = {
141                    **interpolated_value,
142                    self.query_properties_key: ",".join(
143                        stream_slice.extra_fields.get("query_properties")  # type: ignore  # Earlier type checks validate query_properties type
144                    ),
145                }
146            return interpolated_value
147        return {}

Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

E.g: you might want to define query parameters for paging if next_page_token is not None.

def get_request_headers( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
149    def get_request_headers(
150        self,
151        *,
152        stream_state: Optional[StreamState] = None,
153        stream_slice: Optional[StreamSlice] = None,
154        next_page_token: Optional[Mapping[str, Any]] = None,
155    ) -> Mapping[str, Any]:
156        return self._headers_interpolator.eval_request_inputs(stream_slice, next_page_token)

Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.

def get_request_body_data( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Union[Mapping[str, Any], str]:
158    def get_request_body_data(
159        self,
160        *,
161        stream_state: Optional[StreamState] = None,
162        stream_slice: Optional[StreamSlice] = None,
163        next_page_token: Optional[Mapping[str, Any]] = None,
164    ) -> Union[Mapping[str, Any], str]:
165        return self._body_data_interpolator.eval_request_inputs(
166            stream_slice,
167            next_page_token,
168            valid_key_types=(str,),
169            valid_value_types=ValidRequestTypes,
170        )

Specifies how to populate the body of the request with a non-JSON payload.

If returns a ready text that it will be sent as is. If returns a dict that it will be converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

def get_request_body_json( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
172    def get_request_body_json(
173        self,
174        *,
175        stream_state: Optional[StreamState] = None,
176        stream_slice: Optional[StreamSlice] = None,
177        next_page_token: Optional[Mapping[str, Any]] = None,
178    ) -> Mapping[str, Any]:
179        return self._body_json_interpolator.eval_request_inputs(stream_slice, next_page_token)

Specifies how to populate the body of the request with a JSON payload.

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

@dataclass
class RequestOptionsProvider:
13@dataclass
14class RequestOptionsProvider:
15    """
16    Defines the request options to set on an outgoing HTTP request
17
18    Options can be passed by
19    - request parameter
20    - request headers
21    - body data
22    - json content
23    """
24
25    @abstractmethod
26    def get_request_params(
27        self,
28        *,
29        stream_state: Optional[StreamState] = None,
30        stream_slice: Optional[StreamSlice] = None,
31        next_page_token: Optional[Mapping[str, Any]] = None,
32    ) -> Mapping[str, Any]:
33        """
34        Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
35
36        E.g: you might want to define query parameters for paging if next_page_token is not None.
37        """
38        pass
39
40    @abstractmethod
41    def get_request_headers(
42        self,
43        *,
44        stream_state: Optional[StreamState] = None,
45        stream_slice: Optional[StreamSlice] = None,
46        next_page_token: Optional[Mapping[str, Any]] = None,
47    ) -> Mapping[str, Any]:
48        """Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method."""
49
50    @abstractmethod
51    def get_request_body_data(
52        self,
53        *,
54        stream_state: Optional[StreamState] = None,
55        stream_slice: Optional[StreamSlice] = None,
56        next_page_token: Optional[Mapping[str, Any]] = None,
57    ) -> Union[Mapping[str, Any], str]:
58        """
59        Specifies how to populate the body of the request with a non-JSON payload.
60
61        If returns a ready text that it will be sent as is.
62        If returns a dict that it will be converted to a urlencoded form.
63        E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
64
65        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
66        """
67
68    @abstractmethod
69    def get_request_body_json(
70        self,
71        *,
72        stream_state: Optional[StreamState] = None,
73        stream_slice: Optional[StreamSlice] = None,
74        next_page_token: Optional[Mapping[str, Any]] = None,
75    ) -> Mapping[str, Any]:
76        """
77        Specifies how to populate the body of the request with a JSON payload.
78
79        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
80        """

Defines the request options to set on an outgoing HTTP request

Options can be passed by

  • request parameter
  • request headers
  • body data
  • json content
@abstractmethod
def get_request_params( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
25    @abstractmethod
26    def get_request_params(
27        self,
28        *,
29        stream_state: Optional[StreamState] = None,
30        stream_slice: Optional[StreamSlice] = None,
31        next_page_token: Optional[Mapping[str, Any]] = None,
32    ) -> Mapping[str, Any]:
33        """
34        Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
35
36        E.g: you might want to define query parameters for paging if next_page_token is not None.
37        """
38        pass

Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

E.g: you might want to define query parameters for paging if next_page_token is not None.

@abstractmethod
def get_request_headers( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
40    @abstractmethod
41    def get_request_headers(
42        self,
43        *,
44        stream_state: Optional[StreamState] = None,
45        stream_slice: Optional[StreamSlice] = None,
46        next_page_token: Optional[Mapping[str, Any]] = None,
47    ) -> Mapping[str, Any]:
48        """Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method."""

Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.

@abstractmethod
def get_request_body_data( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Union[Mapping[str, Any], str]:
50    @abstractmethod
51    def get_request_body_data(
52        self,
53        *,
54        stream_state: Optional[StreamState] = None,
55        stream_slice: Optional[StreamSlice] = None,
56        next_page_token: Optional[Mapping[str, Any]] = None,
57    ) -> Union[Mapping[str, Any], str]:
58        """
59        Specifies how to populate the body of the request with a non-JSON payload.
60
61        If returns a ready text that it will be sent as is.
62        If returns a dict that it will be converted to a urlencoded form.
63        E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
64
65        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
66        """

Specifies how to populate the body of the request with a non-JSON payload.

If returns a ready text that it will be sent as is. If returns a dict that it will be converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

@abstractmethod
def get_request_body_json( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
68    @abstractmethod
69    def get_request_body_json(
70        self,
71        *,
72        stream_state: Optional[StreamState] = None,
73        stream_slice: Optional[StreamSlice] = None,
74        next_page_token: Optional[Mapping[str, Any]] = None,
75    ) -> Mapping[str, Any]:
76        """
77        Specifies how to populate the body of the request with a JSON payload.
78
79        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
80        """

Specifies how to populate the body of the request with a JSON payload.

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.