airbyte_cdk.sources.declarative.requesters

1#
2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3#
4
5from airbyte_cdk.sources.declarative.requesters.http_requester import HttpRequester
6from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption
7from airbyte_cdk.sources.declarative.requesters.requester import Requester
8
9__all__ = ["HttpRequester", "RequestOption", "Requester"]
@dataclass
class HttpRequester(airbyte_cdk.sources.declarative.requesters.Requester):
 38@dataclass
 39class HttpRequester(Requester):
 40    """
 41    Default implementation of a Requester
 42
 43    Attributes:
 44        name (str): Name of the stream. Only used for request/response caching
 45        url_base (Union[InterpolatedString, str]): Base url to send requests to
 46        path (Union[InterpolatedString, str]): Path to send requests to
 47        http_method (Union[str, HttpMethod]): HTTP method to use when sending requests
 48        request_options_provider (Optional[InterpolatedRequestOptionsProvider]): request option provider defining the options to set on outgoing requests
 49        authenticator (DeclarativeAuthenticator): Authenticator defining how to authenticate to the source
 50        error_handler (Optional[ErrorHandler]): Error handler defining how to detect and handle errors
 51        backoff_strategies (Optional[List[BackoffStrategy]]): List of backoff strategies to use when retrying requests
 52        config (Config): The user-provided configuration as specified by the source's spec
 53        use_cache (bool): Indicates that data should be cached for this stream
 54    """
 55
 56    name: str
 57    url_base: Union[InterpolatedString, str]
 58    config: Config
 59    parameters: InitVar[Mapping[str, Any]]
 60
 61    path: Optional[Union[InterpolatedString, str]] = None
 62    authenticator: Optional[DeclarativeAuthenticator] = None
 63    http_method: Union[str, HttpMethod] = HttpMethod.GET
 64    request_options_provider: Optional[InterpolatedRequestOptionsProvider] = None
 65    error_handler: Optional[ErrorHandler] = None
 66    api_budget: Optional[APIBudget] = None
 67    disable_retries: bool = False
 68    message_repository: MessageRepository = NoopMessageRepository()
 69    use_cache: bool = False
 70    _exit_on_rate_limit: bool = False
 71    stream_response: bool = False
 72    decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={}))
 73
 74    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 75        self._url_base = InterpolatedString.create(self.url_base, parameters=parameters)
 76        self._path = InterpolatedString.create(
 77            self.path if self.path else EmptyString, parameters=parameters
 78        )
 79        if self.request_options_provider is None:
 80            self._request_options_provider = InterpolatedRequestOptionsProvider(
 81                config=self.config, parameters=parameters
 82            )
 83        elif isinstance(self.request_options_provider, dict):
 84            self._request_options_provider = InterpolatedRequestOptionsProvider(
 85                config=self.config, **self.request_options_provider
 86            )
 87        else:
 88            self._request_options_provider = self.request_options_provider
 89        self._authenticator = self.authenticator or NoAuth(parameters=parameters)
 90        self._http_method = (
 91            HttpMethod[self.http_method] if isinstance(self.http_method, str) else self.http_method
 92        )
 93        self.error_handler = self.error_handler
 94        self._parameters = parameters
 95
 96        if self.error_handler is not None and hasattr(self.error_handler, "backoff_strategies"):
 97            backoff_strategies = self.error_handler.backoff_strategies  # type: ignore
 98        else:
 99            backoff_strategies = None
