airbyte_cdk.sources.declarative.requesters

1#
2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3#
4
5from airbyte_cdk.sources.declarative.requesters.http_requester import HttpRequester
6from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption
7from airbyte_cdk.sources.declarative.requesters.requester import Requester
8
9__all__ = ["HttpRequester", "RequestOption", "Requester"]
@dataclass
class HttpRequester(airbyte_cdk.sources.declarative.requesters.Requester):
 37@dataclass
 38class HttpRequester(Requester):
 39    """
 40    Default implementation of a Requester
 41
 42    Attributes:
 43        name (str): Name of the stream. Only used for request/response caching
 44        url_base (Union[InterpolatedString, str]): Base url to send requests to
 45        path (Union[InterpolatedString, str]): Path to send requests to
 46        http_method (Union[str, HttpMethod]): HTTP method to use when sending requests
 47        request_options_provider (Optional[InterpolatedRequestOptionsProvider]): request option provider defining the options to set on outgoing requests
 48        authenticator (DeclarativeAuthenticator): Authenticator defining how to authenticate to the source
 49        error_handler (Optional[ErrorHandler]): Error handler defining how to detect and handle errors
 50        backoff_strategies (Optional[List[BackoffStrategy]]): List of backoff strategies to use when retrying requests
 51        config (Config): The user-provided configuration as specified by the source's spec
 52        use_cache (bool): Indicates that data should be cached for this stream
 53    """
 54
 55    name: str
 56    config: Config
 57    parameters: InitVar[Mapping[str, Any]]
 58
 59    url: Optional[Union[InterpolatedString, str]] = None
 60    url_base: Optional[Union[InterpolatedString, str]] = None
 61    path: Optional[Union[InterpolatedString, str]] = None
 62    authenticator: Optional[DeclarativeAuthenticator] = None
 63    http_method: Union[str, HttpMethod] = HttpMethod.GET
 64    request_options_provider: Optional[InterpolatedRequestOptionsProvider] = None
 65    error_handler: Optional[ErrorHandler] = None
 66    api_budget: Optional[APIBudget] = None
 67    disable_retries: bool = False
 68    message_repository: MessageRepository = NoopMessageRepository()
 69    use_cache: bool = False
 70    _exit_on_rate_limit: bool = False
 71    stream_response: bool = False
 72    decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={}))
 73
 74    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 75        self._url = InterpolatedString.create(
 76            self.url if self.url else EmptyString, parameters=parameters
 77        )
 78        # deprecated
 79        self._url_base = InterpolatedString.create(
 80            self.url_base if self.url_base else EmptyString, parameters=parameters
 81        )
 82        # deprecated
 83        self._path = InterpolatedString.create(
 84            self.path if self.path else EmptyString, parameters=parameters
 85        )
 86        if self.request_options_provider is None:
 87            self._request_options_provider = InterpolatedRequestOptionsProvider(
 88                config=self.config, parameters=parameters
 89            )
 90        elif isinstance(self.request_options_provider, dict):
 91            self._request_options_provider = InterpolatedRequestOptionsProvider(
 92                config=self.config, **self.request_options_provider
 93            )
 94        else:
 95            self._request_options_provider = self.request_options_provider
 96        self._authenticator = self.authenticator or NoAuth(parameters=parameters)
 97        self._http_method = (
 98            HttpMethod[self.http_method] if isinstance(self.http_method, str) else self.http_method
 99        )
