airbyte_cdk.sources.streams.http

 1#
 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 3#
 4
 5# Initialize Streams Package
 6from .exceptions import UserDefinedBackoffException
 7from .http import HttpStream, HttpSubStream
 8from .http_client import HttpClient
 9
10__all__ = ["HttpClient", "HttpStream", "HttpSubStream", "UserDefinedBackoffException"]
class HttpClient:
    # Fallback retry count applied when the configured ErrorHandler leaves
    # max_retries unset.
    _DEFAULT_MAX_RETRY: int = 5
    # Fallback total backoff budget in seconds (ten minutes).
    _DEFAULT_MAX_TIME: int = 60 * 10
    # Response actions that lead to another attempt on the same request; for
    # these the per-request attempt counter must be kept alive (see _evict_key).
    _ACTIONS_TO_RETRY_ON = {
        ResponseAction.RETRY,
        ResponseAction.RATE_LIMITED,
        ResponseAction.REFRESH_TOKEN_THEN_RETRY,
    }
 94
 95    def __init__(
 96        self,
 97        name: str,
 98        logger: logging.Logger,
 99        error_handler: Optional[ErrorHandler] = None,
100        api_budget: Optional[APIBudget] = None,
101        session: Optional[Union[requests.Session, requests_cache.CachedSession]] = None,
102        authenticator: Optional[AuthBase] = None,
103        use_cache: bool = False,
104        backoff_strategy: Optional[Union[BackoffStrategy, List[BackoffStrategy]]] = None,
105        error_message_parser: Optional[ErrorMessageParser] = None,
106        disable_retries: bool = False,
107        message_repository: Optional[MessageRepository] = None,
108    ):
109        self._name = name
110        self._api_budget: APIBudget = api_budget or APIBudget(policies=[])
111        if session:
112            self._session = session
113        else:
114            self._use_cache = use_cache
115            self._session = self._request_session()
116            self._session.mount(
117                "https://",
118                requests.adapters.HTTPAdapter(
119                    pool_connections=MAX_CONNECTION_POOL_SIZE, pool_maxsize=MAX_CONNECTION_POOL_SIZE
120                ),
121            )
122        if isinstance(authenticator, AuthBase):
123            self._session.auth = authenticator
124        self._logger = logger
125        self._error_handler = error_handler or HttpStatusErrorHandler(self._logger)
126        if backoff_strategy is not None:
127            if isinstance(backoff_strategy, list):
128                self._backoff_strategies = backoff_strategy
129            else:
130                self._backoff_strategies = [backoff_strategy]
131        else:
132            self._backoff_strategies = [DefaultBackoffStrategy()]
133        self._error_message_parser = error_message_parser or JsonErrorMessageParser()
134        self._request_attempt_count: Dict[requests.PreparedRequest, int] = {}
135        self._disable_retries = disable_retries
136        self._message_repository = message_repository
137
138    @property
139    def cache_filename(self) -> str:
140        """
141        Override if needed. Return the name of cache file
142        Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache will be in-memory only.
143        """
144        return f"{self._name}.sqlite"
145
146    def _request_session(self) -> requests.Session:
147        """
148        Session factory based on use_cache property and call rate limits (api_budget parameter)
149        :return: instance of request-based session
150        """
151        if self._use_cache:
152            cache_dir = os.getenv(ENV_REQUEST_CACHE_PATH)
153            # Use in-memory cache if cache_dir is not set
154            # This is a non-obvious interface, but it ensures we don't write sql files when running unit tests
155            # Use in-memory cache if cache_dir is not set
156            # This is a non-obvious interface, but it ensures we don't write sql files when running unit tests
157            sqlite_path = (
158                str(Path(cache_dir) / self.cache_filename)
159                if cache_dir
160                else "file::memory:?cache=shared"
161            )
162            # By using `PRAGMA synchronous=OFF` and `PRAGMA journal_mode=WAL`, we reduce the possible occurrences of `database table is locked` errors.
163            # Note that those were blindly added at the same time and one or the other might be sufficient to prevent the issues but we have seen good results with both. Feel free to revisit given more information.
164            # There are strong signals that `fast_save` might create problems but if the sync crashes, we start back from the beginning in terms of sqlite anyway so the impact should be minimal. Signals are:
165            # * https://github.com/requests-cache/requests-cache/commit/7fa89ffda300331c37d8fad7f773348a3b5b0236#diff-f43db4a5edf931647c32dec28ea7557aae4cae8444af4b26c8ecbe88d8c925aaR238
166            # * https://github.com/requests-cache/requests-cache/commit/7fa89ffda300331c37d8fad7f773348a3b5b0236#diff-2e7f95b7d7be270ff1a8118f817ea3e6663cdad273592e536a116c24e6d23c18R164-R168
167            # * `If the application running SQLite crashes, the data will be safe, but the database [might become corrupted](https://www.sqlite.org/howtocorrupt.html#cfgerr) if the operating system crashes or the computer loses power before that data has been written to the disk surface.` in [this description](https://www.sqlite.org/pragma.html#pragma_synchronous).
168            backend = requests_cache.SQLiteCache(sqlite_path, fast_save=True, wal=True)
169            return CachedLimiterSession(
170                cache_name=sqlite_path,
171                backend=backend,
172                api_budget=self._api_budget,
173                match_headers=True,
174            )
175        else:
176            return LimiterSession(api_budget=self._api_budget)
177
178    def clear_cache(self) -> None:
179        """
180        Clear cached requests for current session, can be called any time
181        """
182        if isinstance(self._session, requests_cache.CachedSession):
183            self._session.cache.clear()  # type: ignore # cache.clear is not typed
184
185    def _dedupe_query_params(
186        self, url: str, params: Optional[Mapping[str, str]]
187    ) -> Mapping[str, str]:
188        """
189        Remove query parameters from params mapping if they are already encoded in the URL.
190        :param url: URL with
191        :param params:
192        :return:
193        """
194        if params is None:
195            params = {}
196        query_string = urllib.parse.urlparse(url).query
197        query_dict = {k: v[0] for k, v in urllib.parse.parse_qs(query_string).items()}
198
199        duplicate_keys_with_same_value = {
200            k for k in query_dict.keys() if str(params.get(k)) == str(query_dict[k])
201        }
202        return {k: v for k, v in params.items() if k not in duplicate_keys_with_same_value}
203
204    def _create_prepared_request(
205        self,
206        http_method: str,
207        url: str,
208        dedupe_query_params: bool = False,
209        headers: Optional[Mapping[str, str]] = None,
210        params: Optional[Mapping[str, str]] = None,
211        json: Optional[Mapping[str, Any]] = None,
212        data: Optional[Union[str, Mapping[str, Any]]] = None,
213    ) -> requests.PreparedRequest:
214        if dedupe_query_params:
215            query_params = self._dedupe_query_params(url, params)
216        else:
217            query_params = params or {}
218        args = {"method": http_method, "url": url, "headers": headers, "params": query_params}
219        if http_method.upper() in BODY_REQUEST_METHODS:
220            if json and data:
221                raise RequestBodyException(
222                    "At the same time only one of the 'request_body_data' and 'request_body_json' functions can return data"
223                )
224            elif json:
225                args["json"] = json
226            elif data:
227                args["data"] = data
228        prepared_request: requests.PreparedRequest = self._session.prepare_request(
229            requests.Request(**args)
230        )
231
232        return prepared_request
233
234    @property
235    def _max_retries(self) -> int:
236        """
237        Determines the max retries based on the provided error handler.
238        """
239        max_retries = None
240        if self._disable_retries:
241            max_retries = 0
242        else:
243            max_retries = self._error_handler.max_retries
244        return max_retries if max_retries is not None else self._DEFAULT_MAX_RETRY
245
246    @property
247    def _max_time(self) -> int:
248        """
249        Determines the max time based on the provided error handler.
250        """
251        return (
252            self._error_handler.max_time
253            if self._error_handler.max_time is not None
254            else self._DEFAULT_MAX_TIME
255        )
256
257    def _send_with_retry(
258        self,
259        request: requests.PreparedRequest,
260        request_kwargs: Mapping[str, Any],
261        log_formatter: Optional[Callable[[requests.Response], Any]] = None,
262        exit_on_rate_limit: Optional[bool] = False,
263    ) -> requests.Response:
264        """
265        Sends a request with retry logic.
266
267        Args:
268            request (requests.PreparedRequest): The prepared HTTP request to send.
269            request_kwargs (Mapping[str, Any]): Additional keyword arguments for the request.
270
271        Returns:
272            requests.Response: The HTTP response received from the server after retries.
273        """
274
275        max_retries = self._max_retries
276        max_tries = max(0, max_retries) + 1
277        max_time = self._max_time
278
279        user_backoff_handler = user_defined_backoff_handler(max_tries=max_tries, max_time=max_time)(
280            self._send
281        )
282        rate_limit_backoff_handler = rate_limit_default_backoff_handler(max_tries=max_tries)
283        backoff_handler = http_client_default_backoff_handler(
284            max_tries=max_tries, max_time=max_time
285        )
286        # backoff handlers wrap _send, so it will always return a response -- except when all retries are exhausted
287        try:
288            response = backoff_handler(rate_limit_backoff_handler(user_backoff_handler))(
289                request,
290                request_kwargs,
291                log_formatter=log_formatter,
292                exit_on_rate_limit=exit_on_rate_limit,
293            )  # type: ignore # mypy can't infer that backoff_handler wraps _send
294
295            return response
296        except BaseBackoffException as e:
297            self._logger.error(f"Retries exhausted with backoff exception.", exc_info=True)
298            raise AirbyteTracedException(
299                internal_message=f"Exhausted available request attempts. Exception: {e}",
300                message=f"Exhausted available request attempts. Please see logs for more details. Exception: {e}",
301                failure_type=e.failure_type or FailureType.system_error,
302                exception=e,
303                stream_descriptor=StreamDescriptor(name=self._name),
304            )
305
306    def _send(
307        self,
308        request: requests.PreparedRequest,
309        request_kwargs: Mapping[str, Any],
310        log_formatter: Optional[Callable[[requests.Response], Any]] = None,
311        exit_on_rate_limit: Optional[bool] = False,
312    ) -> requests.Response:
313        if request not in self._request_attempt_count:
314            self._request_attempt_count[request] = 1
315        else:
316            self._request_attempt_count[request] += 1
317            if hasattr(self._session, "auth") and isinstance(self._session.auth, AuthBase):
318                self._session.auth(request)
319
320        self._logger.debug(
321            "Making outbound API request",
322            extra={"headers": request.headers, "url": request.url, "request_body": request.body},
323        )
324
325        response: Optional[requests.Response] = None
326        exc: Optional[requests.RequestException] = None
327
328        try:
329            response = self._session.send(request, **request_kwargs)
330        except requests.RequestException as e:
331            exc = e
332
333        error_resolution: ErrorResolution = self._error_handler.interpret_response(
334            response if response is not None else exc
335        )
336
337        # Evaluation of response.text can be heavy, for example, if streaming a large response
338        # Do it only in debug mode
339        if self._logger.isEnabledFor(logging.DEBUG) and response is not None:
340            if request_kwargs.get("stream"):
341                self._logger.debug(
342                    "Receiving response, but not logging it as the response is streamed",
343                    extra={"headers": response.headers, "status": response.status_code},
344                )
345            else:
346                self._logger.debug(
347                    "Receiving response",
348                    extra={
349                        "headers": response.headers,
350                        "status": response.status_code,
351                        "body": response.text,
352                    },
353                )
354
355        # Request/response logging for declarative cdk
356        if (
357            log_formatter is not None
358            and response is not None
359            and self._message_repository is not None
360        ):
361            formatter = log_formatter
362            self._message_repository.log_message(
363                Level.DEBUG,
364                lambda: formatter(response),
365            )
366
367        self._handle_error_resolution(
368            response=response,
369            exc=exc,
370            request=request,
371            error_resolution=error_resolution,
372            exit_on_rate_limit=exit_on_rate_limit,
373        )
374
375        return response  # type: ignore # will either return a valid response of type requests.Response or raise an exception
376
377    def _get_response_body(self, response: requests.Response) -> Optional[JsonType]:
378        """
379        Extracts and returns the body of an HTTP response.
380
381        This method attempts to parse the response body as JSON. If the response
382        body is not valid JSON, it falls back to decoding the response content
383        as a UTF-8 string. If both attempts fail, it returns None.
384
385        Args:
386            response (requests.Response): The HTTP response object.
387
388        Returns:
389            Optional[JsonType]: The parsed JSON object as a string, the decoded
390            response content as a string, or None if both parsing attempts fail.
391        """
392        try:
393            return str(response.json())
394        except requests.exceptions.JSONDecodeError:
395            try:
396                return response.content.decode("utf-8")
397            except Exception:
398                return "The Content of the Response couldn't be decoded."
399
400    def _evict_key(self, prepared_request: requests.PreparedRequest) -> None:
401        """
402        Addresses high memory consumption when enabling concurrency in https://github.com/airbytehq/oncall/issues/6821.
403
404        The `_request_attempt_count` attribute keeps growing as multiple requests are made using the same `http_client`.
405        To mitigate this issue, we evict keys for completed requests once we confirm that no further retries are needed.
406        This helps manage memory usage more efficiently while maintaining the necessary logic for retry attempts.
407        """
408        if prepared_request in self._request_attempt_count:
409            del self._request_attempt_count[prepared_request]
410
    def _handle_error_resolution(
        self,
        response: Optional[requests.Response],
        exc: Optional[requests.RequestException],
        request: requests.PreparedRequest,
        error_resolution: ErrorResolution,
        exit_on_rate_limit: Optional[bool] = False,
    ) -> None:
        """
        Act on the error handler's verdict for a completed attempt.

        Depending on `error_resolution.response_action` this either returns
        (success / IGNORE), raises AirbyteTracedException (FAIL), or raises a
        backoff exception so the retry decorators re-invoke `_send` (RETRY,
        RATE_LIMITED, REFRESH_TOKEN_THEN_RETRY).

        :param response: the HTTP response, if one was received
        :param exc: the transport-level exception, if the send itself failed
        :param request: the prepared request that was attempted
        :param error_resolution: the error handler's interpretation of the outcome
        :param exit_on_rate_limit: when truthy, rate limiting fails after the
            normal retry budget instead of retrying endlessly
        """
        # Requests that will not be retried are evicted from the attempt
        # counter to keep memory bounded (see _evict_key).
        if error_resolution.response_action not in self._ACTIONS_TO_RETRY_ON:
            self._evict_key(request)

        if error_resolution.response_action == ResponseAction.RESET_PAGINATION:
            raise PaginationResetRequiredException()

        # Emit stream status RUNNING with the reason RATE_LIMITED to log that the rate limit has been reached
        if error_resolution.response_action == ResponseAction.RATE_LIMITED:
            # TODO: Update to handle with message repository when concurrent message repository is ready
            reasons = [AirbyteStreamStatusReason(type=AirbyteStreamStatusReasonType.RATE_LIMITED)]
            message = orjson.dumps(
                AirbyteMessageSerializer.dump(
                    stream_status_as_airbyte_message(
                        StreamDescriptor(name=self._name), AirbyteStreamStatus.RUNNING, reasons
                    )
                )
            ).decode()

            # Simply printing the stream status is a temporary solution and can cause future issues. Currently, the _send method is
            # wrapped with backoff decorators, and we can only emit messages by iterating record_iterator in the abstract source at the
            # end of the retry decorator behavior. This approach does not allow us to emit messages in the queue before exiting the
            # backoff retry loop. Adding `\n` to the message and ignore 'end' ensure that few messages are printed at the same time.
            print(f"{message}\n", end="", flush=True)

        # Handle REFRESH_TOKEN_THEN_RETRY: Force refresh the OAuth token before retry
        # This is useful when the API returns 401 but the stored token expiry hasn't been reached yet
        # Only OAuth authenticators have refresh_and_set_access_token method
        # Non-OAuth auth types (e.g., BearerAuthenticator) will fall through to normal retry
        if error_resolution.response_action == ResponseAction.REFRESH_TOKEN_THEN_RETRY:
            if (
                hasattr(self._session, "auth")
                and self._session.auth is not None
                and hasattr(self._session.auth, "refresh_and_set_access_token")
            ):
                try:
                    self._session.auth.refresh_and_set_access_token()  # type: ignore[union-attr]
                    self._logger.info(
                        "Refreshed OAuth token due to REFRESH_TOKEN_THEN_RETRY response action"
                    )
                except Exception as refresh_error:
                    # Best-effort: a failed refresh falls back to retrying with
                    # the existing token rather than aborting the sync.
                    self._logger.warning(
                        f"Failed to refresh OAuth token: {refresh_error}. Proceeding with retry using existing token."
                    )
            else:
                self._logger.warning(
                    "REFRESH_TOKEN_THEN_RETRY action received but authenticator does not support token refresh. "
                    "Proceeding with normal retry."
                )

        if error_resolution.response_action == ResponseAction.FAIL:
            if response is not None:
                # Secrets are filtered out of the logged request/response bodies.
                filtered_response_message = filter_secrets(
                    f"Request (body): '{str(request.body)}'. Response (body): '{self._get_response_body(response)}'. Response (headers): '{response.headers}'."
                )
                error_message = f"'{request.method}' request to '{request.url}' failed with status code '{response.status_code}' and error message: '{self._error_message_parser.parse_response_error_message(response)}'. {filtered_response_message}"
            else:
                error_message = (
                    f"'{request.method}' request to '{request.url}' failed with exception: '{exc}'"
                )

            # ensure the exception message is emitted before raised
            self._logger.error(error_message)

            raise AirbyteTracedException(
                internal_message=error_message,
                message=error_resolution.error_message or error_message,
                failure_type=error_resolution.failure_type,
            )

        elif error_resolution.response_action == ResponseAction.IGNORE:
            if response is not None:
                log_message = f"Ignoring response for '{request.method}' request to '{request.url}' with response code '{response.status_code}'"
            else:
                log_message = f"Ignoring response for '{request.method}' request to '{request.url}' with error '{exc}'"

            self._logger.info(error_resolution.error_message or log_message)

        # TODO: Consider dynamic retry count depending on subsequent error codes
        elif error_resolution.response_action in (
            ResponseAction.RETRY,
            ResponseAction.RATE_LIMITED,
            ResponseAction.REFRESH_TOKEN_THEN_RETRY,
        ):
            # First configured backoff strategy that returns a delay wins.
            user_defined_backoff_time = None
            for backoff_strategy in self._backoff_strategies:
                backoff_time = backoff_strategy.backoff_time(
                    response_or_exception=response if response is not None else exc,
                    attempt_count=self._request_attempt_count[request],
                )
                if backoff_time:
                    user_defined_backoff_time = backoff_time
                    break
            error_message = (
                error_resolution.error_message
                or f"Request to {request.url} failed with failure type {error_resolution.failure_type}, response action {error_resolution.response_action}."
            )

            # Rate-limited requests retry without bound unless the caller opted out.
            retry_endlessly = (
                error_resolution.response_action == ResponseAction.RATE_LIMITED
                and not exit_on_rate_limit
            )

            if user_defined_backoff_time:
                raise UserDefinedBackoffException(
                    backoff=user_defined_backoff_time,
                    request=request,
                    response=(response if response is not None else exc),
                    error_message=error_message,
                    failure_type=error_resolution.failure_type,
                )

            elif retry_endlessly:
                raise RateLimitBackoffException(
                    request=request,
                    response=(response if response is not None else exc),
                    error_message=error_message,
                    failure_type=error_resolution.failure_type,
                )

            raise DefaultBackoffException(
                request=request,
                response=(response if response is not None else exc),
                error_message=error_message,
                failure_type=error_resolution.failure_type,
            )

        elif response:
            # No explicit action: surface any HTTP error status directly.
            try:
                response.raise_for_status()
            except requests.HTTPError as e:
                self._logger.error(response.text)
                raise e