100
101        self._http_client = HttpClient(
102            name=self.name,
103            logger=self.logger,
104            error_handler=self.error_handler,
105            api_budget=self.api_budget,
106            authenticator=self._authenticator,
107            use_cache=self.use_cache,
108            backoff_strategy=backoff_strategies,
109            disable_retries=self.disable_retries,
110            message_repository=self.message_repository,
111        )
112
113    @property
114    def exit_on_rate_limit(self) -> bool:
115        return self._exit_on_rate_limit
116
117    @exit_on_rate_limit.setter
118    def exit_on_rate_limit(self, value: bool) -> None:
119        self._exit_on_rate_limit = value
120
121    def get_authenticator(self) -> DeclarativeAuthenticator:
122        return self._authenticator
123
124    def get_url_base(
125        self,
126        *,
127        stream_state: Optional[StreamState] = None,
128        stream_slice: Optional[StreamSlice] = None,
129        next_page_token: Optional[Mapping[str, Any]] = None,
130    ) -> str:
131        interpolation_context = get_interpolation_context(
132            stream_state=stream_state,
133            stream_slice=stream_slice,
134            next_page_token=next_page_token,
135        )
136        return str(self._url_base.eval(self.config, **interpolation_context))
137
138    def get_path(
139        self,
140        *,
141        stream_state: Optional[StreamState] = None,
142        stream_slice: Optional[StreamSlice] = None,
143        next_page_token: Optional[Mapping[str, Any]] = None,
144    ) -> str:
145        interpolation_context = get_interpolation_context(
146            stream_state=stream_state,
147            stream_slice=stream_slice,
148            next_page_token=next_page_token,
149        )
150        path = str(self._path.eval(self.config, **interpolation_context))
151        return path.lstrip("/")
152
153    def get_method(self) -> HttpMethod:
154        return self._http_method
155
156    def get_request_params(
157        self,
158        *,
159        stream_state: Optional[StreamState] = None,
160        stream_slice: Optional[StreamSlice] = None,
161        next_page_token: Optional[Mapping[str, Any]] = None,
162    ) -> MutableMapping[str, Any]:
163        return self._request_options_provider.get_request_params(
164            stream_state=stream_state,
165            stream_slice=stream_slice,
166            next_page_token=next_page_token,
167        )
168
169    def get_request_headers(
170        self,
171        *,
172        stream_state: Optional[StreamState] = None,
173        stream_slice: Optional[StreamSlice] = None,
174        next_page_token: Optional[Mapping[str, Any]] = None,
175    ) -> Mapping[str, Any]:
176        return self._request_options_provider.get_request_headers(
177            stream_state=stream_state,
178            stream_slice=stream_slice,
179            next_page_token=next_page_token,
180        )
181
182    # fixing request options provider types has a lot of dependencies
183    def get_request_body_data(  # type: ignore
184        self,
185        *,
186        stream_state: Optional[StreamState] = None,
187        stream_slice: Optional[StreamSlice] = None,
188        next_page_token: Optional[Mapping[str, Any]] = None,
189    ) -> Union[Mapping[str, Any], str]:
190        return (
191            self._request_options_provider.get_request_body_data(
192                stream_state=stream_state,
193                stream_slice=stream_slice,
194                next_page_token=next_page_token,
195            )
196            or {}
197        )
198
199    # fixing request options provider types has a lot of dependencies
200    def get_request_body_json(  # type: ignore
201        self,
202        *,
203        stream_state: Optional[StreamState] = None,
204        stream_slice: Optional[StreamSlice] = None,
205        next_page_token: Optional[Mapping[str, Any]] = None,
206    ) -> Optional[Mapping[str, Any]]:
207        return self._request_options_provider.get_request_body_json(
208            stream_state=stream_state,
209            stream_slice=stream_slice,
210            next_page_token=next_page_token,
211        )
212
213    @property
214    def logger(self) -> logging.Logger:
215        return logging.getLogger(f"airbyte.HttpRequester.{self.name}")
216
217    def _get_request_options(
218        self,
219        stream_state: Optional[StreamState],
220        stream_slice: Optional[StreamSlice],
221        next_page_token: Optional[Mapping[str, Any]],
222        requester_method: Callable[..., Optional[Union[Mapping[str, Any], str]]],
223        auth_options_method: Callable[..., Optional[Union[Mapping[str, Any], str]]],
224        extra_options: Optional[Union[Mapping[str, Any], str]] = None,
225    ) -> Union[Mapping[str, Any], str]:
226        """
227        Get the request_option from the requester, the authenticator and extra_options passed in.
228        Raise a ValueError if there's a key collision
229        Returned merged mapping otherwise
230        """
231
232        is_body_json = requester_method.__name__ == "get_request_body_json"
233
234        return combine_mappings(
235            [
236                requester_method(
237                    stream_state=stream_state,
238                    stream_slice=stream_slice,
239                    next_page_token=next_page_token,
240                ),
241                auth_options_method(),
242                extra_options,
243            ],
244            allow_same_value_merge=is_body_json,
245        )
246
247    def _request_headers(
248        self,
249        stream_state: Optional[StreamState] = None,
250        stream_slice: Optional[StreamSlice] = None,
251        next_page_token: Optional[Mapping[str, Any]] = None,
252        extra_headers: Optional[Mapping[str, Any]] = None,
253    ) -> Mapping[str, Any]:
254        """
255        Specifies request headers.
256        Authentication headers will overwrite any overlapping headers returned from this method.
257        """
258        headers = self._get_request_options(
259            stream_state,
260            stream_slice,
261            next_page_token,
262            self.get_request_headers,
263            self.get_authenticator().get_auth_header,
264            extra_headers,
265        )
266        if isinstance(headers, str):
267            raise ValueError("Request headers cannot be a string")
268        return {str(k): str(v) for k, v in headers.items()}
269
270    def _request_params(
271        self,
272        stream_state: Optional[StreamState],
273        stream_slice: Optional[StreamSlice],
274        next_page_token: Optional[Mapping[str, Any]],
275        extra_params: Optional[Mapping[str, Any]] = None,
276    ) -> Mapping[str, Any]:
277        """
278        Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
279
280        E.g: you might want to define query parameters for paging if next_page_token is not None.
281        """
282        options = self._get_request_options(
283            stream_state,
284            stream_slice,
285            next_page_token,
286            self.get_request_params,
287            self.get_authenticator().get_request_params,
288            extra_params,
289        )
290        if isinstance(options, str):
291            raise ValueError("Request params cannot be a string")
292
293        for k, v in options.items():
294            if isinstance(v, (dict,)):
295                raise ValueError(
296                    f"Invalid value for `{k}` parameter. The values of request params cannot be an object."
297                )
298
299        return options
300
301    def _request_body_data(
302        self,
303        stream_state: Optional[StreamState],
304        stream_slice: Optional[StreamSlice],
305        next_page_token: Optional[Mapping[str, Any]],
306        extra_body_data: Optional[Union[Mapping[str, Any], str]] = None,
307    ) -> Optional[Union[Mapping[str, Any], str]]:
308        """
309        Specifies how to populate the body of the request with a non-JSON payload.
310
311        If returns a ready text that it will be sent as is.
312        If returns a dict that it will be converted to a urlencoded form.
313        E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
314
315        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
316        """
317        # Warning: use self.state instead of the stream_state passed as argument!
318        return self._get_request_options(
319            stream_state,
320            stream_slice,
321            next_page_token,
322            self.get_request_body_data,
323            self.get_authenticator().get_request_body_data,
324            extra_body_data,
325        )
326
327    def _request_body_json(
328        self,
329        stream_state: Optional[StreamState],
330        stream_slice: Optional[StreamSlice],
331        next_page_token: Optional[Mapping[str, Any]],
332        extra_body_json: Optional[Mapping[str, Any]] = None,
333    ) -> Optional[Mapping[str, Any]]:
334        """
335        Specifies how to populate the body of the request with a JSON payload.
336
337        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
338        """
339        # Warning: use self.state instead of the stream_state passed as argument!
340        options = self._get_request_options(
341            stream_state,
342            stream_slice,
343            next_page_token,
344            self.get_request_body_json,
345            self.get_authenticator().get_request_body_json,
346            extra_body_json,
347        )
348        if isinstance(options, str):
349            raise ValueError("Request body json cannot be a string")
350        return options
351
352    @classmethod
353    def _join_url(cls, url_base: str, path: str) -> str:
354        """
355        Joins a base URL with a given path and returns the resulting URL with any trailing slash removed.
356
357        This method ensures that there are no duplicate slashes when concatenating the base URL and the path,
358        which is useful when the full URL is provided from an interpolation context.
359
360        Args:
361            url_base (str): The base URL to which the path will be appended.
362            path (str): The path to join with the base URL.
363
364        Returns:
365            str: The resulting joined URL.
366
367        Note:
368            Related issue: https://github.com/airbytehq/airbyte-internal-issues/issues/11869
369            - If the path is an empty string or None, the method returns the base URL with any trailing slash removed.
370
371        Example:
372            1) _join_url("https://example.com/api/", "endpoint") >> 'https://example.com/api/endpoint'
373            2) _join_url("https://example.com/api", "/endpoint") >> 'https://example.com/api/endpoint'
374            3) _join_url("https://example.com/api/", "") >> 'https://example.com/api/'
375            4) _join_url("https://example.com/api", None) >> 'https://example.com/api'
376        """
377
378        # return a full-url if provided directly from interpolation context
379        if path == EmptyString or path is None:
380            return url_base
381        else:
382            # since we didn't provide a full-url, the url_base might not have a trailing slash
383            # so we join the url_base and path correctly
384            if not url_base.endswith("/"):
385                url_base += "/"
386
387        return urljoin(url_base, path)
388
389    def send_request(
390        self,
391        stream_state: Optional[StreamState] = None,
392        stream_slice: Optional[StreamSlice] = None,
393        next_page_token: Optional[Mapping[str, Any]] = None,
394        path: Optional[str] = None,
395        request_headers: Optional[Mapping[str, Any]] = None,
396        request_params: Optional[Mapping[str, Any]] = None,
397        request_body_data: Optional[Union[Mapping[str, Any], str]] = None,
398        request_body_json: Optional[Mapping[str, Any]] = None,
399        log_formatter: Optional[Callable[[requests.Response], Any]] = None,
400    ) -> Optional[requests.Response]:
401        request, response = self._http_client.send_request(
402            http_method=self.get_method().value,
403            url=self._join_url(
404                self.get_url_base(
405                    stream_state=stream_state,
406                    stream_slice=stream_slice,
407                    next_page_token=next_page_token,
408                ),
409                path
410                or self.get_path(
411                    stream_state=stream_state,
412                    stream_slice=stream_slice,
413                    next_page_token=next_page_token,
414                ),
415            ),
416            request_kwargs={"stream": self.stream_response},
417            headers=self._request_headers(
418                stream_state, stream_slice, next_page_token, request_headers
419            ),
420            params=self._request_params(
421                stream_state, stream_slice, next_page_token, request_params
422            ),
423            json=self._request_body_json(
424                stream_state, stream_slice, next_page_token, request_body_json
425            ),
426            data=self._request_body_data(
427                stream_state, stream_slice, next_page_token, request_body_data
428            ),
429            dedupe_query_params=True,
430            log_formatter=log_formatter,
431            exit_on_rate_limit=self._exit_on_rate_limit,
432        )
433
434        return response

