airbyte_cdk.sources.declarative.requesters

1#
2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3#
4
5from airbyte_cdk.sources.declarative.requesters.http_requester import HttpRequester
6from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption
7from airbyte_cdk.sources.declarative.requesters.requester import Requester
8
9__all__ = ["HttpRequester", "RequestOption", "Requester"]
@dataclass
class HttpRequester(airbyte_cdk.sources.declarative.requesters.Requester):
 37@dataclass
 38class HttpRequester(Requester):
 39    """
 40    Default implementation of a Requester
 41
 42    Attributes:
 43        name (str): Name of the stream. Only used for request/response caching
 44        url_base (Union[InterpolatedString, str]): Base url to send requests to
 45        path (Union[InterpolatedString, str]): Path to send requests to
 46        http_method (Union[str, HttpMethod]): HTTP method to use when sending requests
 47        request_options_provider (Optional[InterpolatedRequestOptionsProvider]): request option provider defining the options to set on outgoing requests
 48        authenticator (DeclarativeAuthenticator): Authenticator defining how to authenticate to the source
 49        error_handler (Optional[ErrorHandler]): Error handler defining how to detect and handle errors
 50        backoff_strategies (Optional[List[BackoffStrategy]]): List of backoff strategies to use when retrying requests
 51        config (Config): The user-provided configuration as specified by the source's spec
 52        use_cache (bool): Indicates that data should be cached for this stream
 53    """
 54
 55    name: str
 56    config: Config
 57    parameters: InitVar[Mapping[str, Any]]
 58
 59    url: Optional[Union[InterpolatedString, str]] = None
 60    url_base: Optional[Union[InterpolatedString, str]] = None
 61    path: Optional[Union[InterpolatedString, str]] = None
 62    authenticator: Optional[DeclarativeAuthenticator] = None
 63    http_method: Union[str, HttpMethod] = HttpMethod.GET
 64    request_options_provider: Optional[InterpolatedRequestOptionsProvider] = None
 65    error_handler: Optional[ErrorHandler] = None
 66    api_budget: Optional[APIBudget] = None
 67    disable_retries: bool = False
 68    message_repository: MessageRepository = NoopMessageRepository()
 69    use_cache: bool = False
 70    _exit_on_rate_limit: bool = False
 71    stream_response: bool = False
 72    decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={}))
 73
 74    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 75        self._url = InterpolatedString.create(
 76            self.url if self.url else EmptyString, parameters=parameters
 77        )
 78        # deprecated
 79        self._url_base = InterpolatedString.create(
 80            self.url_base if self.url_base else EmptyString, parameters=parameters
 81        )
 82        # deprecated
 83        self._path = InterpolatedString.create(
 84            self.path if self.path else EmptyString, parameters=parameters
 85        )
 86        if self.request_options_provider is None:
 87            self._request_options_provider = InterpolatedRequestOptionsProvider(
 88                config=self.config, parameters=parameters
 89            )
 90        elif isinstance(self.request_options_provider, dict):
 91            self._request_options_provider = InterpolatedRequestOptionsProvider(
 92                config=self.config, **self.request_options_provider
 93            )
 94        else:
 95            self._request_options_provider = self.request_options_provider
 96        self._authenticator = self.authenticator or NoAuth(parameters=parameters)
 97        self._http_method = (
 98            HttpMethod[self.http_method] if isinstance(self.http_method, str) else self.http_method
 99        )
100        self.error_handler = self.error_handler
101        self._parameters = parameters
102
103        if self.error_handler is not None and hasattr(self.error_handler, "backoff_strategies"):
104            backoff_strategies = self.error_handler.backoff_strategies  # type: ignore
105        else:
106            backoff_strategies = None
107
108        self._http_client = HttpClient(
109            name=self.name,
110            logger=self.logger,
111            error_handler=self.error_handler,
112            api_budget=self.api_budget,
113            authenticator=self._authenticator,
114            use_cache=self.use_cache,
115            backoff_strategy=backoff_strategies,
116            disable_retries=self.disable_retries,
117            message_repository=self.message_repository,
118        )
119
120    @property
121    def exit_on_rate_limit(self) -> bool:
122        return self._exit_on_rate_limit
123
124    @exit_on_rate_limit.setter
125    def exit_on_rate_limit(self, value: bool) -> None:
126        self._exit_on_rate_limit = value
127
128    def get_authenticator(self) -> DeclarativeAuthenticator:
129        return self._authenticator
130
131    def get_url(
132        self,
133        *,
134        stream_state: Optional[StreamState] = None,
135        stream_slice: Optional[StreamSlice] = None,
136        next_page_token: Optional[Mapping[str, Any]] = None,
137    ) -> str:
138        interpolation_context = get_interpolation_context(
139            stream_state=stream_state,
140            stream_slice=stream_slice,
141            next_page_token=next_page_token,
142        )
143
144        return str(self._url.eval(self.config, **interpolation_context))
145
146    def _get_url(
147        self,
148        *,
149        path: Optional[str] = None,
150        stream_state: Optional[StreamState] = None,
151        stream_slice: Optional[StreamSlice] = None,
152        next_page_token: Optional[Mapping[str, Any]] = None,
153    ) -> str:
154        url = self.get_url(
155            stream_state=stream_state,
156            stream_slice=stream_slice,
157            next_page_token=next_page_token,
158        )
159
160        url_base = self.get_url_base(
161            stream_state=stream_state,
162            stream_slice=stream_slice,
163            next_page_token=next_page_token,
164        )
165
166        path = path or self.get_path(
167            stream_state=stream_state,
168            stream_slice=stream_slice,
169            next_page_token=next_page_token,
170        )
171
172        full_url = (
173            self._join_url(url_base, path)
174            if url_base
175            else self._join_url(url, path)
176            if path
177            else url
178        )
179
180        return full_url
181
182    def get_url_base(
183        self,
184        *,
185        stream_state: Optional[StreamState] = None,
186        stream_slice: Optional[StreamSlice] = None,
187        next_page_token: Optional[Mapping[str, Any]] = None,
188    ) -> str:
189        interpolation_context = get_interpolation_context(
190            stream_state=stream_state,
191            stream_slice=stream_slice,
192            next_page_token=next_page_token,
193        )
194        return str(self._url_base.eval(self.config, **interpolation_context))
195
196    def get_path(
197        self,
198        *,
199        stream_state: Optional[StreamState] = None,
200        stream_slice: Optional[StreamSlice] = None,
201        next_page_token: Optional[Mapping[str, Any]] = None,
202    ) -> str:
203        interpolation_context = get_interpolation_context(
204            stream_state=stream_state,
205            stream_slice=stream_slice,
206            next_page_token=next_page_token,
207        )
208        path = str(self._path.eval(self.config, **interpolation_context))
209        return path.lstrip("/")
210
211    def get_method(self) -> HttpMethod:
212        return self._http_method
213
214    def get_request_params(
215        self,
216        *,
217        stream_state: Optional[StreamState] = None,
218        stream_slice: Optional[StreamSlice] = None,
219        next_page_token: Optional[Mapping[str, Any]] = None,
220    ) -> MutableMapping[str, Any]:
221        return self._request_options_provider.get_request_params(
222            stream_state=stream_state,
223            stream_slice=stream_slice,
224            next_page_token=next_page_token,
225        )
226
227    def get_request_headers(
228        self,
229        *,
230        stream_state: Optional[StreamState] = None,
231        stream_slice: Optional[StreamSlice] = None,
232        next_page_token: Optional[Mapping[str, Any]] = None,
233    ) -> Mapping[str, Any]:
234        return self._request_options_provider.get_request_headers(
235            stream_state=stream_state,
236            stream_slice=stream_slice,
237            next_page_token=next_page_token,
238        )
239
240    # fixing request options provider types has a lot of dependencies
241    def get_request_body_data(  # type: ignore
242        self,
243        *,
244        stream_state: Optional[StreamState] = None,
245        stream_slice: Optional[StreamSlice] = None,
246        next_page_token: Optional[Mapping[str, Any]] = None,
247    ) -> Union[Mapping[str, Any], str]:
248        return (
249            self._request_options_provider.get_request_body_data(
250                stream_state=stream_state,
251                stream_slice=stream_slice,
252                next_page_token=next_page_token,
253            )
254            or {}
255        )
256
257    # fixing request options provider types has a lot of dependencies
258    def get_request_body_json(  # type: ignore
259        self,
260        *,
261        stream_state: Optional[StreamState] = None,
262        stream_slice: Optional[StreamSlice] = None,
263        next_page_token: Optional[Mapping[str, Any]] = None,
264    ) -> Optional[Mapping[str, Any]]:
265        return self._request_options_provider.get_request_body_json(
266            stream_state=stream_state,
267            stream_slice=stream_slice,
268            next_page_token=next_page_token,
269        )
270
271    @property
272    def logger(self) -> logging.Logger:
273        return logging.getLogger(f"airbyte.HttpRequester.{self.name}")
274
275    def _get_request_options(
276        self,
277        stream_state: Optional[StreamState],
278        stream_slice: Optional[StreamSlice],
279        next_page_token: Optional[Mapping[str, Any]],
280        requester_method: Callable[..., Optional[Union[Mapping[str, Any], str]]],
281        auth_options_method: Callable[..., Optional[Union[Mapping[str, Any], str]]],
282        extra_options: Optional[Union[Mapping[str, Any], str]] = None,
283    ) -> Union[Mapping[str, Any], str]:
284        """
285        Get the request_option from the requester, the authenticator and extra_options passed in.
286        Raise a ValueError if there's a key collision
287        Returned merged mapping otherwise
288        """
289
290        is_body_json = requester_method.__name__ == "get_request_body_json"
291
292        return combine_mappings(
293            [
294                requester_method(
295                    stream_state=stream_state,
296                    stream_slice=stream_slice,
297                    next_page_token=next_page_token,
298                ),
299                auth_options_method(),
300                extra_options,
301            ],
302            allow_same_value_merge=is_body_json,
303        )
304
305    def _request_headers(
306        self,
307        stream_state: Optional[StreamState] = None,
308        stream_slice: Optional[StreamSlice] = None,
309        next_page_token: Optional[Mapping[str, Any]] = None,
310        extra_headers: Optional[Mapping[str, Any]] = None,
311    ) -> Mapping[str, Any]:
312        """
313        Specifies request headers.
314        Authentication headers will overwrite any overlapping headers returned from this method.
315        """
316        headers = self._get_request_options(
317            stream_state,
318            stream_slice,
319            next_page_token,
320            self.get_request_headers,
321            self.get_authenticator().get_auth_header,
322            extra_headers,
323        )
324        if isinstance(headers, str):
325            raise ValueError("Request headers cannot be a string")
326        return {str(k): str(v) for k, v in headers.items()}
327
328    def _request_params(
329        self,
330        stream_state: Optional[StreamState],
331        stream_slice: Optional[StreamSlice],
332        next_page_token: Optional[Mapping[str, Any]],
333        extra_params: Optional[Mapping[str, Any]] = None,
334    ) -> Mapping[str, Any]:
335        """
336        Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
337
338        E.g: you might want to define query parameters for paging if next_page_token is not None.
339        """
340        options = self._get_request_options(
341            stream_state,
342            stream_slice,
343            next_page_token,
344            self.get_request_params,
345            self.get_authenticator().get_request_params,
346            extra_params,
347        )
348        if isinstance(options, str):
349            raise ValueError("Request params cannot be a string")
350
351        for k, v in options.items():
352            if isinstance(v, (dict,)):
353                raise ValueError(
354                    f"Invalid value for `{k}` parameter. The values of request params cannot be an object."
355                )
356
357        return options
358
359    def _request_body_data(
360        self,
361        stream_state: Optional[StreamState],
362        stream_slice: Optional[StreamSlice],
363        next_page_token: Optional[Mapping[str, Any]],
364        extra_body_data: Optional[Union[Mapping[str, Any], str]] = None,
365    ) -> Optional[Union[Mapping[str, Any], str]]:
366        """
367        Specifies how to populate the body of the request with a non-JSON payload.
368
369        If returns a ready text that it will be sent as is.
370        If returns a dict that it will be converted to a urlencoded form.
371        E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
372
373        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
374        """
375        # Warning: use self.state instead of the stream_state passed as argument!
376        return self._get_request_options(
377            stream_state,
378            stream_slice,
379            next_page_token,
380            self.get_request_body_data,
381            self.get_authenticator().get_request_body_data,
382            extra_body_data,
383        )
384
385    def _request_body_json(
386        self,
387        stream_state: Optional[StreamState],
388        stream_slice: Optional[StreamSlice],
389        next_page_token: Optional[Mapping[str, Any]],
390        extra_body_json: Optional[Mapping[str, Any]] = None,
391    ) -> Optional[Mapping[str, Any]]:
392        """
393        Specifies how to populate the body of the request with a JSON payload.
394
395        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
396        """
397        # Warning: use self.state instead of the stream_state passed as argument!
398        options = self._get_request_options(
399            stream_state,
400            stream_slice,
401            next_page_token,
402            self.get_request_body_json,
403            self.get_authenticator().get_request_body_json,
404            extra_body_json,
405        )
406        if isinstance(options, str):
407            raise ValueError("Request body json cannot be a string")
408        return options
409
410    @classmethod
411    def _join_url(cls, url_base: str, path: Optional[str] = None) -> str:
412        """
413        Joins a base URL with a given path and returns the resulting URL with any trailing slash removed.
414
415        This method ensures that there are no duplicate slashes when concatenating the base URL and the path,
416        which is useful when the full URL is provided from an interpolation context.
417
418        Args:
419            url_base (str): The base URL to which the path will be appended.
420            path (Optional[str]): The path to join with the base URL.
421
422        Returns:
423            str: The resulting joined URL.
424
425        Note:
426            Related issue: https://github.com/airbytehq/airbyte-internal-issues/issues/11869
427            - If the path is an empty string or None, the method returns the base URL with any trailing slash removed.
428
429        Example:
430            1) _join_url("https://example.com/api/", "endpoint") >> 'https://example.com/api/endpoint'
431            2) _join_url("https://example.com/api", "/endpoint") >> 'https://example.com/api/endpoint'
432            3) _join_url("https://example.com/api/", "") >> 'https://example.com/api/'
433            4) _join_url("https://example.com/api", None) >> 'https://example.com/api'
434        """
435
436        # return a full-url if provided directly from interpolation context
437        if path == EmptyString or path is None:
438            return url_base
439        else:
440            # since we didn't provide a full-url, the url_base might not have a trailing slash
441            # so we join the url_base and path correctly
442            if not url_base.endswith("/"):
443                url_base += "/"
444
445        return urljoin(url_base, path)
446
447    def send_request(
448        self,
449        stream_state: Optional[StreamState] = None,
450        stream_slice: Optional[StreamSlice] = None,
451        next_page_token: Optional[Mapping[str, Any]] = None,
452        path: Optional[str] = None,
453        request_headers: Optional[Mapping[str, Any]] = None,
454        request_params: Optional[Mapping[str, Any]] = None,
455        request_body_data: Optional[Union[Mapping[str, Any], str]] = None,
456        request_body_json: Optional[Mapping[str, Any]] = None,
457        log_formatter: Optional[Callable[[requests.Response], Any]] = None,
458    ) -> Optional[requests.Response]:
459        request, response = self._http_client.send_request(
460            http_method=self.get_method().value,
461            url=self._get_url(
462                path=path,
463                stream_state=stream_state,
464                stream_slice=stream_slice,
465                next_page_token=next_page_token,
466            ),
467            request_kwargs={"stream": self.stream_response},
468            headers=self._request_headers(
469                stream_state, stream_slice, next_page_token, request_headers
470            ),
471            params=self._request_params(
472                stream_state, stream_slice, next_page_token, request_params
473            ),
474            json=self._request_body_json(
475                stream_state, stream_slice, next_page_token, request_body_json
476            ),
477            data=self._request_body_data(
478                stream_state, stream_slice, next_page_token, request_body_data
479            ),
480            dedupe_query_params=True,
481            log_formatter=log_formatter,
482            exit_on_rate_limit=self._exit_on_rate_limit,
483        )
484
485        return response

Default implementation of a Requester

Attributes:
  • name (str): Name of the stream. Only used for request/response caching
  • url_base (Union[InterpolatedString, str]): Base url to send requests to
  • path (Union[InterpolatedString, str]): Path to send requests to
  • http_method (Union[str, HttpMethod]): HTTP method to use when sending requests
  • request_options_provider (Optional[InterpolatedRequestOptionsProvider]): request option provider defining the options to set on outgoing requests
  • authenticator (DeclarativeAuthenticator): Authenticator defining how to authenticate to the source
  • error_handler (Optional[ErrorHandler]): Error handler defining how to detect and handle errors
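  • backoff_strategies (Optional[List[BackoffStrategy]]): List of backoff strategies to use when retrying requests. This is not a constructor argument: the strategies are read from error_handler.backoff_strategies when the error handler exposes that attribute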
  • config (Config): The user-provided configuration as specified by the source's spec
  • use_cache (bool): Indicates that data should be cached for this stream
HttpRequester( name: str, config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], url: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, url_base: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, path: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, authenticator: Optional[airbyte_cdk.DeclarativeAuthenticator] = None, http_method: Union[str, airbyte_cdk.HttpMethod] = <HttpMethod.GET: 'GET'>, request_options_provider: Optional[airbyte_cdk.sources.declarative.requesters.request_options.InterpolatedRequestOptionsProvider] = None, error_handler: Optional[airbyte_cdk.sources.streams.http.error_handlers.ErrorHandler] = None, api_budget: Optional[airbyte_cdk.sources.streams.call_rate.APIBudget] = None, disable_retries: bool = False, message_repository: airbyte_cdk.MessageRepository = <airbyte_cdk.sources.message.NoopMessageRepository object>, use_cache: bool = False, _exit_on_rate_limit: bool = False, stream_response: bool = False, decoder: airbyte_cdk.Decoder = <factory>)
name: str
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
url: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
url_base: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
path: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
authenticator: Optional[airbyte_cdk.DeclarativeAuthenticator] = None
http_method: Union[str, airbyte_cdk.HttpMethod] = <HttpMethod.GET: 'GET'>
disable_retries: bool = False
use_cache: bool = False
stream_response: bool = False
exit_on_rate_limit: bool
120    @property
121    def exit_on_rate_limit(self) -> bool:
122        return self._exit_on_rate_limit
def get_authenticator( self) -> airbyte_cdk.DeclarativeAuthenticator:
128    def get_authenticator(self) -> DeclarativeAuthenticator:
129        return self._authenticator

Specifies the authenticator to use when submitting requests

def get_url( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> str:
131    def get_url(
132        self,
133        *,
134        stream_state: Optional[StreamState] = None,
135        stream_slice: Optional[StreamSlice] = None,
136        next_page_token: Optional[Mapping[str, Any]] = None,
137    ) -> str:
138        interpolation_context = get_interpolation_context(
139            stream_state=stream_state,
140            stream_slice=stream_slice,
141            next_page_token=next_page_token,
142        )
143
144        return str(self._url.eval(self.config, **interpolation_context))
Returns

URL base for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/"

def get_url_base( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> str:
182    def get_url_base(
183        self,
184        *,
185        stream_state: Optional[StreamState] = None,
186        stream_slice: Optional[StreamSlice] = None,
187        next_page_token: Optional[Mapping[str, Any]] = None,
188    ) -> str:
189        interpolation_context = get_interpolation_context(
190            stream_state=stream_state,
191            stream_slice=stream_slice,
192            next_page_token=next_page_token,
193        )
194        return str(self._url_base.eval(self.config, **interpolation_context))
Returns

URL base for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/"

def get_path( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> str:
196    def get_path(
197        self,
198        *,
199        stream_state: Optional[StreamState] = None,
200        stream_slice: Optional[StreamSlice] = None,
201        next_page_token: Optional[Mapping[str, Any]] = None,
202    ) -> str:
203        interpolation_context = get_interpolation_context(
204            stream_state=stream_state,
205            stream_slice=stream_slice,
206            next_page_token=next_page_token,
207        )
208        path = str(self._path.eval(self.config, **interpolation_context))
209        return path.lstrip("/")

Returns the URL path for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "some_entity"

def get_method(self) -> airbyte_cdk.HttpMethod:
211    def get_method(self) -> HttpMethod:
212        return self._http_method

Specifies the HTTP method to use

def get_request_params( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> MutableMapping[str, Any]:
214    def get_request_params(
215        self,
216        *,
217        stream_state: Optional[StreamState] = None,
218        stream_slice: Optional[StreamSlice] = None,
219        next_page_token: Optional[Mapping[str, Any]] = None,
220    ) -> MutableMapping[str, Any]:
221        return self._request_options_provider.get_request_params(
222            stream_state=stream_state,
223            stream_slice=stream_slice,
224            next_page_token=next_page_token,
225        )

Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

E.g: you might want to define query parameters for paging if next_page_token is not None.

def get_request_headers( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
227    def get_request_headers(
228        self,
229        *,
230        stream_state: Optional[StreamState] = None,
231        stream_slice: Optional[StreamSlice] = None,
232        next_page_token: Optional[Mapping[str, Any]] = None,
233    ) -> Mapping[str, Any]:
234        return self._request_options_provider.get_request_headers(
235            stream_state=stream_state,
236            stream_slice=stream_slice,
237            next_page_token=next_page_token,
238        )

Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.

def get_request_body_data( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Union[Mapping[str, Any], str]:
241    def get_request_body_data(  # type: ignore
242        self,
243        *,
244        stream_state: Optional[StreamState] = None,
245        stream_slice: Optional[StreamSlice] = None,
246        next_page_token: Optional[Mapping[str, Any]] = None,
247    ) -> Union[Mapping[str, Any], str]:
248        return (
249            self._request_options_provider.get_request_body_data(
250                stream_state=stream_state,
251                stream_slice=stream_slice,
252                next_page_token=next_page_token,
253            )
254            or {}
255        )

Specifies how to populate the body of the request with a non-JSON payload.

If it returns ready-made text, that text is sent as is. If it returns a dict, the dict is converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

def get_request_body_json( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Optional[Mapping[str, Any]]:
258    def get_request_body_json(  # type: ignore
259        self,
260        *,
261        stream_state: Optional[StreamState] = None,
262        stream_slice: Optional[StreamSlice] = None,
263        next_page_token: Optional[Mapping[str, Any]] = None,
264    ) -> Optional[Mapping[str, Any]]:
265        return self._request_options_provider.get_request_body_json(
266            stream_state=stream_state,
267            stream_slice=stream_slice,
268            next_page_token=next_page_token,
269        )

Specifies how to populate the body of the request with a JSON payload.

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

logger: logging.Logger
271    @property
272    def logger(self) -> logging.Logger:
273        return logging.getLogger(f"airbyte.HttpRequester.{self.name}")
def send_request( self, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, path: Optional[str] = None, request_headers: Optional[Mapping[str, Any]] = None, request_params: Optional[Mapping[str, Any]] = None, request_body_data: Union[Mapping[str, Any], str, NoneType] = None, request_body_json: Optional[Mapping[str, Any]] = None, log_formatter: Optional[Callable[[requests.models.Response], Any]] = None) -> Optional[requests.models.Response]:
447    def send_request(
448        self,
449        stream_state: Optional[StreamState] = None,
450        stream_slice: Optional[StreamSlice] = None,
451        next_page_token: Optional[Mapping[str, Any]] = None,
452        path: Optional[str] = None,
453        request_headers: Optional[Mapping[str, Any]] = None,
454        request_params: Optional[Mapping[str, Any]] = None,
455        request_body_data: Optional[Union[Mapping[str, Any], str]] = None,
456        request_body_json: Optional[Mapping[str, Any]] = None,
457        log_formatter: Optional[Callable[[requests.Response], Any]] = None,
458    ) -> Optional[requests.Response]:
459        request, response = self._http_client.send_request(
460            http_method=self.get_method().value,
461            url=self._get_url(
462                path=path,
463                stream_state=stream_state,
464                stream_slice=stream_slice,
465                next_page_token=next_page_token,
466            ),
467            request_kwargs={"stream": self.stream_response},
468            headers=self._request_headers(
469                stream_state, stream_slice, next_page_token, request_headers
470            ),
471            params=self._request_params(
472                stream_state, stream_slice, next_page_token, request_params
473            ),
474            json=self._request_body_json(
475                stream_state, stream_slice, next_page_token, request_body_json
476            ),
477            data=self._request_body_data(
478                stream_state, stream_slice, next_page_token, request_body_data
479            ),
480            dedupe_query_params=True,
481            log_formatter=log_formatter,
482            exit_on_rate_limit=self._exit_on_rate_limit,
483        )
484
485        return response

Sends a request and returns the response. Might return no response if the error handler chooses to ignore the response or throw an exception in case of an error. If path is set, the path configured on the requester itself is ignored. If header, params and body are set, they are merged with the ones configured on the requester itself.

If a log formatter is provided, it's used to log the performed request and response. If it's not provided, no logging is performed.

@dataclass
class RequestOption:
    """
    Describes an option to set on a request

    Attributes:
        field_name (str): Describes the name of the parameter to inject. Mutually exclusive with field_path.
        field_path (list(str)): Describes the path to a nested field as a list of field names.
          Only valid for body_json injection type, and mutually exclusive with field_name.
        inject_into (RequestOptionType): Describes where in the HTTP request to inject the parameter
    """

    inject_into: RequestOptionType
    parameters: InitVar[Mapping[str, Any]]
    field_name: Optional[Union[InterpolatedString, str]] = None
    field_path: Optional[List[Union[InterpolatedString, str]]] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """
        Validate the option and convert its field name / path segments with InterpolatedString.create.

        Args:
            parameters: Passed through to InterpolatedString.create for every field name / path segment

        Raises:
            ValueError: If neither or both of field_name and field_path are provided, if field_path is
              empty, or if field_path is used with an injection type other than body_json
        """
        # Validate inputs. We should expect either field_name or field_path, but not both
        if self.field_name is None and self.field_path is None:
            raise ValueError("RequestOption requires either a field_name or field_path")

        if self.field_name is not None and self.field_path is not None:
            raise ValueError(
                "Only one of field_name or field_path can be provided to RequestOption"
            )

        # An empty path has no final key to assign to. Reject it at construction time instead of
        # failing later in inject_into_request with an opaque unpacking error.
        if self.field_path is not None and not self.field_path:
            raise ValueError("field_path must contain at least one field name")

        # Nested field injection is only supported for body JSON injection
        if self.field_path is not None and self.inject_into != RequestOptionType.body_json:
            raise ValueError(
                "Nested field injection is only supported for body JSON injection. Please use a top-level field_name for other injection types."
            )

        # Convert field_name and field_path into InterpolatedString objects if they are strings
        if self.field_name is not None:
            self.field_name = InterpolatedString.create(self.field_name, parameters=parameters)
        elif self.field_path is not None:
            self.field_path = [
                InterpolatedString.create(segment, parameters=parameters)
                for segment in self.field_path
            ]

    @property
    def _is_field_path(self) -> bool:
        """Returns whether this option is a field path (ie, a nested field)"""
        return self.field_path is not None

    def inject_into_request(
        self,
        target: MutableMapping[str, Any],
        value: Any,
        config: Config,
    ) -> None:
        """
        Inject a request option value into a target request structure using either field_name or field_path.
        For non-body-json injection, only top-level field names are supported.
        For body-json injection, both field names and nested field paths are supported.

        Args:
            target: The request structure to inject the value into (mutated in place)
            value: The value to inject
            config: The config object to use for interpolation

        Raises:
            ValueError: If this option is a field path and inject_into is not body_json
        """
        if self._is_field_path:
            # Re-checked here (in addition to __post_init__) because the fields remain assignable
            # after construction.
            if self.inject_into != RequestOptionType.body_json:
                raise ValueError(
                    "Nested field injection is only supported for body JSON injection. Please use a top-level field_name for other injection types."
                )

            assert self.field_path is not None  # for type checker
            current = target
            # Convert path segments into strings, evaluating any interpolated segments
            # Example: ["data", "{{ config[user_type] }}", "id"] -> ["data", "admin", "id"]
            *path_parts, final_key = [
                str(
                    segment.eval(config=config)
                    if isinstance(segment, InterpolatedString)
                    else segment
                )
                for segment in self.field_path
            ]

            # Walk down the path, reusing existing entries and creating empty dictionaries for
            # missing ones, then set the final value at the deepest level
            for part in path_parts:
                current = current.setdefault(part, {})
            current[final_key] = value
        else:
            # For non-nested fields, evaluate the field name if it's an interpolated string
            key = (
                self.field_name.eval(config=config)
                if isinstance(self.field_name, InterpolatedString)
                else self.field_name
            )
            target[str(key)] = value

Describes an option to set on a request

Attributes:
  • field_name (str): Describes the name of the parameter to inject. Mutually exclusive with field_path.
  • field_path (list(str)): Describes the path to a nested field as a list of field names. Only valid for body_json injection type, and mutually exclusive with field_name.
  • inject_into (RequestOptionType): Describes where in the HTTP request to inject the parameter
RequestOption( inject_into: airbyte_cdk.RequestOptionType, parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], field_name: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, field_path: Optional[List[Union[airbyte_cdk.InterpolatedString, str]]] = None)
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
field_name: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
field_path: Optional[List[Union[airbyte_cdk.InterpolatedString, str]]] = None
def inject_into_request(
    self,
    target: MutableMapping[str, Any],
    value: Any,
    config: Config,
) -> None:
    """
    Write ``value`` into ``target`` under this option's field name or nested field path.

    A plain field name is set as a top-level key for any injection type.
    A nested field path is only accepted for body-json injection.

    Args:
        target: The request structure to inject the value into (mutated in place)
        value: The value to inject
        config: The config object to use for interpolation

    Raises:
        ValueError: If this option is a field path and inject_into is not body_json
    """
    if not self._is_field_path:
        # Flat case: resolve the field name if it is interpolated, then set a top-level key
        field_name = self.field_name
        if isinstance(field_name, InterpolatedString):
            field_name = field_name.eval(config=config)
        target[str(field_name)] = value
        return

    if self.inject_into != RequestOptionType.body_json:
        raise ValueError(
            "Nested field injection is only supported for body JSON injection. Please use a top-level field_name for other injection types."
        )

    assert self.field_path is not None  # for type checker

    # Resolve every segment to a string first, evaluating interpolated ones
    # Example: ["data", "{{ config[user_type] }}", "id"] -> ["data", "admin", "id"]
    resolved_segments = []
    for segment in self.field_path:
        if isinstance(segment, InterpolatedString):
            segment = segment.eval(config=config)
        resolved_segments.append(str(segment))
    *parent_keys, leaf_key = resolved_segments

    # Descend through the parent keys, creating empty dictionaries where missing,
    # and assign the value at the innermost level
    node = target
    for parent_key in parent_keys:
        node = node.setdefault(parent_key, {})
    node[leaf_key] = value

Inject a request option value into a target request structure using either field_name or field_path. For non-body-json injection, only top-level field names are supported. For body-json injection, both field names and nested field paths are supported.

Arguments:
  • target: The request structure to inject the value into
  • value: The value to inject
  • config: The config object to use for interpolation
 30class Requester(RequestOptionsProvider):
 31    @abstractmethod
 32    def get_authenticator(self) -> DeclarativeAuthenticator:
 33        """
 34        Specifies the authenticator to use when submitting requests
 35        """
 36        pass
 37
 38    @abstractmethod
 39    def get_url(
 40        self,
 41        *,
 42        stream_state: Optional[StreamState],
 43        stream_slice: Optional[StreamSlice],
 44        next_page_token: Optional[Mapping[str, Any]],
 45    ) -> str:
 46        """
 47        :return: URL base for the  API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/"
 48        """
 49
 50    @abstractmethod
 51    def get_url_base(
 52        self,
 53        *,
 54        stream_state: Optional[StreamState],
 55        stream_slice: Optional[StreamSlice],
 56        next_page_token: Optional[Mapping[str, Any]],
 57    ) -> str:
 58        """
 59        :return: URL base for the  API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/"
 60        """
 61
 62    @abstractmethod
 63    def get_path(
 64        self,
 65        *,
 66        stream_state: Optional[StreamState],
 67        stream_slice: Optional[StreamSlice],
 68        next_page_token: Optional[Mapping[str, Any]],
 69    ) -> str:
 70        """
 71        Returns the URL path for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "some_entity"
 72        """
 73
 74    @abstractmethod
 75    def get_method(self) -> HttpMethod:
 76        """
 77        Specifies the HTTP method to use
 78        """
 79
 80    @abstractmethod
 81    def get_request_params(
 82        self,
 83        *,
 84        stream_state: Optional[StreamState] = None,
 85        stream_slice: Optional[StreamSlice] = None,
 86        next_page_token: Optional[Mapping[str, Any]] = None,
 87    ) -> MutableMapping[str, Any]:
 88        """
 89        Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
 90
 91        E.g: you might want to define query parameters for paging if next_page_token is not None.
 92        """
 93
 94    @abstractmethod
 95    def get_request_headers(
 96        self,
 97        *,
 98        stream_state: Optional[StreamState] = None,
 99        stream_slice: Optional[StreamSlice] = None,
100        next_page_token: Optional[Mapping[str, Any]] = None,
101    ) -> Mapping[str, Any]:
102        """
103        Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
104        """
105
106    @abstractmethod
107    def get_request_body_data(
108        self,
109        *,
110        stream_state: Optional[StreamState] = None,
111        stream_slice: Optional[StreamSlice] = None,
112        next_page_token: Optional[Mapping[str, Any]] = None,
113    ) -> Union[Mapping[str, Any], str]:
114        """
115        Specifies how to populate the body of the request with a non-JSON payload.
116
117        If returns a ready text that it will be sent as is.
118        If returns a dict that it will be converted to a urlencoded form.
119        E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
120
121        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
122        """
123
124    @abstractmethod
125    def get_request_body_json(
126        self,
127        *,
128        stream_state: Optional[StreamState] = None,
129        stream_slice: Optional[StreamSlice] = None,
130        next_page_token: Optional[Mapping[str, Any]] = None,
131    ) -> Mapping[str, Any]:
132        """
133        Specifies how to populate the body of the request with a JSON payload.
134
135        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
136        """
137
138    @abstractmethod
139    def send_request(
140        self,
141        stream_state: Optional[StreamState] = None,
142        stream_slice: Optional[StreamSlice] = None,
143        next_page_token: Optional[Mapping[str, Any]] = None,
144        path: Optional[str] = None,
145        request_headers: Optional[Mapping[str, Any]] = None,
146        request_params: Optional[Mapping[str, Any]] = None,
147        request_body_data: Optional[Union[Mapping[str, Any], str]] = None,
148        request_body_json: Optional[Mapping[str, Any]] = None,
149        log_formatter: Optional[Callable[[requests.Response], Any]] = None,
150    ) -> Optional[requests.Response]:
151        """
152        Sends a request and returns the response. Might return no response if the error handler chooses to ignore the response or throw an exception in case of an error.
153        If path is set, the path configured on the requester itself is ignored.
154        If header, params and body are set, they are merged with the ones configured on the requester itself.
155
156        If a log formatter is provided, it's used to log the performed request and response. If it's not provided, no logging is performed.
157        """

Defines the request options to set on an outgoing HTTP request

Options can be passed by

  • request parameter
  • request headers
  • body data
  • json content
@abstractmethod
def get_authenticator(self) -> DeclarativeAuthenticator:
    """
    Return the authenticator that is used when requests are submitted.
    """

Specifies the authenticator to use when submitting requests

@abstractmethod
def get_url(
    self,
    *,
    stream_state: Optional[StreamState],
    stream_slice: Optional[StreamSlice],
    next_page_token: Optional[Mapping[str, Any]],
) -> str:
    """
    :return: URL to use for the API request

    NOTE(review): the previous docstring was a verbatim copy of get_url_base's; confirm
    against the concrete implementations how this value relates to get_url_base and get_path.
    """
Returns

The URL to use for the API request. (This description previously repeated the one for get_url_base, which returns only the URL base, e.g. "https://myapi.com/v1/" when hitting https://myapi.com/v1/some_entity.)

@abstractmethod
def get_url_base(
    self,
    *,
    stream_state: Optional[StreamState],
    stream_slice: Optional[StreamSlice],
    next_page_token: Optional[Mapping[str, Any]],
) -> str:
    """
    :return: base URL of the API endpoint. For example, to hit https://myapi.com/v1/some_entity this should return "https://myapi.com/v1/"
    """
Returns

URL base for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/"

@abstractmethod
def get_path(
    self,
    *,
    stream_state: Optional[StreamState],
    stream_slice: Optional[StreamSlice],
    next_page_token: Optional[Mapping[str, Any]],
) -> str:
    """
    :return: URL path of the API endpoint. For example, to hit https://myapi.com/v1/some_entity this should return "some_entity"
    """

Returns the URL path for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "some_entity"

@abstractmethod
def get_method(self) -> HttpMethod:
    """
    Return the HTTP method to use.
    """

Specifies the HTTP method to use

@abstractmethod
def get_request_params(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
    """
    Return the query parameters to set on an outgoing HTTP request for the given inputs.

    For example, paging query parameters might be defined when next_page_token is not None.
    """

Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

E.g: you might want to define query parameters for paging if next_page_token is not None.

@abstractmethod
def get_request_headers(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """
    Return the non-auth headers.

    Any header returned here that overlaps with an authentication header is overwritten by the authentication header.
    """

Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.

@abstractmethod
def get_request_body_data(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Union[Mapping[str, Any], str]:
    """
    Return the non-JSON payload used to populate the body of the request.

    A returned string is sent as is.
    A returned dict is converted to a urlencoded form.
    E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

    Only one of the 'request_body_data' and 'request_body_json' functions can be overridden at the same time.
    """

Specifies how to populate the body of the request with a non-JSON payload.

If it returns a string, that text is sent as is. If it returns a dict, it is converted to a urlencoded form, e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

Only one of the 'request_body_data' and 'request_body_json' functions can be overridden at the same time.

@abstractmethod
def get_request_body_json(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """
    Return the JSON payload used to populate the body of the request.

    Only one of the 'request_body_data' and 'request_body_json' functions can be overridden at the same time.
    """

Specifies how to populate the body of the request with a JSON payload.

Only one of the 'request_body_data' and 'request_body_json' functions can be overridden at the same time.

@abstractmethod
def send_request(
    self,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
    path: Optional[str] = None,
    request_headers: Optional[Mapping[str, Any]] = None,
    request_params: Optional[Mapping[str, Any]] = None,
    request_body_data: Optional[Union[Mapping[str, Any], str]] = None,
    request_body_json: Optional[Mapping[str, Any]] = None,
    log_formatter: Optional[Callable[[requests.Response], Any]] = None,
) -> Optional[requests.Response]:
    """
    Send a request and return the response.

    Might return no response if the error handler chooses to ignore the response or throw an exception in case of an error.
    When path is set, it is used instead of the path configured on the requester itself.
    When header, params and body are set, they are merged with the ones configured on the requester itself.

    When a log formatter is provided, it is used to log the performed request and response; without one, no logging is performed.
    """

Sends a request and returns the response. Might return no response if the error handler chooses to ignore the response or throw an exception in case of an error. If path is set, the path configured on the requester itself is ignored. If header, params and body are set, they are merged with the ones configured on the requester itself.

If a log formatter is provided, it's used to log the performed request and response. If it's not provided, no logging is performed.