551
552    @property
553    def name(self) -> str:
554        return self._name
555
556    def send_request(
557        self,
558        http_method: str,
559        url: str,
560        request_kwargs: Mapping[str, Any],
561        headers: Optional[Mapping[str, str]] = None,
562        params: Optional[Mapping[str, str]] = None,
563        json: Optional[Mapping[str, Any]] = None,
564        data: Optional[Union[str, Mapping[str, Any]]] = None,
565        dedupe_query_params: bool = False,
566        log_formatter: Optional[Callable[[requests.Response], Any]] = None,
567        exit_on_rate_limit: Optional[bool] = False,
568    ) -> Tuple[requests.PreparedRequest, requests.Response]:
569        """
570        Prepares and sends request and return request and response objects.
571        """
572
573        request: requests.PreparedRequest = self._create_prepared_request(
574            http_method=http_method,
575            url=url,
576            dedupe_query_params=dedupe_query_params,
577            headers=headers,
578            params=params,
579            json=json,
580            data=data,
581        )
582
583        env_settings = self._session.merge_environment_settings(
584            url=request.url,
585            proxies=request_kwargs.get("proxies", {}),
586            stream=request_kwargs.get("stream"),
587            verify=request_kwargs.get("verify"),
588            cert=request_kwargs.get("cert"),
589        )
590        request_kwargs = {**request_kwargs, **env_settings}
591
592        response: requests.Response = self._send_with_retry(
593            request=request,
594            request_kwargs=request_kwargs,
595            log_formatter=log_formatter,
596            exit_on_rate_limit=exit_on_rate_limit,
597        )
598
599        return request, response
HttpClient( name: str, logger: logging.Logger, error_handler: Optional[airbyte_cdk.sources.streams.http.error_handlers.ErrorHandler] = None, api_budget: Optional[airbyte_cdk.sources.streams.call_rate.APIBudget] = None, session: Union[requests.sessions.Session, requests_cache.session.CachedSession, NoneType] = None, authenticator: Optional[requests.auth.AuthBase] = None, use_cache: bool = False, backoff_strategy: Union[airbyte_cdk.BackoffStrategy, List[airbyte_cdk.BackoffStrategy], NoneType] = None, error_message_parser: Optional[airbyte_cdk.sources.streams.http.error_handlers.ErrorMessageParser] = None, disable_retries: bool = False, message_repository: Optional[airbyte_cdk.MessageRepository] = None)
 95    def __init__(
 96        self,
 97        name: str,
 98        logger: logging.Logger,
 99        error_handler: Optional[ErrorHandler] = None,
100        api_budget: Optional[APIBudget] = None,
101        session: Optional[Union[requests.Session, requests_cache.CachedSession]] = None,
102        authenticator: Optional[AuthBase] = None,
103        use_cache: bool = False,
104        backoff_strategy: Optional[Union[BackoffStrategy, List[BackoffStrategy]]] = None,
105        error_message_parser: Optional[ErrorMessageParser] = None,
106        disable_retries: bool = False,
107        message_repository: Optional[MessageRepository] = None,
108    ):
109        self._name = name
110        self._api_budget: APIBudget = api_budget or APIBudget(policies=[])
111        if session:
112            self._session = session
113        else:
114            self._use_cache = use_cache
115            self._session = self._request_session()
116            self._session.mount(
117                "https://",
118                requests.adapters.HTTPAdapter(
119                    pool_connections=MAX_CONNECTION_POOL_SIZE, pool_maxsize=MAX_CONNECTION_POOL_SIZE
120                ),
121            )
122        if isinstance(authenticator, AuthBase):
123            self._session.auth = authenticator
124        self._logger = logger
125        self._error_handler = error_handler or HttpStatusErrorHandler(self._logger)
126        if backoff_strategy is not None:
127            if isinstance(backoff_strategy, list):
128                self._backoff_strategies = backoff_strategy
129            else:
130                self._backoff_strategies = [backoff_strategy]
131        else:
132            self._backoff_strategies = [DefaultBackoffStrategy()]
133        self._error_message_parser = error_message_parser or JsonErrorMessageParser()
134        self._request_attempt_count: Dict[requests.PreparedRequest, int] = {}
135        self._disable_retries = disable_retries
136        self._message_repository = message_repository
cache_filename: str
138    @property
139    def cache_filename(self) -> str:
140        """
141        Override if needed. Return the name of cache file
142        Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache will be in-memory only.
143        """
144        return f"{self._name}.sqlite"