Default implementation of a Requester

Attributes:
  • name (str): Name of the stream. Only used for request/response caching
  • url_base (Union[InterpolatedString, str]): Base url to send requests to
  • path (Union[InterpolatedString, str]): Path to send requests to
  • http_method (Union[str, HttpMethod]): HTTP method to use when sending requests
  • request_options_provider (Optional[InterpolatedRequestOptionsProvider]): request option provider defining the options to set on outgoing requests
  • authenticator (DeclarativeAuthenticator): Authenticator defining how to authenticate to the source
  • error_handler (Optional[ErrorHandler]): Error handler defining how to detect and handle errors
  • backoff_strategies (Optional[List[BackoffStrategy]]): List of backoff strategies to use when retrying requests
  • config (Config): The user-provided configuration as specified by the source's spec
  • use_cache (bool): Indicates that data should be cached for this stream
HttpRequester( name: str, url_base: Union[airbyte_cdk.InterpolatedString, str], config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], path: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, authenticator: Optional[airbyte_cdk.DeclarativeAuthenticator] = None, http_method: Union[str, airbyte_cdk.HttpMethod] = <HttpMethod.GET: 'GET'>, request_options_provider: Optional[airbyte_cdk.sources.declarative.requesters.request_options.InterpolatedRequestOptionsProvider] = None, error_handler: Optional[airbyte_cdk.sources.streams.http.error_handlers.ErrorHandler] = None, api_budget: Optional[airbyte_cdk.sources.streams.call_rate.APIBudget] = None, disable_retries: bool = False, message_repository: airbyte_cdk.MessageRepository = <airbyte_cdk.sources.message.NoopMessageRepository object>, use_cache: bool = False, _exit_on_rate_limit: bool = False, stream_response: bool = False, decoder: airbyte_cdk.Decoder = <factory>)
name: str
url_base: Union[airbyte_cdk.InterpolatedString, str]
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
path: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
authenticator: Optional[airbyte_cdk.DeclarativeAuthenticator] = None
http_method: Union[str, airbyte_cdk.HttpMethod] = <HttpMethod.GET: 'GET'>
disable_retries: bool = False
use_cache: bool = False
stream_response: bool = False
exit_on_rate_limit: bool
113    @property
114    def exit_on_rate_limit(self) -> bool:
115        return self._exit_on_rate_limit
def get_authenticator( self) -> airbyte_cdk.DeclarativeAuthenticator:
121    def get_authenticator(self) -> DeclarativeAuthenticator:
122        return self._authenticator