100        self.error_handler = self.error_handler
101        self._parameters = parameters
102
103        if self.error_handler is not None and hasattr(self.error_handler, "backoff_strategies"):
104            backoff_strategies = self.error_handler.backoff_strategies  # type: ignore
105        else:
106            backoff_strategies = None
107
108        self._http_client = HttpClient(
109            name=self.name,
110            logger=self.logger,
111            error_handler=self.error_handler,
112            api_budget=self.api_budget,
113            authenticator=self._authenticator,
114            use_cache=self.use_cache,
115            backoff_strategy=backoff_strategies,
116            disable_retries=self.disable_retries,
117            message_repository=self.message_repository,
118        )
119
120    @property
121    def exit_on_rate_limit(self) -> bool:
122        return self._exit_on_rate_limit
123
124    @exit_on_rate_limit.setter
125    def exit_on_rate_limit(self, value: bool) -> None:
126        self._exit_on_rate_limit = value
127
128    def get_authenticator(self) -> DeclarativeAuthenticator:
129        return self._authenticator
130
131    def get_url(
132        self,
133        *,
134        stream_state: Optional[StreamState] = None,
135        stream_slice: Optional[StreamSlice] = None,
136        next_page_token: Optional[Mapping[str, Any]] = None,
137    ) -> str:
138        interpolation_context = get_interpolation_context(
139            stream_state=stream_state,
140            stream_slice=stream_slice,
141            next_page_token=next_page_token,
142        )
143
144        return str(self._url.eval(self.config, **interpolation_context))
145
146    def _get_url(
147        self,
148        *,
149        path: Optional[str] = None,
150        stream_state: Optional[StreamState] = None,
151        stream_slice: Optional[StreamSlice] = None,
152        next_page_token: Optional[Mapping[str, Any]] = None,
153    ) -> str:
154        url = self.get_url(
155            stream_state=stream_state,
156            stream_slice=stream_slice,
157            next_page_token=next_page_token,
158        )
159
160        url_base = self.get_url_base(
161            stream_state=stream_state,
162            stream_slice=stream_slice,
163            next_page_token=next_page_token,
164        )
165
166        path = path or self.get_path(
167            stream_state=stream_state,
168            stream_slice=stream_slice,
169            next_page_token=next_page_token,
170        )
171
172        full_url = self._join_url(url_base, path) if url_base else url + path if path else url
173
174        return full_url
175
176    def get_url_base(
177        self,
178        *,
179        stream_state: Optional[StreamState] = None,
180        stream_slice: Optional[StreamSlice] = None,
181        next_page_token: Optional[Mapping[str, Any]] = None,
182    ) -> str:
183        interpolation_context = get_interpolation_context(
184            stream_state=stream_state,
185            stream_slice=stream_slice,
186            next_page_token=next_page_token,
187        )
188        return str(self._url_base.eval(self.config, **interpolation_context))
189
190    def get_path(
191        self,
192        *,
193        stream_state: Optional[StreamState] = None,
194        stream_slice: Optional[StreamSlice] = None,
195        next_page_token: Optional[Mapping[str, Any]] = None,
196    ) -> str:
197        interpolation_context = get_interpolation_context(
198            stream_state=stream_state,
199            stream_slice=stream_slice,
200            next_page_token=next_page_token,
201        )
202        path = str(self._path.eval(self.config, **interpolation_context))
203        return path.lstrip("/")
204
205    def get_method(self) -> HttpMethod:
206        return self._http_method
207
208    def get_request_params(
209        self,
210        *,
211        stream_state: Optional[StreamState] = None,
212        stream_slice: Optional[StreamSlice] = None,
213        next_page_token: Optional[Mapping[str, Any]] = None,
214    ) -> MutableMapping[str, Any]:
215        return self._request_options_provider.get_request_params(
216            stream_state=stream_state,
217            stream_slice=stream_slice,
218            next_page_token=next_page_token,
219        )
220
221    def get_request_headers(
222        self,
223        *,
224        stream_state: Optional[StreamState] = None,
225        stream_slice: Optional[StreamSlice] = None,
226        next_page_token: Optional[Mapping[str, Any]] = None,
227    ) -> Mapping[str, Any]:
228        return self._request_options_provider.get_request_headers(
229            stream_state=stream_state,
230            stream_slice=stream_slice,
231            next_page_token=next_page_token,
232        )
233
234    # fixing request options provider types has a lot of dependencies
235    def get_request_body_data(  # type: ignore
236        self,
237        *,
238        stream_state: Optional[StreamState] = None,
239        stream_slice: Optional[StreamSlice] = None,
240        next_page_token: Optional[Mapping[str, Any]] = None,
241    ) -> Union[Mapping[str, Any], str]:
242        return (
243            self._request_options_provider.get_request_body_data(
244                stream_state=stream_state,
245                stream_slice=stream_slice,
246                next_page_token=next_page_token,
247            )
248            or {}
249        )
250
251    # fixing request options provider types has a lot of dependencies
252    def get_request_body_json(  # type: ignore
253        self,
254        *,
255        stream_state: Optional[StreamState] = None,
256        stream_slice: Optional[StreamSlice] = None,
257        next_page_token: Optional[Mapping[str, Any]] = None,
258    ) -> Optional[Mapping[str, Any]]:
259        return self._request_options_provider.get_request_body_json(
260            stream_state=stream_state,
261            stream_slice=stream_slice,
262            next_page_token=next_page_token,
263        )
264
265    @property
266    def logger(self) -> logging.Logger:
267        return logging.getLogger(f"airbyte.HttpRequester.{self.name}")
268
269    def _get_request_options(
270        self,
271        stream_state: Optional[StreamState],
272        stream_slice: Optional[StreamSlice],
273        next_page_token: Optional[Mapping[str, Any]],
274        requester_method: Callable[..., Optional[Union[Mapping[str, Any], str]]],
275        auth_options_method: Callable[..., Optional[Union[Mapping[str, Any], str]]],
276        extra_options: Optional[Union[Mapping[str, Any], str]] = None,
277    ) -> Union[Mapping[str, Any], str]:
278        """
279        Get the request_option from the requester, the authenticator and extra_options passed in.
280        Raise a ValueError if there's a key collision
281        Returned merged mapping otherwise
282        """
283
284        is_body_json = requester_method.__name__ == "get_request_body_json"
285
286        return combine_mappings(
287            [
288                requester_method(
289                    stream_state=stream_state,
290                    stream_slice=stream_slice,
291                    next_page_token=next_page_token,
292                ),
293                auth_options_method(),
294                extra_options,
295            ],
296            allow_same_value_merge=is_body_json,
297        )
298
299    def _request_headers(
300        self,
301        stream_state: Optional[StreamState] = None,
302        stream_slice: Optional[StreamSlice] = None,
303        next_page_token: Optional[Mapping[str, Any]] = None,
304        extra_headers: Optional[Mapping[str, Any]] = None,
305    ) -> Mapping[str, Any]:
306        """
307        Specifies request headers.
308        Authentication headers will overwrite any overlapping headers returned from this method.
309        """
310        headers = self._get_request_options(
311            stream_state,
312            stream_slice,
313            next_page_token,
314            self.get_request_headers,
315            self.get_authenticator().get_auth_header,
316            extra_headers,
317        )
318        if isinstance(headers, str):
319            raise ValueError("Request headers cannot be a string")
320        return {str(k): str(v) for k, v in headers.items()}
321
322    def _request_params(
323        self,
324        stream_state: Optional[StreamState],
325        stream_slice: Optional[StreamSlice],
326        next_page_token: Optional[Mapping[str, Any]],
327        extra_params: Optional[Mapping[str, Any]] = None,
328    ) -> Mapping[str, Any]:
329        """
330        Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
331
332        E.g: you might want to define query parameters for paging if next_page_token is not None.
333        """
334        options = self._get_request_options(
335            stream_state,
336            stream_slice,
337            next_page_token,
338            self.get_request_params,
339            self.get_authenticator().get_request_params,
340            extra_params,
341        )
342        if isinstance(options, str):
343            raise ValueError("Request params cannot be a string")
344
345        for k, v in options.items():
346            if isinstance(v, (dict,)):
347                raise ValueError(
348                    f"Invalid value for `{k}` parameter. The values of request params cannot be an object."
349                )
350
351        return options
352
353    def _request_body_data(
354        self,
355        stream_state: Optional[StreamState],
356        stream_slice: Optional[StreamSlice],
357        next_page_token: Optional[Mapping[str, Any]],
358        extra_body_data: Optional[Union[Mapping[str, Any], str]] = None,
359    ) -> Optional[Union[Mapping[str, Any], str]]:
360        """
361        Specifies how to populate the body of the request with a non-JSON payload.
362
363        If returns a ready text that it will be sent as is.
364        If returns a dict that it will be converted to a urlencoded form.
365        E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
366
367        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
368        """
369        # Warning: use self.state instead of the stream_state passed as argument!
370        return self._get_request_options(
371            stream_state,
372            stream_slice,
373            next_page_token,
374            self.get_request_body_data,
375            self.get_authenticator().get_request_body_data,
376            extra_body_data,
377        )
378
379    def _request_body_json(
380        self,
381        stream_state: Optional[StreamState],
382        stream_slice: Optional[StreamSlice],
383        next_page_token: Optional[Mapping[str, Any]],
384        extra_body_json: Optional[Mapping[str, Any]] = None,
385    ) -> Optional[Mapping[str, Any]]:
386        """
387        Specifies how to populate the body of the request with a JSON payload.
388
389        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
390        """
391        # Warning: use self.state instead of the stream_state passed as argument!
392        options = self._get_request_options(
393            stream_state,
394            stream_slice,
395            next_page_token,
396            self.get_request_body_json,
397            self.get_authenticator().get_request_body_json,
398            extra_body_json,
399        )
400        if isinstance(options, str):
401            raise ValueError("Request body json cannot be a string")
402        return options
403
404    @classmethod
405    def _join_url(cls, url_base: str, path: Optional[str] = None) -> str:
406        """
407        Joins a base URL with a given path and returns the resulting URL with any trailing slash removed.
408
409        This method ensures that there are no duplicate slashes when concatenating the base URL and the path,
410        which is useful when the full URL is provided from an interpolation context.
411
412        Args:
413            url_base (str): The base URL to which the path will be appended.
414            path (Optional[str]): The path to join with the base URL.
415
416        Returns:
417            str: The resulting joined URL.
418
419        Note:
420            Related issue: https://github.com/airbytehq/airbyte-internal-issues/issues/11869
421            - If the path is an empty string or None, the method returns the base URL with any trailing slash removed.
422
423        Example:
424            1) _join_url("https://example.com/api/", "endpoint") >> 'https://example.com/api/endpoint'
425            2) _join_url("https://example.com/api", "/endpoint") >> 'https://example.com/api/endpoint'
426            3) _join_url("https://example.com/api/", "") >> 'https://example.com/api/'
427            4) _join_url("https://example.com/api", None) >> 'https://example.com/api'
428        """
429
430        # return a full-url if provided directly from interpolation context
431        if path == EmptyString or path is None:
432            return url_base
433        else:
434            # since we didn't provide a full-url, the url_base might not have a trailing slash
435            # so we join the url_base and path correctly
436            if not url_base.endswith("/"):
437                url_base += "/"
438
439        return urljoin(url_base, path)
440
441    def send_request(
442        self,
443        stream_state: Optional[StreamState] = None,
444        stream_slice: Optional[StreamSlice] = None,
445        next_page_token: Optional[Mapping[str, Any]] = None,
446        path: Optional[str] = None,
447        request_headers: Optional[Mapping[str, Any]] = None,
448        request_params: Optional[Mapping[str, Any]] = None,
449        request_body_data: Optional[Union[Mapping[str, Any], str]] = None,
450        request_body_json: Optional[Mapping[str, Any]] = None,
451        log_formatter: Optional[Callable[[requests.Response], Any]] = None,
452    ) -> Optional[requests.Response]:
453        request, response = self._http_client.send_request(
454            http_method=self.get_method().value,
455            url=self._get_url(
456                path=path,
457                stream_state=stream_state,
458                stream_slice=stream_slice,
459                next_page_token=next_page_token,
460            ),
461            request_kwargs={"stream": self.stream_response},
462            headers=self._request_headers(
463                stream_state, stream_slice, next_page_token, request_headers
464            ),
465            params=self._request_params(
466                stream_state, stream_slice, next_page_token, request_params
467            ),
468            json=self._request_body_json(
469                stream_state, stream_slice, next_page_token, request_body_json
470            ),
471            data=self._request_body_data(
472                stream_state, stream_slice, next_page_token, request_body_data
473            ),
474            dedupe_query_params=True,
475            log_formatter=log_formatter,
476            exit_on_rate_limit=self._exit_on_rate_limit,
477        )
478
479        return response