Override if needed. Return the name of the cache file. Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache will be in-memory only.

def clear_cache(self) -> None:
178    def clear_cache(self) -> None:
179        """
180        Clear cached requests for current session, can be called any time
181        """
182        if isinstance(self._session, requests_cache.CachedSession):
183            self._session.cache.clear()  # type: ignore # cache.clear is not typed

Clear cached requests for the current session; can be called at any time.

name: str
552    @property
553    def name(self) -> str:
554        return self._name
def send_request( self, http_method: str, url: str, request_kwargs: Mapping[str, Any], headers: Optional[Mapping[str, str]] = None, params: Optional[Mapping[str, str]] = None, json: Optional[Mapping[str, Any]] = None, data: Union[str, Mapping[str, Any], NoneType] = None, dedupe_query_params: bool = False, log_formatter: Optional[Callable[[requests.models.Response], Any]] = None, exit_on_rate_limit: Optional[bool] = False) -> Tuple[requests.models.PreparedRequest, requests.models.Response]:
556    def send_request(
557        self,
558        http_method: str,
559        url: str,
560        request_kwargs: Mapping[str, Any],
561        headers: Optional[Mapping[str, str]] = None,
562        params: Optional[Mapping[str, str]] = None,
563        json: Optional[Mapping[str, Any]] = None,
564        data: Optional[Union[str, Mapping[str, Any]]] = None,
565        dedupe_query_params: bool = False,
566        log_formatter: Optional[Callable[[requests.Response], Any]] = None,
567        exit_on_rate_limit: Optional[bool] = False,
568    ) -> Tuple[requests.PreparedRequest, requests.Response]:
569        """
570        Prepares and sends request and return request and response objects.
571        """
572
573        request: requests.PreparedRequest = self._create_prepared_request(
574            http_method=http_method,
575            url=url,
576            dedupe_query_params=dedupe_query_params,
577            headers=headers,
578            params=params,
579            json=json,
580            data=data,
581        )
582
583        env_settings = self._session.merge_environment_settings(
584            url=request.url,
585            proxies=request_kwargs.get("proxies", {}),
586            stream=request_kwargs.get("stream"),
587            verify=request_kwargs.get("verify"),
588            cert=request_kwargs.get("cert"),
589        )
590        request_kwargs = {**request_kwargs, **env_settings}
591
592        response: requests.Response = self._send_with_retry(
593            request=request,
594            request_kwargs=request_kwargs,
595            log_formatter=log_formatter,
596            exit_on_rate_limit=exit_on_rate_limit,
597        )
598
599        return request, response