Specifies the authenticator to use when submitting requests

def get_url_base( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> str:
124    def get_url_base(
125        self,
126        *,
127        stream_state: Optional[StreamState] = None,
128        stream_slice: Optional[StreamSlice] = None,
129        next_page_token: Optional[Mapping[str, Any]] = None,
130    ) -> str:
131        interpolation_context = get_interpolation_context(
132            stream_state=stream_state,
133            stream_slice=stream_slice,
134            next_page_token=next_page_token,
135        )
136        return str(self._url_base.eval(self.config, **interpolation_context))
Returns

URL base for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/"

def get_path( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> str:
138    def get_path(
139        self,
140        *,
141        stream_state: Optional[StreamState] = None,
142        stream_slice: Optional[StreamSlice] = None,
143        next_page_token: Optional[Mapping[str, Any]] = None,
144    ) -> str:
145        interpolation_context = get_interpolation_context(
146            stream_state=stream_state,
147            stream_slice=stream_slice,
148            next_page_token=next_page_token,
149        )
150        path = str(self._path.eval(self.config, **interpolation_context))
151        return path.lstrip("/")

Returns the URL path for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "some_entity"

def get_method(self) -> airbyte_cdk.HttpMethod:
153    def get_method(self) -> HttpMethod:
154        return self._http_method

Specifies the HTTP method to use

def get_request_params( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> MutableMapping[str, Any]:
156    def get_request_params(
157        self,
158        *,
159        stream_state: Optional[StreamState] = None,
160        stream_slice: Optional[StreamSlice] = None,
161        next_page_token: Optional[Mapping[str, Any]] = None,
162    ) -> MutableMapping[str, Any]:
163        return self._request_options_provider.get_request_params(
164            stream_state=stream_state,
165            stream_slice=stream_slice,
166            next_page_token=next_page_token,
167        )

Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

E.g: you might want to define query parameters for paging if next_page_token is not None.

def get_request_headers( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
169    def get_request_headers(
170        self,
171        *,
172        stream_state: Optional[StreamState] = None,
173        stream_slice: Optional[StreamSlice] = None,
174        next_page_token: Optional[Mapping[str, Any]] = None,
175    ) -> Mapping[str, Any]:
176        return self._request_options_provider.get_request_headers(
177            stream_state=stream_state,
178            stream_slice=stream_slice,
179            next_page_token=next_page_token,
180        )

Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.

def get_request_body_data( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Union[Mapping[str, Any], str]:
183    def get_request_body_data(  # type: ignore
184        self,
185        *,
186        stream_state: Optional[StreamState] = None,
187        stream_slice: Optional[StreamSlice] = None,
188        next_page_token: Optional[Mapping[str, Any]] = None,
189    ) -> Union[Mapping[str, Any], str]:
190        return (
191            self._request_options_provider.get_request_body_data(
192                stream_state=stream_state,
193                stream_slice=stream_slice,
194                next_page_token=next_page_token,
195            )
196            or {}
197        )

Specifies how to populate the body of the request with a non-JSON payload.

If returns a ready text that it will be sent as is. If returns a dict that it will be converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

def get_request_body_json( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Optional[Mapping[str, Any]]:
200    def get_request_body_json(  # type: ignore
201        self,
202        *,
203        stream_state: Optional[StreamState] = None,
204        stream_slice: Optional[StreamSlice] = None,
205        next_page_token: Optional[Mapping[str, Any]] = None,
206    ) -> Optional[Mapping[str, Any]]:
207        return self._request_options_provider.get_request_body_json(
208            stream_state=stream_state,
209            stream_slice=stream_slice,
210            next_page_token=next_page_token,
211        )

Specifies how to populate the body of the request with a JSON payload.

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

logger: logging.Logger
213    @property
214    def logger(self) -> logging.Logger:
215        return logging.getLogger(f"airbyte.HttpRequester.{self.name}")
def send_request( self, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, path: Optional[str] = None, request_headers: Optional[Mapping[str, Any]] = None, request_params: Optional[Mapping[str, Any]] = None, request_body_data: Union[str, Mapping[str, Any], NoneType] = None, request_body_json: Optional[Mapping[str, Any]] = None, log_formatter: Optional[Callable[[requests.models.Response], Any]] = None) -> Optional[requests.models.Response]:
389    def send_request(
390        self,
391        stream_state: Optional[StreamState] = None,
392        stream_slice: Optional[StreamSlice] = None,
393        next_page_token: Optional[Mapping[str, Any]] = None,
394        path: Optional[str] = None,
395        request_headers: Optional[Mapping[str, Any]] = None,
396        request_params: Optional[Mapping[str, Any]] = None,
397        request_body_data: Optional[Union[Mapping[str, Any], str]] = None,
398        request_body_json: Optional[Mapping[str, Any]] = None,
399        log_formatter: Optional[Callable[[requests.Response], Any]] = None,
400    ) -> Optional[requests.Response]:
401        request, response = self._http_client.send_request(
402            http_method=self.get_method().value,
403            url=self._join_url(
404                self.get_url_base(
405                    stream_state=stream_state,
406                    stream_slice=stream_slice,
407                    next_page_token=next_page_token,
408                ),
409                path
410                or self.get_path(
411                    stream_state=stream_state,
412                    stream_slice=stream_slice,
413                    next_page_token=next_page_token,
414                ),
415            ),
416            request_kwargs={"stream": self.stream_response},
417            headers=self._request_headers(
418                stream_state, stream_slice, next_page_token, request_headers
419            ),
420            params=self._request_params(
421                stream_state, stream_slice, next_page_token, request_params
422            ),
423            json=self._request_body_json(
424                stream_state, stream_slice, next_page_token, request_body_json
425            ),
426            data=self._request_body_data(
427                stream_state, stream_slice, next_page_token, request_body_data
428            ),
429            dedupe_query_params=True,
430            log_formatter=log_formatter,
431            exit_on_rate_limit=self._exit_on_rate_limit,
432        )
433
434        return response

Sends a request and returns the response. Might return no response if the error handler chooses to ignore the response or throw an exception in case of an error. If path is set, the path configured on the requester itself is ignored. If header, params and body are set, they are merged with the ones configured on the requester itself.

If a log formatter is provided, it's used to log the performed request and response. If it's not provided, no logging is performed.

@dataclass
class RequestOption:
 25@dataclass
 26class RequestOption:
 27    """
 28    Describes an option to set on a request
 29
 30    Attributes:
 31        field_name (str): Describes the name of the parameter to inject. Mutually exclusive with field_path.
 32        field_path (list(str)): Describes the path to a nested field as a list of field names.
 33          Only valid for body_json injection type, and mutually exclusive with field_name.
 34        inject_into (RequestOptionType): Describes where in the HTTP request to inject the parameter
 35    """
 36
 37    inject_into: RequestOptionType
 38    parameters: InitVar[Mapping[str, Any]]
 39    field_name: Optional[Union[InterpolatedString, str]] = None
 40    field_path: Optional[List[Union[InterpolatedString, str]]] = None
 41
 42    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 43        # Validate inputs. We should expect either field_name or field_path, but not both
 44        if self.field_name is None and self.field_path is None:
 45            raise ValueError("RequestOption requires either a field_name or field_path")
 46
 47        if self.field_name is not None and self.field_path is not None:
 48            raise ValueError(
 49                "Only one of field_name or field_path can be provided to RequestOption"
 50            )
 51
 52        # Nested field injection is only supported for body JSON injection
 53        if self.field_path is not None and self.inject_into != RequestOptionType.body_json:
 54            raise ValueError(
 55                "Nested field injection is only supported for body JSON injection. Please use a top-level field_name for other injection types."
 56            )
 57
 58        # Convert field_name and field_path into InterpolatedString objects if they are strings
 59        if self.field_name is not None:
 60            self.field_name = InterpolatedString.create(self.field_name, parameters=parameters)
 61        elif self.field_path is not None:
 62            self.field_path = [
 63                InterpolatedString.create(segment, parameters=parameters)
 64                for segment in self.field_path
 65            ]
 66
 67    @property
 68    def _is_field_path(self) -> bool:
 69        """Returns whether this option is a field path (ie, a nested field)"""
 70        return self.field_path is not None
 71
 72    def inject_into_request(
 73        self,
 74        target: MutableMapping[str, Any],
 75        value: Any,
 76        config: Config,
 77    ) -> None:
 78        """
 79        Inject a request option value into a target request structure using either field_name or field_path.
 80        For non-body-json injection, only top-level field names are supported.
 81        For body-json injection, both field names and nested field paths are supported.
 82
 83        Args:
 84            target: The request structure to inject the value into
 85            value: The value to inject
 86            config: The config object to use for interpolation
 87        """
 88        if self._is_field_path:
 89            if self.inject_into != RequestOptionType.body_json:
 90                raise ValueError(
 91                    "Nested field injection is only supported for body JSON injection. Please use a top-level field_name for other injection types."
 92                )
 93
 94            assert self.field_path is not None  # for type checker
 95            current = target
 96            # Convert path segments into strings, evaluating any interpolated segments
 97            # Example: ["data", "{{ config[user_type] }}", "id"] -> ["data", "admin", "id"]
 98            *path_parts, final_key = [
 99                str(
100                    segment.eval(config=config)
101                    if isinstance(segment, InterpolatedString)
102                    else segment
103                )
104                for segment in self.field_path
105            ]
106
107            # Build a nested dictionary structure and set the final value at the deepest level
108            for part in path_parts:
109                current = current.setdefault(part, {})
110            current[final_key] = value
111        else:
112            # For non-nested fields, evaluate the field name if it's an interpolated string
113            key = (
114                self.field_name.eval(config=config)
115                if isinstance(self.field_name, InterpolatedString)
116                else self.field_name
117            )
118            target[str(key)] = value

Describes an option to set on a request

Attributes:
  • field_name (str): Describes the name of the parameter to inject. Mutually exclusive with field_path.
  • field_path (list(str)): Describes the path to a nested field as a list of field names. Only valid for body_json injection type, and mutually exclusive with field_name.
  • inject_into (RequestOptionType): Describes where in the HTTP request to inject the parameter
RequestOption( inject_into: airbyte_cdk.RequestOptionType, parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], field_name: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, field_path: Optional[List[Union[airbyte_cdk.InterpolatedString, str]]] = None)
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
field_name: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
field_path: Optional[List[Union[airbyte_cdk.InterpolatedString, str]]] = None
def inject_into_request( self, target: MutableMapping[str, Any], value: Any, config: Mapping[str, Any]) -> None:
 72    def inject_into_request(
 73        self,
 74        target: MutableMapping[str, Any],
 75        value: Any,
 76        config: Config,
 77    ) -> None:
 78        """
 79        Inject a request option value into a target request structure using either field_name or field_path.
 80        For non-body-json injection, only top-level field names are supported.
 81        For body-json injection, both field names and nested field paths are supported.
 82
 83        Args:
 84            target: The request structure to inject the value into
 85            value: The value to inject
 86            config: The config object to use for interpolation
 87        """
 88        if self._is_field_path:
 89            if self.inject_into != RequestOptionType.body_json:
 90                raise ValueError(
 91                    "Nested field injection is only supported for body JSON injection. Please use a top-level field_name for other injection types."
 92                )
 93
 94            assert self.field_path is not None  # for type checker
 95            current = target
 96            # Convert path segments into strings, evaluating any interpolated segments
 97            # Example: ["data", "{{ config[user_type] }}", "id"] -> ["data", "admin", "id"]
 98            *path_parts, final_key = [
 99                str(
100                    segment.eval(config=config)
101                    if isinstance(segment, InterpolatedString)
102                    else segment
103                )
104                for segment in self.field_path
105            ]
106
107            # Build a nested dictionary structure and set the final value at the deepest level
108            for part in path_parts:
109                current = current.setdefault(part, {})
110            current[final_key] = value
111        else:
112            # For non-nested fields, evaluate the field name if it's an interpolated string
113            key = (
114                self.field_name.eval(config=config)
115                if isinstance(self.field_name, InterpolatedString)
116                else self.field_name
117            )
118            target[str(key)] = value

Inject a request option value into a target request structure using either field_name or field_path. For non-body-json injection, only top-level field names are supported. For body-json injection, both field names and nested field paths are supported.

Arguments:
  • target: The request structure to inject the value into
  • value: The value to inject
  • config: The config object to use for interpolation
 30class Requester(RequestOptionsProvider):
 31    @abstractmethod
 32    def get_authenticator(self) -> DeclarativeAuthenticator:
 33        """
 34        Specifies the authenticator to use when submitting requests
 35        """
 36        pass
 37
 38    @abstractmethod
 39    def get_url_base(
 40        self,
 41        *,
 42        stream_state: Optional[StreamState],
 43        stream_slice: Optional[StreamSlice],
 44        next_page_token: Optional[Mapping[str, Any]],
 45    ) -> str:
 46        """
 47        :return: URL base for the  API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/"
 48        """
 49
 50    @abstractmethod
 51    def get_path(
 52        self,
 53        *,
 54        stream_state: Optional[StreamState],
 55        stream_slice: Optional[StreamSlice],
 56        next_page_token: Optional[Mapping[str, Any]],
 57    ) -> str:
 58        """
 59        Returns the URL path for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "some_entity"
 60        """
 61
 62    @abstractmethod
 63    def get_method(self) -> HttpMethod:
 64        """
 65        Specifies the HTTP method to use
 66        """
 67
 68    @abstractmethod
 69    def get_request_params(
 70        self,
 71        *,
 72        stream_state: Optional[StreamState] = None,
 73        stream_slice: Optional[StreamSlice] = None,
 74        next_page_token: Optional[Mapping[str, Any]] = None,
 75    ) -> MutableMapping[str, Any]:
 76        """
 77        Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
 78
 79        E.g: you might want to define query parameters for paging if next_page_token is not None.
 80        """
 81
 82    @abstractmethod
 83    def get_request_headers(
 84        self,
 85        *,
 86        stream_state: Optional[StreamState] = None,
 87        stream_slice: Optional[StreamSlice] = None,
 88        next_page_token: Optional[Mapping[str, Any]] = None,
 89    ) -> Mapping[str, Any]:
 90        """
 91        Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
 92        """
 93
 94    @abstractmethod
 95    def get_request_body_data(
 96        self,
 97        *,
 98        stream_state: Optional[StreamState] = None,
 99        stream_slice: Optional[StreamSlice] = None,
100        next_page_token: Optional[Mapping[str, Any]] = None,
101    ) -> Union[Mapping[str, Any], str]:
102        """
103        Specifies how to populate the body of the request with a non-JSON payload.
104
105        If returns a ready text that it will be sent as is.
106        If returns a dict that it will be converted to a urlencoded form.
107        E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
108
109        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
110        """
111
112    @abstractmethod
113    def get_request_body_json(
114        self,
115        *,
116        stream_state: Optional[StreamState] = None,
117        stream_slice: Optional[StreamSlice] = None,
118        next_page_token: Optional[Mapping[str, Any]] = None,
119    ) -> Mapping[str, Any]:
120        """
121        Specifies how to populate the body of the request with a JSON payload.
122
123        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
124        """
125
126    @abstractmethod
127    def send_request(
128        self,
129        stream_state: Optional[StreamState] = None,
130        stream_slice: Optional[StreamSlice] = None,
131        next_page_token: Optional[Mapping[str, Any]] = None,
132        path: Optional[str] = None,
133        request_headers: Optional[Mapping[str, Any]] = None,
134        request_params: Optional[Mapping[str, Any]] = None,
135        request_body_data: Optional[Union[Mapping[str, Any], str]] = None,
136        request_body_json: Optional[Mapping[str, Any]] = None,
137        log_formatter: Optional[Callable[[requests.Response], Any]] = None,
138    ) -> Optional[requests.Response]:
139        """
140        Sends a request and returns the response. Might return no response if the error handler chooses to ignore the response or throw an exception in case of an error.
141        If path is set, the path configured on the requester itself is ignored.
142        If header, params and body are set, they are merged with the ones configured on the requester itself.
143
144        If a log formatter is provided, it's used to log the performed request and response. If it's not provided, no logging is performed.
145        """

Defines the request options to set on an outgoing HTTP request

Options can be passed by

  • request parameter
  • request headers
  • body data
  • json content
@abstractmethod
def get_authenticator( self) -> airbyte_cdk.DeclarativeAuthenticator:
31    @abstractmethod
32    def get_authenticator(self) -> DeclarativeAuthenticator:
33        """
34        Specifies the authenticator to use when submitting requests
35        """
36        pass

Specifies the authenticator to use when submitting requests

@abstractmethod
def get_url_base( self, *, stream_state: Optional[Mapping[str, Any]], stream_slice: Optional[airbyte_cdk.StreamSlice], next_page_token: Optional[Mapping[str, Any]]) -> str:
38    @abstractmethod
39    def get_url_base(
40        self,
41        *,
42        stream_state: Optional[StreamState],
43        stream_slice: Optional[StreamSlice],
44        next_page_token: Optional[Mapping[str, Any]],
45    ) -> str:
46        """
47        :return: URL base for the  API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/"
48        """
Returns

URL base for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/"

@abstractmethod
def get_path( self, *, stream_state: Optional[Mapping[str, Any]], stream_slice: Optional[airbyte_cdk.StreamSlice], next_page_token: Optional[Mapping[str, Any]]) -> str:
50    @abstractmethod
51    def get_path(
52        self,
53        *,
54        stream_state: Optional[StreamState],
55        stream_slice: Optional[StreamSlice],
56        next_page_token: Optional[Mapping[str, Any]],
57    ) -> str:
58        """
59        Returns the URL path for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "some_entity"
60        """

Returns the URL path for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "some_entity"

@abstractmethod
def get_method(self) -> airbyte_cdk.HttpMethod:
62    @abstractmethod
63    def get_method(self) -> HttpMethod:
64        """
65        Specifies the HTTP method to use
66        """

Specifies the HTTP method to use

@abstractmethod
def get_request_params( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> MutableMapping[str, Any]:
68    @abstractmethod
69    def get_request_params(
70        self,
71        *,
72        stream_state: Optional[StreamState] = None,
73        stream_slice: Optional[StreamSlice] = None,
74        next_page_token: Optional[Mapping[str, Any]] = None,
75    ) -> MutableMapping[str, Any]:
76        """
77        Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
78
79        E.g: you might want to define query parameters for paging if next_page_token is not None.
80        """

Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

E.g: you might want to define query parameters for paging if next_page_token is not None.

@abstractmethod
def get_request_headers( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
82    @abstractmethod
83    def get_request_headers(
84        self,
85        *,
86        stream_state: Optional[StreamState] = None,
87        stream_slice: Optional[StreamSlice] = None,
88        next_page_token: Optional[Mapping[str, Any]] = None,
89    ) -> Mapping[str, Any]:
90        """
91        Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
92        """

Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.

@abstractmethod
def get_request_body_data( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Union[Mapping[str, Any], str]:
 94    @abstractmethod
 95    def get_request_body_data(
 96        self,
 97        *,
 98        stream_state: Optional[StreamState] = None,
 99        stream_slice: Optional[StreamSlice] = None,
100        next_page_token: Optional[Mapping[str, Any]] = None,
101    ) -> Union[Mapping[str, Any], str]:
102        """
103        Specifies how to populate the body of the request with a non-JSON payload.
104
105        If returns a ready text that it will be sent as is.
106        If returns a dict that it will be converted to a urlencoded form.
107        E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
108
109        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
110        """

Specifies how to populate the body of the request with a non-JSON payload.

If returns a ready text that it will be sent as is. If returns a dict that it will be converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

@abstractmethod
def get_request_body_json( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
112    @abstractmethod
113    def get_request_body_json(
114        self,
115        *,
116        stream_state: Optional[StreamState] = None,
117        stream_slice: Optional[StreamSlice] = None,
118        next_page_token: Optional[Mapping[str, Any]] = None,
119    ) -> Mapping[str, Any]:
120        """
121        Specifies how to populate the body of the request with a JSON payload.
122
123        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
124        """

Specifies how to populate the body of the request with a JSON payload.

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

@abstractmethod
def send_request( self, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, path: Optional[str] = None, request_headers: Optional[Mapping[str, Any]] = None, request_params: Optional[Mapping[str, Any]] = None, request_body_data: Union[str, Mapping[str, Any], NoneType] = None, request_body_json: Optional[Mapping[str, Any]] = None, log_formatter: Optional[Callable[[requests.models.Response], Any]] = None) -> Optional[requests.models.Response]:
126    @abstractmethod
127    def send_request(
128        self,
129        stream_state: Optional[StreamState] = None,
130        stream_slice: Optional[StreamSlice] = None,
131        next_page_token: Optional[Mapping[str, Any]] = None,
132        path: Optional[str] = None,
133        request_headers: Optional[Mapping[str, Any]] = None,
134        request_params: Optional[Mapping[str, Any]] = None,
135        request_body_data: Optional[Union[Mapping[str, Any], str]] = None,
136        request_body_json: Optional[Mapping[str, Any]] = None,
137        log_formatter: Optional[Callable[[requests.Response], Any]] = None,
138    ) -> Optional[requests.Response]:
139        """
140        Sends a request and returns the response. Might return no response if the error handler chooses to ignore the response or throw an exception in case of an error.
141        If path is set, the path configured on the requester itself is ignored.
142        If header, params and body are set, they are merged with the ones configured on the requester itself.
143
144        If a log formatter is provided, it's used to log the performed request and response. If it's not provided, no logging is performed.
145        """

Sends a request and returns the response. Might return no response if the error handler chooses to ignore the response or throw an exception in case of an error. If path is set, the path configured on the requester itself is ignored. If header, params and body are set, they are merged with the ones configured on the requester itself.

If a log formatter is provided, it's used to log the performed request and response. If it's not provided, no logging is performed.