Default implementation of a Requester

Attributes:
  • name (str): Name of the stream. Only used for request/response caching
  • url_base (Union[InterpolatedString, str]): Base url to send requests to
  • path (Union[InterpolatedString, str]): Path to send requests to
  • http_method (Union[str, HttpMethod]): HTTP method to use when sending requests
  • request_options_provider (Optional[InterpolatedRequestOptionsProvider]): request option provider defining the options to set on outgoing requests
  • authenticator (DeclarativeAuthenticator): Authenticator defining how to authenticate to the source
  • error_handler (Optional[ErrorHandler]): Error handler defining how to detect and handle errors
  • backoff_strategies (Optional[List[BackoffStrategy]]): List of backoff strategies to use when retrying requests
  • config (Config): The user-provided configuration as specified by the source's spec
  • use_cache (bool): Indicates that data should be cached for this stream
HttpRequester( name: str, config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], url: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, url_base: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, path: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, authenticator: Optional[airbyte_cdk.DeclarativeAuthenticator] = None, http_method: Union[str, airbyte_cdk.HttpMethod] = <HttpMethod.GET: 'GET'>, request_options_provider: Optional[airbyte_cdk.sources.declarative.requesters.request_options.InterpolatedRequestOptionsProvider] = None, error_handler: Optional[airbyte_cdk.sources.streams.http.error_handlers.ErrorHandler] = None, api_budget: Optional[airbyte_cdk.sources.streams.call_rate.APIBudget] = None, disable_retries: bool = False, message_repository: airbyte_cdk.MessageRepository = <airbyte_cdk.sources.message.NoopMessageRepository object>, use_cache: bool = False, _exit_on_rate_limit: bool = False, stream_response: bool = False, decoder: airbyte_cdk.Decoder = <factory>)
name: str
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
url: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
url_base: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
path: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
authenticator: Optional[airbyte_cdk.DeclarativeAuthenticator] = None
http_method: Union[str, airbyte_cdk.HttpMethod] = <HttpMethod.GET: 'GET'>
disable_retries: bool = False
use_cache: bool = False
stream_response: bool = False
exit_on_rate_limit: bool
120    @property
121    def exit_on_rate_limit(self) -> bool:
122        return self._exit_on_rate_limit
def get_authenticator( self) -> airbyte_cdk.DeclarativeAuthenticator:
128    def get_authenticator(self) -> DeclarativeAuthenticator:
129        return self._authenticator