Prepares and sends the request, then returns the prepared request and response objects.

 45class HttpStream(Stream, CheckpointMixin, ABC):
 46    """
 47    Base abstract class for an Airbyte Stream using the HTTP protocol. Basic building block for users building an Airbyte source for a HTTP API.
 48    """
 49
 50    source_defined_cursor = True  # Most HTTP streams use a source defined cursor (i.e: the user can't configure it like on a SQL table)
 51    page_size: Optional[int] = (
 52        None  # Use this variable to define page size for API http requests with pagination support
 53    )
 54
 55    def __init__(
 56        self, authenticator: Optional[AuthBase] = None, api_budget: Optional[APIBudget] = None
 57    ):
 58        self._exit_on_rate_limit: bool = False
 59        self._http_client = HttpClient(
 60            name=self.name,
 61            logger=self.logger,
 62            error_handler=self.get_error_handler(),
 63            api_budget=api_budget or APIBudget(policies=[]),
 64            authenticator=authenticator,
 65            use_cache=self.use_cache,
 66            backoff_strategy=self.get_backoff_strategy(),
 67            message_repository=InMemoryMessageRepository(),
 68        )
 69
 70        # There are three conditions that dictate if RFR should automatically be applied to a stream
 71        # 1. Streams that explicitly initialize their own cursor should defer to it and not automatically apply RFR
 72        # 2. Streams with at least one cursor_field are incremental and thus a superior sync to RFR.
 73        # 3. Streams overriding read_records() do not guarantee that they will call the parent implementation which can perform
 74        #    per-page checkpointing so RFR is only supported if a stream use the default `HttpStream.read_records()` method
 75        if (
 76            not self.cursor
 77            and len(self.cursor_field) == 0
 78            and type(self).read_records is HttpStream.read_records
 79        ):
 80            self.cursor = ResumableFullRefreshCursor()
 81
 82    @property
 83    def exit_on_rate_limit(self) -> bool:
 84        """
 85        :return: False if the stream will retry endlessly when rate limited
 86        """
 87        return self._exit_on_rate_limit
 88
 89    @exit_on_rate_limit.setter
 90    def exit_on_rate_limit(self, value: bool) -> None:
 91        self._exit_on_rate_limit = value
 92
 93    @property
 94    def cache_filename(self) -> str:
 95        """
 96        Override if needed. Return the name of cache file
 97        Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache will be in-memory only.
 98        """
 99        return f"{self.name}.sqlite"
100
101    @property
102    def use_cache(self) -> bool:
103        """
104        Override if needed. If True, all records will be cached.
105        Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache will be in-memory only.
106        """
107        return False
108
109    @property
110    @abstractmethod
111    def url_base(self) -> str:
112        """
113        :return: URL base for the  API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/"
114        """
115
116    @property
117    def http_method(self) -> str:
118        """
119        Override if needed. See get_request_data/get_request_json if using POST/PUT/PATCH.
120        """
121        return "GET"
122
123    @property
124    @deprecated(
125        "Deprecated as of CDK version 3.0.0. "
126        "You should set error_handler explicitly in HttpStream.get_error_handler() instead."
127    )
128    def raise_on_http_errors(self) -> bool:
129        """
130        Override if needed. If set to False, allows opting-out of raising HTTP code exception.
131        """
132        return True
133
134    @property
135    @deprecated(
136        "Deprecated as of CDK version 3.0.0. "
137        "You should set backoff_strategies explicitly in HttpStream.get_backoff_strategy() instead."
138    )
139    def max_retries(self) -> Union[int, None]:
140        """
141        Override if needed. Specifies maximum amount of retries for backoff policy. Return None for no limit.
142        """
143        return 5
144
145    @property
146    @deprecated(
147        "Deprecated as of CDK version 3.0.0. "
148        "You should set backoff_strategies explicitly in HttpStream.get_backoff_strategy() instead."
149    )
150    def max_time(self) -> Union[int, None]:
151        """
152        Override if needed. Specifies maximum total waiting time (in seconds) for backoff policy. Return None for no limit.
153        """
154        return 60 * 10
155
156    @property
157    @deprecated(
158        "Deprecated as of CDK version 3.0.0. "
159        "You should set backoff_strategies explicitly in HttpStream.get_backoff_strategy() instead."
160    )
161    def retry_factor(self) -> float:
162        """
163        Override if needed. Specifies factor for backoff policy.
164        """
165        return 5
166
167    @abstractmethod
168    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
169        """
170        Override this method to define a pagination strategy.
171
172        The value returned from this method is passed to most other methods in this class. Use it to form a request e.g: set headers or query params.
173
174        :return: The token for the next page from the input response object. Returning None means there are no more pages to read in this response.
175        """
176
177    @abstractmethod
178    def path(
179        self,
180        *,
181        stream_state: Optional[Mapping[str, Any]] = None,
182        stream_slice: Optional[Mapping[str, Any]] = None,
183        next_page_token: Optional[Mapping[str, Any]] = None,
184    ) -> str:
185        """
186        Returns the URL path for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "some_entity"
187        """
188
189    def request_params(
190        self,
191        stream_state: Optional[Mapping[str, Any]],
192        stream_slice: Optional[Mapping[str, Any]] = None,
193        next_page_token: Optional[Mapping[str, Any]] = None,
194    ) -> MutableMapping[str, Any]:
195        """
196        Override this method to define the query parameters that should be set on an outgoing HTTP request given the inputs.
197
198        E.g: you might want to define query parameters for paging if next_page_token is not None.
199        """
200        return {}
201
202    def request_headers(
203        self,
204        stream_state: Optional[Mapping[str, Any]],
205        stream_slice: Optional[Mapping[str, Any]] = None,
206        next_page_token: Optional[Mapping[str, Any]] = None,
207    ) -> Mapping[str, Any]:
208        """
209        Override to return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
210        """
211        return {}
212
213    def request_body_data(
214        self,
215        stream_state: Optional[Mapping[str, Any]],
216        stream_slice: Optional[Mapping[str, Any]] = None,
217        next_page_token: Optional[Mapping[str, Any]] = None,
218    ) -> Optional[Union[Mapping[str, Any], str]]:
219        """
220        Override when creating POST/PUT/PATCH requests to populate the body of the request with a non-JSON payload.
221
222        If returns a ready text that it will be sent as is.
223        If returns a dict that it will be converted to a urlencoded form.
224        E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
225
226        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
227        """
228        return None
229
230    def request_body_json(
231        self,
232        stream_state: Optional[Mapping[str, Any]],
233        stream_slice: Optional[Mapping[str, Any]] = None,
234        next_page_token: Optional[Mapping[str, Any]] = None,
235    ) -> Optional[Mapping[str, Any]]:
236        """
237        Override when creating POST/PUT/PATCH requests to populate the body of the request with a JSON payload.
238
239        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
240        """
241        return None
242
243    def request_kwargs(
244        self,
245        stream_state: Optional[Mapping[str, Any]],
246        stream_slice: Optional[Mapping[str, Any]] = None,
247        next_page_token: Optional[Mapping[str, Any]] = None,
248    ) -> Mapping[str, Any]:
249        """
250        Override to return a mapping of keyword arguments to be used when creating the HTTP request.
251        Any option listed in https://docs.python-requests.org/en/latest/api/#requests.adapters.BaseAdapter.send for can be returned from
252        this method. Note that these options do not conflict with request-level options such as headers, request params, etc..
253        """
254        return {}
255
256    @abstractmethod
257    def parse_response(
258        self,
259        response: requests.Response,
260        *,
261        stream_state: Mapping[str, Any],
262        stream_slice: Optional[Mapping[str, Any]] = None,
263        next_page_token: Optional[Mapping[str, Any]] = None,
264    ) -> Iterable[Mapping[str, Any]]:
265        """
266        Parses the raw response object into a list of records.
267        By default, this returns an iterable containing the input. Override to parse differently.
268        :param response:
269        :param stream_state:
270        :param stream_slice:
271        :param next_page_token:
272        :return: An iterable containing the parsed response
273        """
274
275    def get_backoff_strategy(self) -> Optional[Union[BackoffStrategy, List[BackoffStrategy]]]:
276        """
277        Used to initialize Adapter to avoid breaking changes.
278        If Stream has a `backoff_time` method implementation, we know this stream uses old (pre-HTTPClient) backoff handlers and thus an adapter is needed.
279
280        Override to provide custom BackoffStrategy
281        :return Optional[BackoffStrategy]:
282        """
283        if hasattr(self, "backoff_time"):
284            return HttpStreamAdapterBackoffStrategy(self)
285        else:
286            return None
287
288    def get_error_handler(self) -> Optional[ErrorHandler]:
289        """
290        Used to initialize Adapter to avoid breaking changes.
291        If Stream has a `should_retry` method implementation, we know this stream uses old (pre-HTTPClient) error handlers and thus an adapter is needed.
292
293        Override to provide custom ErrorHandler
294        :return Optional[ErrorHandler]:
295        """
296        if hasattr(self, "should_retry"):
297            error_handler = HttpStreamAdapterHttpStatusErrorHandler(
298                stream=self,
299                logger=logging.getLogger(),
300                max_retries=self.max_retries,
301                max_time=timedelta(seconds=self.max_time or 0),
302            )
303            return error_handler
304        else:
305            return None
306
307    @classmethod
308    def _join_url(cls, url_base: str, path: str) -> str:
309        return urljoin(url_base, path)
310
311    @classmethod
312    def parse_response_error_message(cls, response: requests.Response) -> Optional[str]:
313        """
314        Parses the raw response object from a failed request into a user-friendly error message.
315        By default, this method tries to grab the error message from JSON responses by following common API patterns. Override to parse differently.
316
317        :param response:
318        :return: A user-friendly message that indicates the cause of the error
319        """
320
321        # default logic to grab error from common fields
322        def _try_get_error(value: Optional[JsonType]) -> Optional[str]:
323            if isinstance(value, str):
324                return value
325            elif isinstance(value, list):
326                errors_in_value = [_try_get_error(v) for v in value]
327                return ", ".join(v for v in errors_in_value if v is not None)
328            elif isinstance(value, dict):
329                new_value = (
330                    value.get("message")
331                    or value.get("messages")
332                    or value.get("error")
333                    or value.get("errors")
334                    or value.get("failures")
335                    or value.get("failure")
336                    or value.get("detail")
337                )
338                return _try_get_error(new_value)
339            return None
340
341        try:
342            body = response.json()
343            return _try_get_error(body)
344        except requests.exceptions.JSONDecodeError:
345            return None
346
347    def get_error_display_message(self, exception: BaseException) -> Optional[str]:
348        """
349        Retrieves the user-friendly display message that corresponds to an exception.
350        This will be called when encountering an exception while reading records from the stream, and used to build the AirbyteTraceMessage.
351
352        The default implementation of this method only handles HTTPErrors by passing the response to self.parse_response_error_message().
353        The method should be overriden as needed to handle any additional exception types.
354
355        :param exception: The exception that was raised
356        :return: A user-friendly message that indicates the cause of the error
357        """
358        if isinstance(exception, requests.HTTPError) and exception.response is not None:
359            return self.parse_response_error_message(exception.response)
360        return None
361
362    def read_records(
363        self,
364        sync_mode: SyncMode,
365        cursor_field: Optional[List[str]] = None,
366        stream_slice: Optional[Mapping[str, Any]] = None,
367        stream_state: Optional[Mapping[str, Any]] = None,
368    ) -> Iterable[StreamData]:
369        # A cursor_field indicates this is an incremental stream which offers better checkpointing than RFR enabled via the cursor
370        if self.cursor_field or not isinstance(self.get_cursor(), ResumableFullRefreshCursor):
371            yield from self._read_pages(
372                lambda req, res, state, _slice: self.parse_response(
373                    res, stream_slice=_slice, stream_state=state
374                ),
375                stream_slice,
376                stream_state,
377            )
378        else:
379            yield from self._read_single_page(
380                lambda req, res, state, _slice: self.parse_response(
381                    res, stream_slice=_slice, stream_state=state
382                ),
383                stream_slice,
384                stream_state,
385            )
386
387    @property
388    def state(self) -> MutableMapping[str, Any]:
389        cursor = self.get_cursor()
390        if cursor:
391            return cursor.get_stream_state()  # type: ignore
392        return self._state
393
394    @state.setter
395    def state(self, value: MutableMapping[str, Any]) -> None:
396        cursor = self.get_cursor()
397        if cursor:
398            cursor.set_initial_state(value)
399        self._state = value
400
401    def get_cursor(self) -> Optional[Cursor]:
402        # I don't love that this is semi-stateful but not sure what else to do. We don't know exactly what type of cursor to
403        # instantiate when creating the class. We can make a few assumptions like if there is a cursor_field which implies
404        # incremental, but we don't know until runtime if this is a substream. Ideally, a stream should explicitly define
405        # its cursor, but because we're trying to automatically apply RFR we're stuck with this logic where we replace the
406        # cursor at runtime once we detect this is a substream based on self.has_multiple_slices being reassigned
407        if self.has_multiple_slices and isinstance(self.cursor, ResumableFullRefreshCursor):
408            self.cursor = SubstreamResumableFullRefreshCursor()
409            return self.cursor
410        else:
411            return self.cursor
412
413    def _read_pages(
414        self,
415        records_generator_fn: Callable[
416            [
417                requests.PreparedRequest,
418                requests.Response,
419                Mapping[str, Any],
420                Optional[Mapping[str, Any]],
421            ],
422            Iterable[StreamData],
423        ],
424        stream_slice: Optional[Mapping[str, Any]] = None,
425        stream_state: Optional[Mapping[str, Any]] = None,
426    ) -> Iterable[StreamData]:
427        stream_state = stream_state or {}
428        pagination_complete = False
429        next_page_token = None
430        while not pagination_complete:
431            request, response = self._fetch_next_page(stream_slice, stream_state, next_page_token)
432            yield from records_generator_fn(request, response, stream_state, stream_slice)
433
434            next_page_token = self.next_page_token(response)
435            if not next_page_token:
436                pagination_complete = True
437
438        cursor = self.get_cursor()
439        if cursor and isinstance(cursor, SubstreamResumableFullRefreshCursor):
440            partition, _, _ = self._extract_slice_fields(stream_slice=stream_slice)
441            # Substreams checkpoint state by marking an entire parent partition as completed so that on the subsequent attempt
442            # after a failure, completed parents are skipped and the sync can make progress
443            cursor.close_slice(StreamSlice(cursor_slice={}, partition=partition))
444
445        # Always return an empty generator just in case no records were ever yielded
446        yield from []
447
448    def _read_single_page(
449        self,
450        records_generator_fn: Callable[
451            [
452                requests.PreparedRequest,
453                requests.Response,
454                Mapping[str, Any],
455                Optional[Mapping[str, Any]],
456            ],
457            Iterable[StreamData],
458        ],
459        stream_slice: Optional[Mapping[str, Any]] = None,
460        stream_state: Optional[Mapping[str, Any]] = None,
461    ) -> Iterable[StreamData]:
462        partition, cursor_slice, remaining_slice = self._extract_slice_fields(
463            stream_slice=stream_slice
464        )
465        stream_state = stream_state or {}
466        next_page_token = cursor_slice or None
467
468        request, response = self._fetch_next_page(remaining_slice, stream_state, next_page_token)
469        yield from records_generator_fn(request, response, stream_state, remaining_slice)
470
471        next_page_token = self.next_page_token(response) or {
472            "__ab_full_refresh_sync_complete": True
473        }
474
475        cursor = self.get_cursor()
476        if cursor:
477            cursor.close_slice(StreamSlice(cursor_slice=next_page_token, partition=partition))
478
479        # Always return an empty generator just in case no records were ever yielded
480        yield from []
481
482    @staticmethod
483    def _extract_slice_fields(
484        stream_slice: Optional[Mapping[str, Any]],
485    ) -> tuple[Mapping[str, Any], Mapping[str, Any], Mapping[str, Any]]:
486        if not stream_slice:
487            return {}, {}, {}
488
489        if isinstance(stream_slice, StreamSlice):
490            partition = stream_slice.partition
491            cursor_slice = stream_slice.cursor_slice
492            remaining = {k: v for k, v in stream_slice.items()}
493        else:
494            # RFR streams that implement stream_slices() to generate stream slices in the legacy mapping format are converted into a
495            # structured stream slice mapping by the LegacyCursorBasedCheckpointReader. The structured mapping object has separate
496            # fields for the partition and cursor_slice value
497            partition = stream_slice.get("partition", {})
498            cursor_slice = stream_slice.get("cursor_slice", {})
499            remaining = {
500                key: val
501                for key, val in stream_slice.items()
502                if key != "partition" and key != "cursor_slice"
503            }
504        return partition, cursor_slice, remaining
505
506    def _fetch_next_page(
507        self,
508        stream_slice: Optional[Mapping[str, Any]] = None,
509        stream_state: Optional[Mapping[str, Any]] = None,
510        next_page_token: Optional[Mapping[str, Any]] = None,
511    ) -> Tuple[requests.PreparedRequest, requests.Response]:
512        request, response = self._http_client.send_request(
513            http_method=self.http_method,
514            url=self._join_url(
515                self.url_base,
516                self.path(
517                    stream_state=stream_state,
518                    stream_slice=stream_slice,
519                    next_page_token=next_page_token,
520                ),
521            ),
522            request_kwargs=self.request_kwargs(
523                stream_state=stream_state,
524                stream_slice=stream_slice,
525                next_page_token=next_page_token,
526            ),
527            headers=self.request_headers(
528                stream_state=stream_state,
529                stream_slice=stream_slice,
530                next_page_token=next_page_token,
531            ),
532            params=self.request_params(
533                stream_state=stream_state,
534                stream_slice=stream_slice,
535                next_page_token=next_page_token,
536            ),
537            json=self.request_body_json(
538                stream_state=stream_state,
539                stream_slice=stream_slice,
540                next_page_token=next_page_token,
541            ),
542            data=self.request_body_data(
543                stream_state=stream_state,
544                stream_slice=stream_slice,
545                next_page_token=next_page_token,
546            ),
547            dedupe_query_params=True,
548            log_formatter=self.get_log_formatter(),
549            exit_on_rate_limit=self.exit_on_rate_limit,
550        )
551
552        return request, response
553
554    def get_log_formatter(self) -> Optional[Callable[[requests.Response], Any]]:
555        """
556
557        :return Optional[Callable[[requests.Response], Any]]: Function that will be used in logging inside HttpClient
558        """
559        return None