Specifies the authenticator to use when submitting requests

def get_url( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> str:
131    def get_url(
132        self,
133        *,
134        stream_state: Optional[StreamState] = None,
135        stream_slice: Optional[StreamSlice] = None,
136        next_page_token: Optional[Mapping[str, Any]] = None,
137    ) -> str:
138        interpolation_context = get_interpolation_context(
139            stream_state=stream_state,
140            stream_slice=stream_slice,
141            next_page_token=next_page_token,
142        )
143
144        return str(self._url.eval(self.config, **interpolation_context))
Returns

URL base for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/"

def get_url_base( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> str:
176    def get_url_base(
177        self,
178        *,
179        stream_state: Optional[StreamState] = None,
180        stream_slice: Optional[StreamSlice] = None,
181        next_page_token: Optional[Mapping[str, Any]] = None,
182    ) -> str:
183        interpolation_context = get_interpolation_context(
184            stream_state=stream_state,
185            stream_slice=stream_slice,
186            next_page_token=next_page_token,
187        )
188        return str(self._url_base.eval(self.config, **interpolation_context))
Returns

URL base for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/"

def get_path( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> str:
190    def get_path(
191        self,
192        *,
193        stream_state: Optional[StreamState] = None,
194        stream_slice: Optional[StreamSlice] = None,
195        next_page_token: Optional[Mapping[str, Any]] = None,
196    ) -> str:
197        interpolation_context = get_interpolation_context(
198            stream_state=stream_state,
199            stream_slice=stream_slice,
200            next_page_token=next_page_token,
201        )
202        path = str(self._path.eval(self.config, **interpolation_context))
203        return path.lstrip("/")

Returns the URL path for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "some_entity"

def get_method(self) -> airbyte_cdk.HttpMethod:
205    def get_method(self) -> HttpMethod:
206        return self._http_method

Specifies the HTTP method to use

def get_request_params( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> MutableMapping[str, Any]:
208    def get_request_params(
209        self,
210        *,
211        stream_state: Optional[StreamState] = None,
212        stream_slice: Optional[StreamSlice] = None,
213        next_page_token: Optional[Mapping[str, Any]] = None,
214    ) -> MutableMapping[str, Any]:
215        return self._request_options_provider.get_request_params(
216            stream_state=stream_state,
217            stream_slice=stream_slice,
218            next_page_token=next_page_token,
219        )

Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

E.g: you might want to define query parameters for paging if next_page_token is not None.

def get_request_headers( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
221    def get_request_headers(
222        self,
223        *,
224        stream_state: Optional[StreamState] = None,
225        stream_slice: Optional[StreamSlice] = None,
226        next_page_token: Optional[Mapping[str, Any]] = None,
227    ) -> Mapping[str, Any]:
228        return self._request_options_provider.get_request_headers(
229            stream_state=stream_state,
230            stream_slice=stream_slice,
231            next_page_token=next_page_token,
232        )

Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.

def get_request_body_data( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Union[Mapping[str, Any], str]:
235    def get_request_body_data(  # type: ignore
236        self,
237        *,
238        stream_state: Optional[StreamState] = None,
239        stream_slice: Optional[StreamSlice] = None,
240        next_page_token: Optional[Mapping[str, Any]] = None,
241    ) -> Union[Mapping[str, Any], str]:
242        return (
243            self._request_options_provider.get_request_body_data(
244                stream_state=stream_state,
245                stream_slice=stream_slice,
246                next_page_token=next_page_token,
247            )
248            or {}
249        )

Specifies how to populate the body of the request with a non-JSON payload.

If returns a ready text that it will be sent as is. If returns a dict that it will be converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

def get_request_body_json( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Optional[Mapping[str, Any]]:
252    def get_request_body_json(  # type: ignore
253        self,
254        *,
255        stream_state: Optional[StreamState] = None,
256        stream_slice: Optional[StreamSlice] = None,
257        next_page_token: Optional[Mapping[str, Any]] = None,
258    ) -> Optional[Mapping[str, Any]]:
259        return self._request_options_provider.get_request_body_json(
260            stream_state=stream_state,
261            stream_slice=stream_slice,
262            next_page_token=next_page_token,
263        )

Specifies how to populate the body of the request with a JSON payload.

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

logger: logging.Logger
265    @property
266    def logger(self) -> logging.Logger:
267        return logging.getLogger(f"airbyte.HttpRequester.{self.name}")
def send_request( self, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, path: Optional[str] = None, request_headers: Optional[Mapping[str, Any]] = None, request_params: Optional[Mapping[str, Any]] = None, request_body_data: Union[Mapping[str, Any], str, NoneType] = None, request_body_json: Optional[Mapping[str, Any]] = None, log_formatter: Optional[Callable[[requests.models.Response], Any]] = None) -> Optional[requests.models.Response]:
441    def send_request(
442        self,
443        stream_state: Optional[StreamState] = None,
444        stream_slice: Optional[StreamSlice] = None,
445        next_page_token: Optional[Mapping[str, Any]] = None,
446        path: Optional[str] = None,
447        request_headers: Optional[Mapping[str, Any]] = None,
448        request_params: Optional[Mapping[str, Any]] = None,
449        request_body_data: Optional[Union[Mapping[str, Any], str]] = None,
450        request_body_json: Optional[Mapping[str, Any]] = None,
451        log_formatter: Optional[Callable[[requests.Response], Any]] = None,
452    ) -> Optional[requests.Response]:
453        request, response = self._http_client.send_request(
454            http_method=self.get_method().value,
455            url=self._get_url(
456                path=path,
457                stream_state=stream_state,
458                stream_slice=stream_slice,
459                next_page_token=next_page_token,
460            ),
461            request_kwargs={"stream": self.stream_response},
462            headers=self._request_headers(
463                stream_state, stream_slice, next_page_token, request_headers
464            ),
465            params=self._request_params(
466                stream_state, stream_slice, next_page_token, request_params
467            ),
468            json=self._request_body_json(
469                stream_state, stream_slice, next_page_token, request_body_json
470            ),
471            data=self._request_body_data(
472                stream_state, stream_slice, next_page_token, request_body_data
473            ),
474            dedupe_query_params=True,
475            log_formatter=log_formatter,
476            exit_on_rate_limit=self._exit_on_rate_limit,
477        )
478
479        return response

Sends a request and returns the response. Might return no response if the error handler chooses to ignore the response or throw an exception in case of an error. If path is set, the path configured on the requester itself is ignored. If header, params and body are set, they are merged with the ones configured on the requester itself.

If a log formatter is provided, it's used to log the performed request and response. If it's not provided, no logging is performed.

@dataclass
class RequestOption:
 25@dataclass
 26class RequestOption:
 27    """
 28    Describes an option to set on a request
 29
 30    Attributes:
 31        field_name (str): Describes the name of the parameter to inject. Mutually exclusive with field_path.
 32        field_path (list(str)): Describes the path to a nested field as a list of field names.
 33          Only valid for body_json injection type, and mutually exclusive with field_name.
 34        inject_into (RequestOptionType): Describes where in the HTTP request to inject the parameter
 35    """
 36
 37    inject_into: RequestOptionType
 38    parameters: InitVar[Mapping[str, Any]]
 39    field_name: Optional[Union[InterpolatedString, str]] = None
 40    field_path: Optional[List[Union[InterpolatedString, str]]] = None
 41
 42    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 43        # Validate inputs. We should expect either field_name or field_path, but not both
 44        if self.field_name is None and self.field_path is None:
 45            raise ValueError("RequestOption requires either a field_name or field_path")
 46
 47        if self.field_name is not None and self.field_path is not None:
 48            raise ValueError(
 49                "Only one of field_name or field_path can be provided to RequestOption"
 50            )
 51
 52        # Nested field injection is only supported for body JSON injection
 53        if self.field_path is not None and self.inject_into != RequestOptionType.body_json:
 54            raise ValueError(
 55                "Nested field injection is only supported for body JSON injection. Please use a top-level field_name for other injection types."
 56            )
 57
 58        # Convert field_name and field_path into InterpolatedString objects if they are strings
 59        if self.field_name is not None:
 60            self.field_name = InterpolatedString.create(self.field_name, parameters=parameters)
 61        elif self.field_path is not None:
 62            self.field_path = [
 63                InterpolatedString.create(segment, parameters=parameters)
 64                for segment in self.field_path
 65            ]
 66
 67    @property
 68    def _is_field_path(self) -> bool:
 69        """Returns whether this option is a field path (ie, a nested field)"""
 70        return self.field_path is not None
 71
 72    def inject_into_request(
 73        self,
 74        target: MutableMapping[str, Any],
 75        value: Any,
 76        config: Config,
 77    ) -> None:
 78        """
 79        Inject a request option value into a target request structure using either field_name or field_path.
 80        For non-body-json injection, only top-level field names are supported.
 81        For body-json injection, both field names and nested field paths are supported.
 82
 83        Args:
 84            target: The request structure to inject the value into
 85            value: The value to inject
 86            config: The config object to use for interpolation
 87        """
 88        if self._is_field_path:
 89            if self.inject_into != RequestOptionType.body_json:
 90                raise ValueError(
 91                    "Nested field injection is only supported for body JSON injection. Please use a top-level field_name for other injection types."
 92                )
 93
 94            assert self.field_path is not None  # for type checker
 95            current = target
 96            # Convert path segments into strings, evaluating any interpolated segments
 97            # Example: ["data", "{{ config[user_type] }}", "id"] -> ["data", "admin", "id"]
 98            *path_parts, final_key = [
 99                str(
100                    segment.eval(config=config)
101                    if isinstance(segment, InterpolatedString)
102                    else segment
103                )
104                for segment in self.field_path
105            ]
106
107            # Build a nested dictionary structure and set the final value at the deepest level
108            for part in path_parts:
109                current = current.setdefault(part, {})
110            current[final_key] = value
111        else:
112            # For non-nested fields, evaluate the field name if it's an interpolated string
113            key = (
114                self.field_name.eval(config=config)
115                if isinstance(self.field_name, InterpolatedString)
116                else self.field_name
117            )
118            target[str(key)] = value

Describes an option to set on a request

Attributes:
  • field_name (str): Describes the name of the parameter to inject. Mutually exclusive with field_path.
  • field_path (list(str)): Describes the path to a nested field as a list of field names. Only valid for body_json injection type, and mutually exclusive with field_name.
  • inject_into (RequestOptionType): Describes where in the HTTP request to inject the parameter
RequestOption( inject_into: airbyte_cdk.RequestOptionType, parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], field_name: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, field_path: Optional[List[Union[airbyte_cdk.InterpolatedString, str]]] = None)
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
field_name: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
field_path: Optional[List[Union[airbyte_cdk.InterpolatedString, str]]] = None
def inject_into_request( self, target: MutableMapping[str, Any], value: Any, config: Mapping[str, Any]) -> None:
 72    def inject_into_request(
 73        self,
 74        target: MutableMapping[str, Any],
 75        value: Any,
 76        config: Config,
 77    ) -> None:
 78        """
 79        Inject a request option value into a target request structure using either field_name or field_path.
 80        For non-body-json injection, only top-level field names are supported.
 81        For body-json injection, both field names and nested field paths are supported.
 82
 83        Args:
 84            target: The request structure to inject the value into
 85            value: The value to inject
 86            config: The config object to use for interpolation
 87        """
 88        if self._is_field_path:
 89            if self.inject_into != RequestOptionType.body_json:
 90                raise ValueError(
 91                    "Nested field injection is only supported for body JSON injection. Please use a top-level field_name for other injection types."
 92                )
 93
 94            assert self.field_path is not None  # for type checker
 95            current = target
 96            # Convert path segments into strings, evaluating any interpolated segments
 97            # Example: ["data", "{{ config[user_type] }}", "id"] -> ["data", "admin", "id"]
 98            *path_parts, final_key = [
 99                str(
100                    segment.eval(config=config)
101                    if isinstance(segment, InterpolatedString)
102                    else segment
103                )
104                for segment in self.field_path
105            ]
106
107            # Build a nested dictionary structure and set the final value at the deepest level
108            for part in path_parts:
109                current = current.setdefault(part, {})
110            current[final_key] = value
111        else:
112            # For non-nested fields, evaluate the field name if it's an interpolated string
113            key = (
114                self.field_name.eval(config=config)
115                if isinstance(self.field_name, InterpolatedString)
116                else self.field_name
117            )
118            target[str(key)] = value

Inject a request option value into a target request structure using either field_name or field_path. For non-body-json injection, only top-level field names are supported. For body-json injection, both field names and nested field paths are supported.

Arguments:
  • target: The request structure to inject the value into
  • value: The value to inject
  • config: The config object to use for interpolation
 30class Requester(RequestOptionsProvider):
 31    @abstractmethod
 32    def get_authenticator(self) -> DeclarativeAuthenticator:
 33        """
 34        Specifies the authenticator to use when submitting requests
 35        """
 36        pass
 37
 38    @abstractmethod
 39    def get_url(
 40        self,
 41        *,
 42        stream_state: Optional[StreamState],
 43        stream_slice: Optional[StreamSlice],
 44        next_page_token: Optional[Mapping[str, Any]],
 45    ) -> str:
 46        """
 47        :return: URL base for the  API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/"
 48        """
 49
 50    @abstractmethod
 51    def get_url_base(
 52        self,
 53        *,
 54        stream_state: Optional[StreamState],
 55        stream_slice: Optional[StreamSlice],
 56        next_page_token: Optional[Mapping[str, Any]],
 57    ) -> str:
 58        """
 59        :return: URL base for the  API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/"
 60        """
 61
 62    @abstractmethod
 63    def get_path(
 64        self,
 65        *,
 66        stream_state: Optional[StreamState],
 67        stream_slice: Optional[StreamSlice],
 68        next_page_token: Optional[Mapping[str, Any]],
 69    ) -> str:
 70        """
 71        Returns the URL path for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "some_entity"
 72        """
 73
 74    @abstractmethod
 75    def get_method(self) -> HttpMethod:
 76        """
 77        Specifies the HTTP method to use
 78        """
 79
 80    @abstractmethod
 81    def get_request_params(
 82        self,
 83        *,
 84        stream_state: Optional[StreamState] = None,
 85        stream_slice: Optional[StreamSlice] = None,
 86        next_page_token: Optional[Mapping[str, Any]] = None,
 87    ) -> MutableMapping[str, Any]:
 88        """
 89        Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
 90
 91        E.g: you might want to define query parameters for paging if next_page_token is not None.
 92        """
 93
 94    @abstractmethod
 95    def get_request_headers(
 96        self,
 97        *,
 98        stream_state: Optional[StreamState] = None,
 99        stream_slice: Optional[StreamSlice] = None,
100        next_page_token: Optional[Mapping[str, Any]] = None,
101    ) -> Mapping[str, Any]:
102        """
103        Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
104        """
105
106    @abstractmethod
107    def get_request_body_data(
108        self,
109        *,
110        stream_state: Optional[StreamState] = None,
111        stream_slice: Optional[StreamSlice] = None,
112        next_page_token: Optional[Mapping[str, Any]] = None,
113    ) -> Union[Mapping[str, Any], str]:
114        """
115        Specifies how to populate the body of the request with a non-JSON payload.
116
117        If returns a ready text that it will be sent as is.
118        If returns a dict that it will be converted to a urlencoded form.
119        E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
120
121        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
122        """
123
124    @abstractmethod
125    def get_request_body_json(
126        self,
127        *,
128        stream_state: Optional[StreamState] = None,
129        stream_slice: Optional[StreamSlice] = None,
130        next_page_token: Optional[Mapping[str, Any]] = None,
131    ) -> Mapping[str, Any]:
132        """
133        Specifies how to populate the body of the request with a JSON payload.
134
135        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
136        """
137
138    @abstractmethod
139    def send_request(
140        self,
141        stream_state: Optional[StreamState] = None,
142        stream_slice: Optional[StreamSlice] = None,
143        next_page_token: Optional[Mapping[str, Any]] = None,
144        path: Optional[str] = None,
145        request_headers: Optional[Mapping[str, Any]] = None,
146        request_params: Optional[Mapping[str, Any]] = None,
147        request_body_data: Optional[Union[Mapping[str, Any], str]] = None,
148        request_body_json: Optional[Mapping[str, Any]] = None,
149        log_formatter: Optional[Callable[[requests.Response], Any]] = None,
150    ) -> Optional[requests.Response]:
151        """
152        Sends a request and returns the response. Might return no response if the error handler chooses to ignore the response or throw an exception in case of an error.
153        If path is set, the path configured on the requester itself is ignored.
154        If header, params and body are set, they are merged with the ones configured on the requester itself.
155
156        If a log formatter is provided, it's used to log the performed request and response. If it's not provided, no logging is performed.
157        """

Defines the request options to set on an outgoing HTTP request

Options can be passed by

  • request parameter
  • request headers
  • body data
  • json content
@abstractmethod
def get_authenticator( self) -> airbyte_cdk.DeclarativeAuthenticator:
31    @abstractmethod
32    def get_authenticator(self) -> DeclarativeAuthenticator:
33        """
34        Specifies the authenticator to use when submitting requests
35        """
36        pass

Specifies the authenticator to use when submitting requests

@abstractmethod
def get_url( self, *, stream_state: Optional[Mapping[str, Any]], stream_slice: Optional[airbyte_cdk.StreamSlice], next_page_token: Optional[Mapping[str, Any]]) -> str:
38    @abstractmethod
39    def get_url(
40        self,
41        *,
42        stream_state: Optional[StreamState],
43        stream_slice: Optional[StreamSlice],
44        next_page_token: Optional[Mapping[str, Any]],
45    ) -> str:
46        """
47        :return: URL base for the  API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/"
48        """
Returns

URL base for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/"

@abstractmethod
def get_url_base( self, *, stream_state: Optional[Mapping[str, Any]], stream_slice: Optional[airbyte_cdk.StreamSlice], next_page_token: Optional[Mapping[str, Any]]) -> str:
50    @abstractmethod
51    def get_url_base(
52        self,
53        *,
54        stream_state: Optional[StreamState],
55        stream_slice: Optional[StreamSlice],
56        next_page_token: Optional[Mapping[str, Any]],
57    ) -> str:
58        """
59        :return: URL base for the  API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/"
60        """
Returns

URL base for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/"

@abstractmethod
def get_path( self, *, stream_state: Optional[Mapping[str, Any]], stream_slice: Optional[airbyte_cdk.StreamSlice], next_page_token: Optional[Mapping[str, Any]]) -> str:
62    @abstractmethod
63    def get_path(
64        self,
65        *,
66        stream_state: Optional[StreamState],
67        stream_slice: Optional[StreamSlice],
68        next_page_token: Optional[Mapping[str, Any]],
69    ) -> str:
70        """
71        Returns the URL path for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "some_entity"
72        """

Returns the URL path for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "some_entity"

@abstractmethod
def get_method(self) -> airbyte_cdk.HttpMethod:
74    @abstractmethod
75    def get_method(self) -> HttpMethod:
76        """
77        Specifies the HTTP method to use
78        """

Specifies the HTTP method to use

@abstractmethod
def get_request_params( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> MutableMapping[str, Any]:
80    @abstractmethod
81    def get_request_params(
82        self,
83        *,
84        stream_state: Optional[StreamState] = None,
85        stream_slice: Optional[StreamSlice] = None,
86        next_page_token: Optional[Mapping[str, Any]] = None,
87    ) -> MutableMapping[str, Any]:
88        """
89        Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
90
91        E.g: you might want to define query parameters for paging if next_page_token is not None.
92        """

Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

E.g: you might want to define query parameters for paging if next_page_token is not None.

@abstractmethod
def get_request_headers( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
 94    @abstractmethod
 95    def get_request_headers(
 96        self,
 97        *,
 98        stream_state: Optional[StreamState] = None,
 99        stream_slice: Optional[StreamSlice] = None,
100        next_page_token: Optional[Mapping[str, Any]] = None,
101    ) -> Mapping[str, Any]:
102        """
103        Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
104        """

Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.

@abstractmethod
def get_request_body_data( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Union[Mapping[str, Any], str]:
106    @abstractmethod
107    def get_request_body_data(
108        self,
109        *,
110        stream_state: Optional[StreamState] = None,
111        stream_slice: Optional[StreamSlice] = None,
112        next_page_token: Optional[Mapping[str, Any]] = None,
113    ) -> Union[Mapping[str, Any], str]:
114        """
115        Specifies how to populate the body of the request with a non-JSON payload.
116
117        If returns a ready text that it will be sent as is.
118        If returns a dict that it will be converted to a urlencoded form.
119        E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
120
121        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
122        """

Specifies how to populate the body of the request with a non-JSON payload.

If returns a ready text that it will be sent as is. If returns a dict that it will be converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

@abstractmethod
def get_request_body_json( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
124    @abstractmethod
125    def get_request_body_json(
126        self,
127        *,
128        stream_state: Optional[StreamState] = None,
129        stream_slice: Optional[StreamSlice] = None,
130        next_page_token: Optional[Mapping[str, Any]] = None,
131    ) -> Mapping[str, Any]:
132        """
133        Specifies how to populate the body of the request with a JSON payload.
134
135        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
136        """

Specifies how to populate the body of the request with a JSON payload.

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

@abstractmethod
def send_request( self, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, path: Optional[str] = None, request_headers: Optional[Mapping[str, Any]] = None, request_params: Optional[Mapping[str, Any]] = None, request_body_data: Union[Mapping[str, Any], str, NoneType] = None, request_body_json: Optional[Mapping[str, Any]] = None, log_formatter: Optional[Callable[[requests.models.Response], Any]] = None) -> Optional[requests.models.Response]:
138    @abstractmethod
139    def send_request(
140        self,
141        stream_state: Optional[StreamState] = None,
142        stream_slice: Optional[StreamSlice] = None,
143        next_page_token: Optional[Mapping[str, Any]] = None,
144        path: Optional[str] = None,
145        request_headers: Optional[Mapping[str, Any]] = None,
146        request_params: Optional[Mapping[str, Any]] = None,
147        request_body_data: Optional[Union[Mapping[str, Any], str]] = None,
148        request_body_json: Optional[Mapping[str, Any]] = None,
149        log_formatter: Optional[Callable[[requests.Response], Any]] = None,
150    ) -> Optional[requests.Response]:
151        """
152        Sends a request and returns the response. Might return no response if the error handler chooses to ignore the response or throw an exception in case of an error.
153        If path is set, the path configured on the requester itself is ignored.
154        If header, params and body are set, they are merged with the ones configured on the requester itself.
155
156        If a log formatter is provided, it's used to log the performed request and response. If it's not provided, no logging is performed.
157        """

Sends a request and returns the response. Might return no response if the error handler chooses to ignore the response or throw an exception in case of an error. If path is set, the path configured on the requester itself is ignored. If header, params and body are set, they are merged with the ones configured on the requester itself.

If a log formatter is provided, it's used to log the performed request and response. If it's not provided, no logging is performed.