Base abstract class for an Airbyte Stream using the HTTP protocol. Basic building block for users building an Airbyte source for a HTTP API.

source_defined_cursor = True

Return False if the cursor can be configured by the user.

page_size: Optional[int] = None
exit_on_rate_limit: bool
82    @property
83    def exit_on_rate_limit(self) -> bool:
84        """
85        :return: False if the stream will retry endlessly when rate limited
86        """
87        return self._exit_on_rate_limit
Returns

False if the stream will retry endlessly when rate limited

cache_filename: str
93    @property
94    def cache_filename(self) -> str:
95        """
96        Override if needed. Return the name of cache file
97        Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache will be in-memory only.
98        """
99        return f"{self.name}.sqlite"

Override if needed. Return the name of the cache file. Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache will be in-memory only.

use_cache: bool
101    @property
102    def use_cache(self) -> bool:
103        """
104        Override if needed. If True, all records will be cached.
105        Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache will be in-memory only.
106        """
107        return False

Override if needed. If True, all records will be cached. Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache will be in-memory only.

url_base: str
109    @property
110    @abstractmethod
111    def url_base(self) -> str:
112        """
113        :return: URL base for the  API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/"
114        """
Returns

URL base for the API endpoint, e.g.: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/"

http_method: str
116    @property
117    def http_method(self) -> str:
118        """
119        Override if needed. See get_request_data/get_request_json if using POST/PUT/PATCH.
120        """
121        return "GET"

Override if needed. See get_request_data/get_request_json if using POST/PUT/PATCH.

raise_on_http_errors: bool
123    @property
124    @deprecated(
125        "Deprecated as of CDK version 3.0.0. "
126        "You should set error_handler explicitly in HttpStream.get_error_handler() instead."
127    )
128    def raise_on_http_errors(self) -> bool:
129        """
130        Override if needed. If set to False, allows opting-out of raising HTTP code exception.
131        """
132        return True

Override if needed. If set to False, allows opting-out of raising HTTP code exception.

max_retries: Optional[int]
134    @property
135    @deprecated(
136        "Deprecated as of CDK version 3.0.0. "
137        "You should set backoff_strategies explicitly in HttpStream.get_backoff_strategy() instead."
138    )
139    def max_retries(self) -> Union[int, None]:
140        """
141        Override if needed. Specifies maximum amount of retries for backoff policy. Return None for no limit.
142        """
143        return 5

Override if needed. Specifies maximum amount of retries for backoff policy. Return None for no limit.

max_time: Optional[int]
145    @property
146    @deprecated(
147        "Deprecated as of CDK version 3.0.0. "
148        "You should set backoff_strategies explicitly in HttpStream.get_backoff_strategy() instead."
149    )
150    def max_time(self) -> Union[int, None]:
151        """
152        Override if needed. Specifies maximum total waiting time (in seconds) for backoff policy. Return None for no limit.
153        """
154        return 60 * 10

Override if needed. Specifies maximum total waiting time (in seconds) for backoff policy. Return None for no limit.

retry_factor: float
156    @property
157    @deprecated(
158        "Deprecated as of CDK version 3.0.0. "
159        "You should set backoff_strategies explicitly in HttpStream.get_backoff_strategy() instead."
160    )
161    def retry_factor(self) -> float:
162        """
163        Override if needed. Specifies factor for backoff policy.
164        """
165        return 5

Override if needed. Specifies factor for backoff policy.

@abstractmethod
def next_page_token(self, response: requests.models.Response) -> Optional[Mapping[str, Any]]:
167    @abstractmethod
168    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
169        """
170        Override this method to define a pagination strategy.
171
172        The value returned from this method is passed to most other methods in this class. Use it to form a request e.g: set headers or query params.
173
174        :return: The token for the next page from the input response object. Returning None means there are no more pages to read in this response.
175        """

Override this method to define a pagination strategy.

The value returned from this method is passed to most other methods in this class. Use it to form a request e.g: set headers or query params.

Returns

The token for the next page from the input response object. Returning None means there are no more pages to read in this response.

@abstractmethod
def path( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> str:
177    @abstractmethod
178    def path(
179        self,
180        *,
181        stream_state: Optional[Mapping[str, Any]] = None,
182        stream_slice: Optional[Mapping[str, Any]] = None,
183        next_page_token: Optional[Mapping[str, Any]] = None,
184    ) -> str:
185        """
186        Returns the URL path for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "some_entity"
187        """

Returns the URL path for the API endpoint, e.g.: if you wanted to hit https://myapi.com/v1/some_entity then this should return "some_entity"

def request_params( self, stream_state: Optional[Mapping[str, Any]], stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> MutableMapping[str, Any]:
189    def request_params(
190        self,
191        stream_state: Optional[Mapping[str, Any]],
192        stream_slice: Optional[Mapping[str, Any]] = None,
193        next_page_token: Optional[Mapping[str, Any]] = None,
194    ) -> MutableMapping[str, Any]:
195        """
196        Override this method to define the query parameters that should be set on an outgoing HTTP request given the inputs.
197
198        E.g: you might want to define query parameters for paging if next_page_token is not None.
199        """
200        return {}

Override this method to define the query parameters that should be set on an outgoing HTTP request given the inputs.

E.g: you might want to define query parameters for paging if next_page_token is not None.

def request_headers( self, stream_state: Optional[Mapping[str, Any]], stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
202    def request_headers(
203        self,
204        stream_state: Optional[Mapping[str, Any]],
205        stream_slice: Optional[Mapping[str, Any]] = None,
206        next_page_token: Optional[Mapping[str, Any]] = None,
207    ) -> Mapping[str, Any]:
208        """
209        Override to return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
210        """
211        return {}

Override to return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.

def request_body_data( self, stream_state: Optional[Mapping[str, Any]], stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Union[str, Mapping[str, Any], NoneType]:
213    def request_body_data(
214        self,
215        stream_state: Optional[Mapping[str, Any]],
216        stream_slice: Optional[Mapping[str, Any]] = None,
217        next_page_token: Optional[Mapping[str, Any]] = None,
218    ) -> Optional[Union[Mapping[str, Any], str]]:
219        """
220        Override when creating POST/PUT/PATCH requests to populate the body of the request with a non-JSON payload.
221
222        If returns a ready text that it will be sent as is.
223        If returns a dict that it will be converted to a urlencoded form.
224        E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
225
226        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
227        """
228        return None

Override when creating POST/PUT/PATCH requests to populate the body of the request with a non-JSON payload.

If this returns text, it will be sent as-is. If this returns a dict, it will be converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

def request_body_json( self, stream_state: Optional[Mapping[str, Any]], stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Optional[Mapping[str, Any]]:
230    def request_body_json(
231        self,
232        stream_state: Optional[Mapping[str, Any]],
233        stream_slice: Optional[Mapping[str, Any]] = None,
234        next_page_token: Optional[Mapping[str, Any]] = None,
235    ) -> Optional[Mapping[str, Any]]:
236        """
237        Override when creating POST/PUT/PATCH requests to populate the body of the request with a JSON payload.
238
239        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
240        """
241        return None

Override when creating POST/PUT/PATCH requests to populate the body of the request with a JSON payload.

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

def request_kwargs( self, stream_state: Optional[Mapping[str, Any]], stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
243    def request_kwargs(
244        self,
245        stream_state: Optional[Mapping[str, Any]],
246        stream_slice: Optional[Mapping[str, Any]] = None,
247        next_page_token: Optional[Mapping[str, Any]] = None,
248    ) -> Mapping[str, Any]:
249        """
250        Override to return a mapping of keyword arguments to be used when creating the HTTP request.
251        Any option listed in https://docs.python-requests.org/en/latest/api/#requests.adapters.BaseAdapter.send for can be returned from
252        this method. Note that these options do not conflict with request-level options such as headers, request params, etc..
253        """
254        return {}

Override to return a mapping of keyword arguments to be used when creating the HTTP request. Any option listed in https://docs.python-requests.org/en/latest/api/#requests.adapters.BaseAdapter.send can be returned from this method. Note that these options do not conflict with request-level options such as headers, request params, etc.

@abstractmethod
def parse_response( self, response: requests.models.Response, *, stream_state: Mapping[str, Any], stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Iterable[Mapping[str, Any]]:
256    @abstractmethod
257    def parse_response(
258        self,
259        response: requests.Response,
260        *,
261        stream_state: Mapping[str, Any],
262        stream_slice: Optional[Mapping[str, Any]] = None,
263        next_page_token: Optional[Mapping[str, Any]] = None,
264    ) -> Iterable[Mapping[str, Any]]:
265        """
266        Parses the raw response object into a list of records.
267        By default, this returns an iterable containing the input. Override to parse differently.
268        :param response:
269        :param stream_state:
270        :param stream_slice:
271        :param next_page_token:
272        :return: An iterable containing the parsed response
273        """

Parses the raw response object into a list of records. By default, this returns an iterable containing the input. Override to parse differently.

Parameters
  • response:
  • stream_state:
  • stream_slice:
  • next_page_token:
Returns

An iterable containing the parsed response

def get_backoff_strategy( self) -> Union[airbyte_cdk.BackoffStrategy, List[airbyte_cdk.BackoffStrategy], NoneType]:
275    def get_backoff_strategy(self) -> Optional[Union[BackoffStrategy, List[BackoffStrategy]]]:
276        """
277        Used to initialize Adapter to avoid breaking changes.
278        If Stream has a `backoff_time` method implementation, we know this stream uses old (pre-HTTPClient) backoff handlers and thus an adapter is needed.
279
280        Override to provide custom BackoffStrategy
281        :return Optional[BackoffStrategy]:
282        """
283        if hasattr(self, "backoff_time"):
284            return HttpStreamAdapterBackoffStrategy(self)
285        else:
286            return None

Used to initialize Adapter to avoid breaking changes. If Stream has a backoff_time method implementation, we know this stream uses old (pre-HTTPClient) backoff handlers and thus an adapter is needed.

Override to provide custom BackoffStrategy

Returns
def get_error_handler( self) -> Optional[airbyte_cdk.sources.streams.http.error_handlers.ErrorHandler]:
288    def get_error_handler(self) -> Optional[ErrorHandler]:
289        """
290        Used to initialize Adapter to avoid breaking changes.
291        If Stream has a `should_retry` method implementation, we know this stream uses old (pre-HTTPClient) error handlers and thus an adapter is needed.
292
293        Override to provide custom ErrorHandler
294        :return Optional[ErrorHandler]:
295        """
296        if hasattr(self, "should_retry"):
297            error_handler = HttpStreamAdapterHttpStatusErrorHandler(
298                stream=self,
299                logger=logging.getLogger(),
300                max_retries=self.max_retries,
301                max_time=timedelta(seconds=self.max_time or 0),
302            )
303            return error_handler
304        else:
305            return None

Used to initialize Adapter to avoid breaking changes. If Stream has a should_retry method implementation, we know this stream uses old (pre-HTTPClient) error handlers and thus an adapter is needed.

Override to provide custom ErrorHandler

Returns
@classmethod
def parse_response_error_message(cls, response: requests.models.Response) -> Optional[str]:
311    @classmethod
312    def parse_response_error_message(cls, response: requests.Response) -> Optional[str]:
313        """
314        Parses the raw response object from a failed request into a user-friendly error message.
315        By default, this method tries to grab the error message from JSON responses by following common API patterns. Override to parse differently.
316
317        :param response:
318        :return: A user-friendly message that indicates the cause of the error
319        """
320
321        # default logic to grab error from common fields
322        def _try_get_error(value: Optional[JsonType]) -> Optional[str]:
323            if isinstance(value, str):
324                return value
325            elif isinstance(value, list):
326                errors_in_value = [_try_get_error(v) for v in value]
327                return ", ".join(v for v in errors_in_value if v is not None)
328            elif isinstance(value, dict):
329                new_value = (
330                    value.get("message")
331                    or value.get("messages")
332                    or value.get("error")
333                    or value.get("errors")
334                    or value.get("failures")
335                    or value.get("failure")
336                    or value.get("detail")
337                )
338                return _try_get_error(new_value)
339            return None
340
341        try:
342            body = response.json()
343            return _try_get_error(body)
344        except requests.exceptions.JSONDecodeError:
345            return None

Parses the raw response object from a failed request into a user-friendly error message. By default, this method tries to grab the error message from JSON responses by following common API patterns. Override to parse differently.

Parameters
  • response:
Returns

A user-friendly message that indicates the cause of the error

def get_error_display_message(self, exception: BaseException) -> Optional[str]:
347    def get_error_display_message(self, exception: BaseException) -> Optional[str]:
348        """
349        Retrieves the user-friendly display message that corresponds to an exception.
350        This will be called when encountering an exception while reading records from the stream, and used to build the AirbyteTraceMessage.
351
352        The default implementation of this method only handles HTTPErrors by passing the response to self.parse_response_error_message().
353        The method should be overriden as needed to handle any additional exception types.
354
355        :param exception: The exception that was raised
356        :return: A user-friendly message that indicates the cause of the error
357        """
358        if isinstance(exception, requests.HTTPError) and exception.response is not None:
359            return self.parse_response_error_message(exception.response)
360        return None

Retrieves the user-friendly display message that corresponds to an exception. This will be called when encountering an exception while reading records from the stream, and used to build the AirbyteTraceMessage.

The default implementation of this method only handles HTTPErrors by passing the response to self.parse_response_error_message(). The method should be overridden as needed to handle any additional exception types.

Parameters
  • exception: The exception that was raised
Returns

A user-friendly message that indicates the cause of the error

def read_records( self, sync_mode: airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode, cursor_field: Optional[List[str]] = None, stream_slice: Optional[Mapping[str, Any]] = None, stream_state: Optional[Mapping[str, Any]] = None) -> Iterable[Union[Mapping[str, Any], airbyte_cdk.AirbyteMessage]]:
362    def read_records(
363        self,
364        sync_mode: SyncMode,
365        cursor_field: Optional[List[str]] = None,
366        stream_slice: Optional[Mapping[str, Any]] = None,
367        stream_state: Optional[Mapping[str, Any]] = None,
368    ) -> Iterable[StreamData]:
369        # A cursor_field indicates this is an incremental stream which offers better checkpointing than RFR enabled via the cursor
370        if self.cursor_field or not isinstance(self.get_cursor(), ResumableFullRefreshCursor):
371            yield from self._read_pages(
372                lambda req, res, state, _slice: self.parse_response(
373                    res, stream_slice=_slice, stream_state=state
374                ),
375                stream_slice,
376                stream_state,
377            )
378        else:
379            yield from self._read_single_page(
380                lambda req, res, state, _slice: self.parse_response(
381                    res, stream_slice=_slice, stream_state=state
382                ),
383                stream_slice,
384                stream_state,
385            )

This method should be overridden by subclasses to read records based on the inputs

state: MutableMapping[str, Any]
387    @property
388    def state(self) -> MutableMapping[str, Any]:
389        cursor = self.get_cursor()
390        if cursor:
391            return cursor.get_stream_state()  # type: ignore
392        return self._state

State getter, should return state in a form that can be serialized to a string and sent to the output as a STATE AirbyteMessage.

A good example of a state is a cursor_value: { self.cursor_field: "cursor_value" }

State should try to be as small as possible but at the same time descriptive enough to restore syncing process from the point where it stopped.

def get_cursor(self) -> Optional[airbyte_cdk.sources.streams.checkpoint.Cursor]:
401    def get_cursor(self) -> Optional[Cursor]:
402        # I don't love that this is semi-stateful but not sure what else to do. We don't know exactly what type of cursor to
403        # instantiate when creating the class. We can make a few assumptions like if there is a cursor_field which implies
404        # incremental, but we don't know until runtime if this is a substream. Ideally, a stream should explicitly define
405        # its cursor, but because we're trying to automatically apply RFR we're stuck with this logic where we replace the
406        # cursor at runtime once we detect this is a substream based on self.has_multiple_slices being reassigned
407        if self.has_multiple_slices and isinstance(self.cursor, ResumableFullRefreshCursor):
408            self.cursor = SubstreamResumableFullRefreshCursor()
409            return self.cursor
410        else:
411            return self.cursor

A Cursor is an interface that a stream can implement to manage how its internal state is read and updated while reading records. Historically, Python connectors had no concept of a cursor to manage state. Python streams need to define a cursor implementation and override this method to manage state through a Cursor.

def get_log_formatter(self) -> Optional[Callable[[requests.models.Response], Any]]:
554    def get_log_formatter(self) -> Optional[Callable[[requests.Response], Any]]:
555        """
556
557        :return Optional[Callable[[requests.Response], Any]]: Function that will be used in logging inside HttpClient
558        """
559        return None
Returns

Function that will be used in logging inside HttpClient

class HttpSubStream(airbyte_cdk.sources.streams.http.HttpStream, abc.ABC):
562class HttpSubStream(HttpStream, ABC):
563    def __init__(self, parent: HttpStream, **kwargs: Any):
564        """
565        :param parent: should be the instance of HttpStream class
566        """
567        super().__init__(**kwargs)
568        self.parent = parent
569        self.has_multiple_slices = (
570            True  # Substreams are based on parent records which implies there are multiple slices
571        )
572
573        # There are three conditions that dictate if RFR should automatically be applied to a stream
574        # 1. Streams that explicitly initialize their own cursor should defer to it and not automatically apply RFR
575        # 2. Streams with at least one cursor_field are incremental and thus a superior sync to RFR.
576        # 3. Streams overriding read_records() do not guarantee that they will call the parent implementation which can perform
577        #    per-page checkpointing so RFR is only supported if a stream use the default `HttpStream.read_records()` method
578        if (
579            not self.cursor
580            and len(self.cursor_field) == 0
581            and type(self).read_records is HttpStream.read_records
582        ):
583            self.cursor = SubstreamResumableFullRefreshCursor()
584
585    def stream_slices(
586        self,
587        sync_mode: SyncMode,
588        cursor_field: Optional[List[str]] = None,
589        stream_state: Optional[Mapping[str, Any]] = None,
590    ) -> Iterable[Optional[Mapping[str, Any]]]:
591        # read_stateless() assumes the parent is not concurrent. This is currently okay since the concurrent CDK does
592        # not support either substreams or RFR, but something that needs to be considered once we do
593        for parent_record in self.parent.read_only_records(stream_state):
594            # Skip non-records (eg AirbyteLogMessage)
595            if isinstance(parent_record, AirbyteMessage):
596                if parent_record.type == MessageType.RECORD:
597                    parent_record = parent_record.record.data  # type: ignore [assignment, union-attr]  # Incorrect type for assignment
598                else:
599                    continue
600            elif isinstance(parent_record, Record):
601                parent_record = parent_record.data
602            yield {"parent": parent_record}

Base abstract class for an Airbyte Stream using the HTTP protocol. Basic building block for users building an Airbyte source for a HTTP API.

HttpSubStream( parent: HttpStream, **kwargs: Any)
563    def __init__(self, parent: HttpStream, **kwargs: Any):
564        """
565        :param parent: should be the instance of HttpStream class
566        """
567        super().__init__(**kwargs)
568        self.parent = parent
569        self.has_multiple_slices = (
570            True  # Substreams are based on parent records which implies there are multiple slices
571        )
572
573        # There are three conditions that dictate if RFR should automatically be applied to a stream
574        # 1. Streams that explicitly initialize their own cursor should defer to it and not automatically apply RFR
575        # 2. Streams with at least one cursor_field are incremental and thus a superior sync to RFR.
576        # 3. Streams overriding read_records() do not guarantee that they will call the parent implementation which can perform
577        #    per-page checkpointing so RFR is only supported if a stream use the default `HttpStream.read_records()` method
578        if (
579            not self.cursor
580            and len(self.cursor_field) == 0
581            and type(self).read_records is HttpStream.read_records
582        ):
583            self.cursor = SubstreamResumableFullRefreshCursor()
Parameters
  • parent: should be the instance of HttpStream class
parent
has_multiple_slices = False
def stream_slices( self, sync_mode: airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode, cursor_field: Optional[List[str]] = None, stream_state: Optional[Mapping[str, Any]] = None) -> Iterable[Optional[Mapping[str, Any]]]:
585    def stream_slices(
586        self,
587        sync_mode: SyncMode,
588        cursor_field: Optional[List[str]] = None,
589        stream_state: Optional[Mapping[str, Any]] = None,
590    ) -> Iterable[Optional[Mapping[str, Any]]]:
591        # read_stateless() assumes the parent is not concurrent. This is currently okay since the concurrent CDK does
592        # not support either substreams or RFR, but something that needs to be considered once we do
593        for parent_record in self.parent.read_only_records(stream_state):
594            # Skip non-records (eg AirbyteLogMessage)
595            if isinstance(parent_record, AirbyteMessage):
596                if parent_record.type == MessageType.RECORD:
597                    parent_record = parent_record.record.data  # type: ignore [assignment, union-attr]  # Incorrect type for assignment
598                else:
599                    continue
600            elif isinstance(parent_record, Record):
601                parent_record = parent_record.data
602            yield {"parent": parent_record}

Override to define the slices for this stream. See the stream slicing section of the docs for more information.

Parameters
  • sync_mode:
  • cursor_field:
  • stream_state:
Returns
class UserDefinedBackoffException(airbyte_cdk.sources.streams.http.exceptions.BaseBackoffException):
40class UserDefinedBackoffException(BaseBackoffException):
41    """
42    An exception that exposes how long it attempted to backoff
43    """
44
45    def __init__(
46        self,
47        backoff: Union[int, float],
48        request: requests.PreparedRequest,
49        response: Optional[Union[requests.Response, Exception]],
50        error_message: str = "",
51        failure_type: Optional[FailureType] = None,
52    ):
53        """
54        :param backoff: how long to backoff in seconds
55        :param request: the request that triggered this backoff exception
56        :param response: the response that triggered the backoff exception
57        """
58        self.backoff = backoff
59        super().__init__(
60            request=request,
61            response=response,
62            error_message=error_message,
63            failure_type=failure_type,
64        )

An exception that exposes how long it attempted to backoff

UserDefinedBackoffException( backoff: Union[int, float], request: requests.models.PreparedRequest, response: Union[requests.models.Response, Exception, NoneType], error_message: str = '', failure_type: Optional[airbyte_protocol_dataclasses.models.airbyte_protocol.FailureType] = None)
45    def __init__(
46        self,
47        backoff: Union[int, float],
48        request: requests.PreparedRequest,
49        response: Optional[Union[requests.Response, Exception]],
50        error_message: str = "",
51        failure_type: Optional[FailureType] = None,
52    ):
53        """
54        :param backoff: how long to backoff in seconds
55        :param request: the request that triggered this backoff exception
56        :param response: the response that triggered the backoff exception
57        """
58        self.backoff = backoff
59        super().__init__(
60            request=request,
61            response=response,
62            error_message=error_message,
63            failure_type=failure_type,
64        )
Parameters
  • backoff: how long to backoff in seconds
  • request: the request that triggered this backoff exception
  • response: the response that triggered the backoff exception
backoff