airbyte_cdk.sources.streams.http

 1#
 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 3#
 4
 5# Initialize Streams Package
 6from .exceptions import UserDefinedBackoffException
 7from .http import HttpStream, HttpSubStream
 8from .http_client import HttpClient
 9
10__all__ = ["HttpClient", "HttpStream", "HttpSubStream", "UserDefinedBackoffException"]
class HttpClient:
    """
    Wrapper around a ``requests`` session that centralizes request preparation,
    optional response caching, API-budget (call rate) enforcement, error
    interpretation and retry/backoff behavior for Airbyte HTTP streams.
    """

    _DEFAULT_MAX_RETRY: int = 5
    _DEFAULT_MAX_TIME: int = 60 * 10
    # Response actions after which the same request will be attempted again; for
    # any other action the per-request attempt counter can be evicted.
    _ACTIONS_TO_RETRY_ON = {ResponseAction.RETRY, ResponseAction.RATE_LIMITED}

    def __init__(
        self,
        name: str,
        logger: logging.Logger,
        error_handler: Optional[ErrorHandler] = None,
        api_budget: Optional[APIBudget] = None,
        session: Optional[Union[requests.Session, requests_cache.CachedSession]] = None,
        authenticator: Optional[AuthBase] = None,
        use_cache: bool = False,
        backoff_strategy: Optional[Union[BackoffStrategy, List[BackoffStrategy]]] = None,
        error_message_parser: Optional[ErrorMessageParser] = None,
        disable_retries: bool = False,
        message_repository: Optional[MessageRepository] = None,
    ):
        """
        :param name: Name used for the cache file and for error/stream-status reporting.
        :param logger: Logger used for debug logging of requests/responses and errors.
        :param error_handler: Interprets responses/exceptions into an ErrorResolution;
            defaults to ``HttpStatusErrorHandler``.
        :param api_budget: Call-rate budget; defaults to a budget with no policies.
        :param session: Pre-configured session to use. When omitted, a session is
            created (a cached one when ``use_cache`` is True) and an HTTPS adapter
            with an enlarged connection pool is mounted.
        :param authenticator: Optional ``requests`` auth object attached to the session.
        :param use_cache: When True (and no ``session`` is given), responses are cached in SQLite.
        :param backoff_strategy: One or more strategies consulted, in order, for a
            backoff time on retryable errors; defaults to ``DefaultBackoffStrategy``.
        :param error_message_parser: Extracts a user-facing message from error
            responses; defaults to ``JsonErrorMessageParser``.
        :param disable_retries: When True, failed requests are never retried.
        :param message_repository: Optional repository used to emit formatted
            request/response log messages (declarative CDK).
        """
        self._name = name
        self._api_budget: APIBudget = api_budget or APIBudget(policies=[])
        if session:
            self._session = session
        else:
            self._use_cache = use_cache
            self._session = self._request_session()
            self._session.mount(
                "https://",
                requests.adapters.HTTPAdapter(
                    pool_connections=MAX_CONNECTION_POOL_SIZE, pool_maxsize=MAX_CONNECTION_POOL_SIZE
                ),
            )
        if isinstance(authenticator, AuthBase):
            self._session.auth = authenticator
        self._logger = logger
        self._error_handler = error_handler or HttpStatusErrorHandler(self._logger)
        if backoff_strategy is not None:
            if isinstance(backoff_strategy, list):
                self._backoff_strategies = backoff_strategy
            else:
                self._backoff_strategies = [backoff_strategy]
        else:
            self._backoff_strategies = [DefaultBackoffStrategy()]
        self._error_message_parser = error_message_parser or JsonErrorMessageParser()
        # Tracks how many attempts have been made per prepared request; entries are
        # evicted in _evict_key once no further retries are needed (memory safety).
        self._request_attempt_count: Dict[requests.PreparedRequest, int] = {}
        self._disable_retries = disable_retries
        self._message_repository = message_repository

    @property
    def cache_filename(self) -> str:
        """
        Override if needed. Return the name of cache file
        Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache will be in-memory only.
        """
        return f"{self._name}.sqlite"

    def _request_session(self) -> requests.Session:
        """
        Session factory based on use_cache property and call rate limits (api_budget parameter)
        :return: instance of request-based session
        """
        if self._use_cache:
            cache_dir = os.getenv(ENV_REQUEST_CACHE_PATH)
            # Use in-memory cache if cache_dir is not set
            # This is a non-obvious interface, but it ensures we don't write sql files when running unit tests
            sqlite_path = (
                str(Path(cache_dir) / self.cache_filename)
                if cache_dir
                else "file::memory:?cache=shared"
            )
            # By using `PRAGMA synchronous=OFF` and `PRAGMA journal_mode=WAL`, we reduce the possible occurrences of `database table is locked` errors.
            # Note that those were blindly added at the same time and one or the other might be sufficient to prevent the issues but we have seen good results with both. Feel free to revisit given more information.
            # There are strong signals that `fast_save` might create problems but if the sync crashes, we start back from the beginning in terms of sqlite anyway so the impact should be minimal. Signals are:
            # * https://github.com/requests-cache/requests-cache/commit/7fa89ffda300331c37d8fad7f773348a3b5b0236#diff-f43db4a5edf931647c32dec28ea7557aae4cae8444af4b26c8ecbe88d8c925aaR238
            # * https://github.com/requests-cache/requests-cache/commit/7fa89ffda300331c37d8fad7f773348a3b5b0236#diff-2e7f95b7d7be270ff1a8118f817ea3e6663cdad273592e536a116c24e6d23c18R164-R168
            # * `If the application running SQLite crashes, the data will be safe, but the database [might become corrupted](https://www.sqlite.org/howtocorrupt.html#cfgerr) if the operating system crashes or the computer loses power before that data has been written to the disk surface.` in [this description](https://www.sqlite.org/pragma.html#pragma_synchronous).
            backend = requests_cache.SQLiteCache(sqlite_path, fast_save=True, wal=True)
            return CachedLimiterSession(
                cache_name=sqlite_path,
                backend=backend,
                api_budget=self._api_budget,
                match_headers=True,
            )
        else:
            return LimiterSession(api_budget=self._api_budget)

    def clear_cache(self) -> None:
        """
        Clear cached requests for current session, can be called any time
        """
        if isinstance(self._session, requests_cache.CachedSession):
            self._session.cache.clear()  # type: ignore # cache.clear is not typed

    def _dedupe_query_params(
        self, url: str, params: Optional[Mapping[str, str]]
    ) -> Mapping[str, str]:
        """
        Remove query parameters from params mapping if they are already encoded in the URL
        with the same value.

        :param url: URL that may already contain an encoded query string.
        :param params: Additional query parameters to send.
        :return: ``params`` without the keys already present in the URL with an equal value.
        """
        if params is None:
            params = {}
        query_string = urllib.parse.urlparse(url).query
        # parse_qs returns lists of values; only the first value per key is compared.
        query_dict = {k: v[0] for k, v in urllib.parse.parse_qs(query_string).items()}

        duplicate_keys_with_same_value = {
            k for k in query_dict.keys() if str(params.get(k)) == str(query_dict[k])
        }
        return {k: v for k, v in params.items() if k not in duplicate_keys_with_same_value}

    def _create_prepared_request(
        self,
        http_method: str,
        url: str,
        dedupe_query_params: bool = False,
        headers: Optional[Mapping[str, str]] = None,
        params: Optional[Mapping[str, str]] = None,
        json: Optional[Mapping[str, Any]] = None,
        data: Optional[Union[str, Mapping[str, Any]]] = None,
    ) -> requests.PreparedRequest:
        """
        Build a ``requests.PreparedRequest`` from the given components.

        :raises RequestBodyException: if both ``json`` and ``data`` are provided for a
            body-carrying HTTP method — only one body representation is allowed.
        """
        if dedupe_query_params:
            query_params = self._dedupe_query_params(url, params)
        else:
            query_params = params or {}
        args = {"method": http_method, "url": url, "headers": headers, "params": query_params}
        # Only attach a body for methods that carry one (e.g. POST/PUT/PATCH).
        if http_method.upper() in BODY_REQUEST_METHODS:
            if json and data:
                raise RequestBodyException(
                    "At the same time only one of the 'request_body_data' and 'request_body_json' functions can return data"
                )
            elif json:
                args["json"] = json
            elif data:
                args["data"] = data
        prepared_request: requests.PreparedRequest = self._session.prepare_request(
            requests.Request(**args)
        )

        return prepared_request

    @property
    def _max_retries(self) -> int:
        """
        Determines the max retries based on the provided error handler.
        """
        max_retries = None
        if self._disable_retries:
            max_retries = 0
        else:
            max_retries = self._error_handler.max_retries
        return max_retries if max_retries is not None else self._DEFAULT_MAX_RETRY

    @property
    def _max_time(self) -> int:
        """
        Determines the max time based on the provided error handler.
        """
        return (
            self._error_handler.max_time
            if self._error_handler.max_time is not None
            else self._DEFAULT_MAX_TIME
        )

    def _send_with_retry(
        self,
        request: requests.PreparedRequest,
        request_kwargs: Mapping[str, Any],
        log_formatter: Optional[Callable[[requests.Response], Any]] = None,
        exit_on_rate_limit: Optional[bool] = False,
    ) -> requests.Response:
        """
        Sends a request with retry logic.

        Args:
            request (requests.PreparedRequest): The prepared HTTP request to send.
            request_kwargs (Mapping[str, Any]): Additional keyword arguments for the request.

        Returns:
            requests.Response: The HTTP response received from the server after retries.

        Raises:
            MessageRepresentationAirbyteTracedErrors: when all retry attempts are exhausted.
        """

        max_retries = self._max_retries
        max_tries = max(0, max_retries) + 1
        max_time = self._max_time

        user_backoff_handler = user_defined_backoff_handler(max_tries=max_tries, max_time=max_time)(
            self._send
        )
        rate_limit_backoff_handler = rate_limit_default_backoff_handler(max_tries=max_tries)
        backoff_handler = http_client_default_backoff_handler(
            max_tries=max_tries, max_time=max_time
        )
        # backoff handlers wrap _send, so it will always return a response -- except when all retries are exhausted
        try:
            response = backoff_handler(rate_limit_backoff_handler(user_backoff_handler))(
                request,
                request_kwargs,
                log_formatter=log_formatter,
                exit_on_rate_limit=exit_on_rate_limit,
            )  # type: ignore # mypy can't infer that backoff_handler wraps _send

            return response
        except BaseBackoffException as e:
            self._logger.error("Retries exhausted with backoff exception.", exc_info=True)
            raise MessageRepresentationAirbyteTracedErrors(
                internal_message=f"Exhausted available request attempts. Exception: {e}",
                message=f"Exhausted available request attempts. Please see logs for more details. Exception: {e}",
                failure_type=e.failure_type or FailureType.system_error,
                exception=e,
                stream_descriptor=StreamDescriptor(name=self._name),
            )

    def _send(
        self,
        request: requests.PreparedRequest,
        request_kwargs: Mapping[str, Any],
        log_formatter: Optional[Callable[[requests.Response], Any]] = None,
        exit_on_rate_limit: Optional[bool] = False,
    ) -> requests.Response:
        """
        Sends the request once, interprets the outcome via the error handler, and
        delegates to ``_handle_error_resolution`` (which raises on retry/fail actions).
        """
        if request not in self._request_attempt_count:
            self._request_attempt_count[request] = 1
        else:
            self._request_attempt_count[request] += 1
            # On retry attempts, re-apply the session's auth so the request carries
            # fresh credentials (e.g. a refreshed token) — TODO confirm intent.
            if hasattr(self._session, "auth") and isinstance(self._session.auth, AuthBase):
                self._session.auth(request)

        self._logger.debug(
            "Making outbound API request",
            extra={"headers": request.headers, "url": request.url, "request_body": request.body},
        )

        response: Optional[requests.Response] = None
        exc: Optional[requests.RequestException] = None

        try:
            response = self._session.send(request, **request_kwargs)
        except requests.RequestException as e:
            exc = e

        error_resolution: ErrorResolution = self._error_handler.interpret_response(
            response if response is not None else exc
        )

        # Evaluation of response.text can be heavy, for example, if streaming a large response
        # Do it only in debug mode
        if self._logger.isEnabledFor(logging.DEBUG) and response is not None:
            if request_kwargs.get("stream"):
                self._logger.debug(
                    "Receiving response, but not logging it as the response is streamed",
                    extra={"headers": response.headers, "status": response.status_code},
                )
            else:
                self._logger.debug(
                    "Receiving response",
                    extra={
                        "headers": response.headers,
                        "status": response.status_code,
                        "body": response.text,
                    },
                )

        # Request/response logging for declarative cdk
        if (
            log_formatter is not None
            and response is not None
            and self._message_repository is not None
        ):
            formatter = log_formatter
            self._message_repository.log_message(
                Level.DEBUG,
                lambda: formatter(response),
            )

        self._handle_error_resolution(
            response=response,
            exc=exc,
            request=request,
            error_resolution=error_resolution,
            exit_on_rate_limit=exit_on_rate_limit,
        )

        return response  # type: ignore # will either return a valid response of type requests.Response or raise an exception

    def _get_response_body(self, response: requests.Response) -> Optional[JsonType]:
        """
        Extracts and returns the body of an HTTP response.

        This method attempts to parse the response body as JSON. If the response
        body is not valid JSON, it falls back to decoding the response content
        as a UTF-8 string. If both attempts fail, a placeholder message string
        is returned instead.

        Args:
            response (requests.Response): The HTTP response object.

        Returns:
            Optional[JsonType]: The parsed JSON object as a string, the decoded
            response content as a string, or a placeholder message if both
            parsing attempts fail.
        """
        try:
            return str(response.json())
        except requests.exceptions.JSONDecodeError:
            try:
                return response.content.decode("utf-8")
            except Exception:
                return "The Content of the Response couldn't be decoded."

    def _evict_key(self, prepared_request: requests.PreparedRequest) -> None:
        """
        Addresses high memory consumption when enabling concurrency in https://github.com/airbytehq/oncall/issues/6821.

        The `_request_attempt_count` attribute keeps growing as multiple requests are made using the same `http_client`.
        To mitigate this issue, we evict keys for completed requests once we confirm that no further retries are needed.
        This helps manage memory usage more efficiently while maintaining the necessary logic for retry attempts.
        """
        if prepared_request in self._request_attempt_count:
            del self._request_attempt_count[prepared_request]

    def _handle_error_resolution(
        self,
        response: Optional[requests.Response],
        exc: Optional[requests.RequestException],
        request: requests.PreparedRequest,
        error_resolution: ErrorResolution,
        exit_on_rate_limit: Optional[bool] = False,
    ) -> None:
        """
        Acts on the interpreted error resolution: raises the appropriate backoff or
        traced exception for retry/fail actions, logs for ignore actions, and emits a
        RATE_LIMITED stream status when applicable. Returns normally on success.
        """
        # The request is complete (no further retries) for any non-retryable action.
        if error_resolution.response_action not in self._ACTIONS_TO_RETRY_ON:
            self._evict_key(request)

        if error_resolution.response_action == ResponseAction.RESET_PAGINATION:
            raise PaginationResetRequiredException()

        # Emit stream status RUNNING with the reason RATE_LIMITED to log that the rate limit has been reached
        if error_resolution.response_action == ResponseAction.RATE_LIMITED:
            # TODO: Update to handle with message repository when concurrent message repository is ready
            reasons = [AirbyteStreamStatusReason(type=AirbyteStreamStatusReasonType.RATE_LIMITED)]
            message = orjson.dumps(
                AirbyteMessageSerializer.dump(
                    stream_status_as_airbyte_message(
                        StreamDescriptor(name=self._name), AirbyteStreamStatus.RUNNING, reasons
                    )
                )
            ).decode()

            # Simply printing the stream status is a temporary solution and can cause future issues. Currently, the _send method is
            # wrapped with backoff decorators, and we can only emit messages by iterating record_iterator in the abstract source at the
            # end of the retry decorator behavior. This approach does not allow us to emit messages in the queue before exiting the
            # backoff retry loop. Adding `\n` to the message and ignore 'end' ensure that few messages are printed at the same time.
            print(f"{message}\n", end="", flush=True)

        if error_resolution.response_action == ResponseAction.FAIL:
            if response is not None:
                filtered_response_message = filter_secrets(
                    f"Request (body): '{str(request.body)}'. Response (body): '{self._get_response_body(response)}'. Response (headers): '{response.headers}'."
                )
                error_message = f"'{request.method}' request to '{request.url}' failed with status code '{response.status_code}' and error message: '{self._error_message_parser.parse_response_error_message(response)}'. {filtered_response_message}"
            else:
                error_message = (
                    f"'{request.method}' request to '{request.url}' failed with exception: '{exc}'"
                )

            # ensure the exception message is emitted before raised
            self._logger.error(error_message)

            raise MessageRepresentationAirbyteTracedErrors(
                internal_message=error_message,
                message=error_resolution.error_message or error_message,
                failure_type=error_resolution.failure_type,
            )

        elif error_resolution.response_action == ResponseAction.IGNORE:
            if response is not None:
                log_message = f"Ignoring response for '{request.method}' request to '{request.url}' with response code '{response.status_code}'"
            else:
                log_message = f"Ignoring response for '{request.method}' request to '{request.url}' with error '{exc}'"

            self._logger.info(error_resolution.error_message or log_message)

        # TODO: Consider dynamic retry count depending on subsequent error codes
        elif (
            error_resolution.response_action == ResponseAction.RETRY
            or error_resolution.response_action == ResponseAction.RATE_LIMITED
        ):
            # First strategy that returns a backoff time wins.
            user_defined_backoff_time = None
            for backoff_strategy in self._backoff_strategies:
                backoff_time = backoff_strategy.backoff_time(
                    response_or_exception=response if response is not None else exc,
                    attempt_count=self._request_attempt_count[request],
                )
                if backoff_time:
                    user_defined_backoff_time = backoff_time
                    break
            error_message = (
                error_resolution.error_message
                or f"Request to {request.url} failed with failure type {error_resolution.failure_type}, response action {error_resolution.response_action}."
            )

            # Rate-limited requests retry without a tries limit unless the caller opted out.
            retry_endlessly = (
                error_resolution.response_action == ResponseAction.RATE_LIMITED
                and not exit_on_rate_limit
            )

            if user_defined_backoff_time:
                raise UserDefinedBackoffException(
                    backoff=user_defined_backoff_time,
                    request=request,
                    response=(response if response is not None else exc),
                    error_message=error_message,
                    failure_type=error_resolution.failure_type,
                )

            elif retry_endlessly:
                raise RateLimitBackoffException(
                    request=request,
                    response=(response if response is not None else exc),
                    error_message=error_message,
                    failure_type=error_resolution.failure_type,
                )

            raise DefaultBackoffException(
                request=request,
                response=(response if response is not None else exc),
                error_message=error_message,
                failure_type=error_resolution.failure_type,
            )

        elif response:
            # Defensive fallback: surface any HTTP error the handler did not map to an action.
            try:
                response.raise_for_status()
            except requests.HTTPError as e:
                self._logger.error(response.text)
                raise e

    @property
    def name(self) -> str:
        return self._name

    def send_request(
        self,
        http_method: str,
        url: str,
        request_kwargs: Mapping[str, Any],
        headers: Optional[Mapping[str, str]] = None,
        params: Optional[Mapping[str, str]] = None,
        json: Optional[Mapping[str, Any]] = None,
        data: Optional[Union[str, Mapping[str, Any]]] = None,
        dedupe_query_params: bool = False,
        log_formatter: Optional[Callable[[requests.Response], Any]] = None,
        exit_on_rate_limit: Optional[bool] = False,
    ) -> Tuple[requests.PreparedRequest, requests.Response]:
        """
        Prepares and sends request and return request and response objects.
        """

        request: requests.PreparedRequest = self._create_prepared_request(
            http_method=http_method,
            url=url,
            dedupe_query_params=dedupe_query_params,
            headers=headers,
            params=params,
            json=json,
            data=data,
        )

        # Merge environment-level settings (proxies, verify, cert, stream) into the
        # request kwargs so explicit kwargs and env config are applied consistently.
        env_settings = self._session.merge_environment_settings(
            url=request.url,
            proxies=request_kwargs.get("proxies", {}),
            stream=request_kwargs.get("stream"),
            verify=request_kwargs.get("verify"),
            cert=request_kwargs.get("cert"),
        )
        request_kwargs = {**request_kwargs, **env_settings}

        response: requests.Response = self._send_with_retry(
            request=request,
            request_kwargs=request_kwargs,
            log_formatter=log_formatter,
            exit_on_rate_limit=exit_on_rate_limit,
        )

        return request, response
HttpClient( name: str, logger: logging.Logger, error_handler: Optional[airbyte_cdk.sources.streams.http.error_handlers.ErrorHandler] = None, api_budget: Optional[airbyte_cdk.sources.streams.call_rate.APIBudget] = None, session: Union[requests.sessions.Session, requests_cache.session.CachedSession, NoneType] = None, authenticator: Optional[requests.auth.AuthBase] = None, use_cache: bool = False, backoff_strategy: Union[airbyte_cdk.BackoffStrategy, List[airbyte_cdk.BackoffStrategy], NoneType] = None, error_message_parser: Optional[airbyte_cdk.sources.streams.http.error_handlers.ErrorMessageParser] = None, disable_retries: bool = False, message_repository: Optional[airbyte_cdk.MessageRepository] = None)
108    def __init__(
109        self,
110        name: str,
111        logger: logging.Logger,
112        error_handler: Optional[ErrorHandler] = None,
113        api_budget: Optional[APIBudget] = None,
114        session: Optional[Union[requests.Session, requests_cache.CachedSession]] = None,
115        authenticator: Optional[AuthBase] = None,
116        use_cache: bool = False,
117        backoff_strategy: Optional[Union[BackoffStrategy, List[BackoffStrategy]]] = None,
118        error_message_parser: Optional[ErrorMessageParser] = None,
119        disable_retries: bool = False,
120        message_repository: Optional[MessageRepository] = None,
121    ):
122        self._name = name
123        self._api_budget: APIBudget = api_budget or APIBudget(policies=[])
124        if session:
125            self._session = session
126        else:
127            self._use_cache = use_cache
128            self._session = self._request_session()
129            self._session.mount(
130                "https://",
131                requests.adapters.HTTPAdapter(
132                    pool_connections=MAX_CONNECTION_POOL_SIZE, pool_maxsize=MAX_CONNECTION_POOL_SIZE
133                ),
134            )
135        if isinstance(authenticator, AuthBase):
136            self._session.auth = authenticator
137        self._logger = logger
138        self._error_handler = error_handler or HttpStatusErrorHandler(self._logger)
139        if backoff_strategy is not None:
140            if isinstance(backoff_strategy, list):
141                self._backoff_strategies = backoff_strategy
142            else:
143                self._backoff_strategies = [backoff_strategy]
144        else:
145            self._backoff_strategies = [DefaultBackoffStrategy()]
146        self._error_message_parser = error_message_parser or JsonErrorMessageParser()
147        self._request_attempt_count: Dict[requests.PreparedRequest, int] = {}
148        self._disable_retries = disable_retries
149        self._message_repository = message_repository
cache_filename: str
151    @property
152    def cache_filename(self) -> str:
153        """
154        Override if needed. Return the name of cache file
155        Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache will be in-memory only.
156        """
157        return f"{self._name}.sqlite"

Override if needed. Returns the name of the cache file. Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache will be in-memory only.

def clear_cache(self) -> None:
191    def clear_cache(self) -> None:
192        """
193        Clear cached requests for current session, can be called any time
194        """
195        if isinstance(self._session, requests_cache.CachedSession):
196            self._session.cache.clear()  # type: ignore # cache.clear is not typed

Clear cached requests for current session, can be called any time

name: str
539    @property
540    def name(self) -> str:
541        return self._name
def send_request( self, http_method: str, url: str, request_kwargs: Mapping[str, Any], headers: Optional[Mapping[str, str]] = None, params: Optional[Mapping[str, str]] = None, json: Optional[Mapping[str, Any]] = None, data: Union[str, Mapping[str, Any], NoneType] = None, dedupe_query_params: bool = False, log_formatter: Optional[Callable[[requests.models.Response], Any]] = None, exit_on_rate_limit: Optional[bool] = False) -> Tuple[requests.models.PreparedRequest, requests.models.Response]:
543    def send_request(
544        self,
545        http_method: str,
546        url: str,
547        request_kwargs: Mapping[str, Any],
548        headers: Optional[Mapping[str, str]] = None,
549        params: Optional[Mapping[str, str]] = None,
550        json: Optional[Mapping[str, Any]] = None,
551        data: Optional[Union[str, Mapping[str, Any]]] = None,
552        dedupe_query_params: bool = False,
553        log_formatter: Optional[Callable[[requests.Response], Any]] = None,
554        exit_on_rate_limit: Optional[bool] = False,
555    ) -> Tuple[requests.PreparedRequest, requests.Response]:
556        """
557        Prepares and sends request and return request and response objects.
558        """
559
560        request: requests.PreparedRequest = self._create_prepared_request(
561            http_method=http_method,
562            url=url,
563            dedupe_query_params=dedupe_query_params,
564            headers=headers,
565            params=params,
566            json=json,
567            data=data,
568        )
569
570        env_settings = self._session.merge_environment_settings(
571            url=request.url,
572            proxies=request_kwargs.get("proxies", {}),
573            stream=request_kwargs.get("stream"),
574            verify=request_kwargs.get("verify"),
575            cert=request_kwargs.get("cert"),
576        )
577        request_kwargs = {**request_kwargs, **env_settings}
578
579        response: requests.Response = self._send_with_retry(
580            request=request,
581            request_kwargs=request_kwargs,
582            log_formatter=log_formatter,
583            exit_on_rate_limit=exit_on_rate_limit,
584        )
585
586        return request, response

Prepares and sends the request, and returns the request and response objects.

 45class HttpStream(Stream, CheckpointMixin, ABC):
 46    """
 47    Base abstract class for an Airbyte Stream using the HTTP protocol. Basic building block for users building an Airbyte source for a HTTP API.
 48    """
 49
 50    source_defined_cursor = True  # Most HTTP streams use a source defined cursor (i.e: the user can't configure it like on a SQL table)
 51    page_size: Optional[int] = (
 52        None  # Use this variable to define page size for API http requests with pagination support
 53    )
 54
 55    def __init__(
 56        self, authenticator: Optional[AuthBase] = None, api_budget: Optional[APIBudget] = None
 57    ):
 58        self._exit_on_rate_limit: bool = False
 59        self._http_client = HttpClient(
 60            name=self.name,
 61            logger=self.logger,
 62            error_handler=self.get_error_handler(),
 63            api_budget=api_budget or APIBudget(policies=[]),
 64            authenticator=authenticator,
 65            use_cache=self.use_cache,
 66            backoff_strategy=self.get_backoff_strategy(),
 67            message_repository=InMemoryMessageRepository(),
 68        )
 69
 70        # There are three conditions that dictate if RFR should automatically be applied to a stream
 71        # 1. Streams that explicitly initialize their own cursor should defer to it and not automatically apply RFR
 72        # 2. Streams with at least one cursor_field are incremental and thus a superior sync to RFR.
 73        # 3. Streams overriding read_records() do not guarantee that they will call the parent implementation which can perform
 74        #    per-page checkpointing so RFR is only supported if a stream use the default `HttpStream.read_records()` method
 75        if (
 76            not self.cursor
 77            and len(self.cursor_field) == 0
 78            and type(self).read_records is HttpStream.read_records
 79        ):
 80            self.cursor = ResumableFullRefreshCursor()
 81
 82    @property
 83    def exit_on_rate_limit(self) -> bool:
 84        """
 85        :return: False if the stream will retry endlessly when rate limited
 86        """
 87        return self._exit_on_rate_limit
 88
 89    @exit_on_rate_limit.setter
 90    def exit_on_rate_limit(self, value: bool) -> None:
 91        self._exit_on_rate_limit = value
 92
 93    @property
 94    def cache_filename(self) -> str:
 95        """
 96        Override if needed. Return the name of cache file
 97        Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache will be in-memory only.
 98        """
 99        return f"{self.name}.sqlite"
100
101    @property
102    def use_cache(self) -> bool:
103        """
104        Override if needed. If True, all records will be cached.
105        Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache will be in-memory only.
106        """
107        return False
108
109    @property
110    @abstractmethod
111    def url_base(self) -> str:
112        """
113        :return: URL base for the  API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/"
114        """
115
116    @property
117    def http_method(self) -> str:
118        """
119        Override if needed. See get_request_data/get_request_json if using POST/PUT/PATCH.
120        """
121        return "GET"
122
123    @property
124    @deprecated(
125        "Deprecated as of CDK version 3.0.0. "
126        "You should set error_handler explicitly in HttpStream.get_error_handler() instead."
127    )
128    def raise_on_http_errors(self) -> bool:
129        """
130        Override if needed. If set to False, allows opting-out of raising HTTP code exception.
131        """
132        return True
133
134    @property
135    @deprecated(
136        "Deprecated as of CDK version 3.0.0. "
137        "You should set backoff_strategies explicitly in HttpStream.get_backoff_strategy() instead."
138    )
139    def max_retries(self) -> Union[int, None]:
140        """
141        Override if needed. Specifies maximum amount of retries for backoff policy. Return None for no limit.
142        """
143        return 5
144
145    @property
146    @deprecated(
147        "Deprecated as of CDK version 3.0.0. "
148        "You should set backoff_strategies explicitly in HttpStream.get_backoff_strategy() instead."
149    )
150    def max_time(self) -> Union[int, None]:
151        """
152        Override if needed. Specifies maximum total waiting time (in seconds) for backoff policy. Return None for no limit.
153        """
154        return 60 * 10
155
156    @property
157    @deprecated(
158        "Deprecated as of CDK version 3.0.0. "
159        "You should set backoff_strategies explicitly in HttpStream.get_backoff_strategy() instead."
160    )
161    def retry_factor(self) -> float:
162        """
163        Override if needed. Specifies factor for backoff policy.
164        """
165        return 5
166
167    @abstractmethod
168    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
169        """
170        Override this method to define a pagination strategy.
171
172        The value returned from this method is passed to most other methods in this class. Use it to form a request e.g: set headers or query params.
173
174        :return: The token for the next page from the input response object. Returning None means there are no more pages to read in this response.
175        """
176
177    @abstractmethod
178    def path(
179        self,
180        *,
181        stream_state: Optional[Mapping[str, Any]] = None,
182        stream_slice: Optional[Mapping[str, Any]] = None,
183        next_page_token: Optional[Mapping[str, Any]] = None,
184    ) -> str:
185        """
186        Returns the URL path for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "some_entity"
187        """
188
189    def request_params(
190        self,
191        stream_state: Optional[Mapping[str, Any]],
192        stream_slice: Optional[Mapping[str, Any]] = None,
193        next_page_token: Optional[Mapping[str, Any]] = None,
194    ) -> MutableMapping[str, Any]:
195        """
196        Override this method to define the query parameters that should be set on an outgoing HTTP request given the inputs.
197
198        E.g: you might want to define query parameters for paging if next_page_token is not None.
199        """
200        return {}
201
202    def request_headers(
203        self,
204        stream_state: Optional[Mapping[str, Any]],
205        stream_slice: Optional[Mapping[str, Any]] = None,
206        next_page_token: Optional[Mapping[str, Any]] = None,
207    ) -> Mapping[str, Any]:
208        """
209        Override to return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
210        """
211        return {}
212
213    def request_body_data(
214        self,
215        stream_state: Optional[Mapping[str, Any]],
216        stream_slice: Optional[Mapping[str, Any]] = None,
217        next_page_token: Optional[Mapping[str, Any]] = None,
218    ) -> Optional[Union[Mapping[str, Any], str]]:
219        """
220        Override when creating POST/PUT/PATCH requests to populate the body of the request with a non-JSON payload.
221
222        If returns a ready text that it will be sent as is.
223        If returns a dict that it will be converted to a urlencoded form.
224        E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
225
226        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
227        """
228        return None
229
230    def request_body_json(
231        self,
232        stream_state: Optional[Mapping[str, Any]],
233        stream_slice: Optional[Mapping[str, Any]] = None,
234        next_page_token: Optional[Mapping[str, Any]] = None,
235    ) -> Optional[Mapping[str, Any]]:
236        """
237        Override when creating POST/PUT/PATCH requests to populate the body of the request with a JSON payload.
238
239        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
240        """
241        return None
242
243    def request_kwargs(
244        self,
245        stream_state: Optional[Mapping[str, Any]],
246        stream_slice: Optional[Mapping[str, Any]] = None,
247        next_page_token: Optional[Mapping[str, Any]] = None,
248    ) -> Mapping[str, Any]:
249        """
250        Override to return a mapping of keyword arguments to be used when creating the HTTP request.
251        Any option listed in https://docs.python-requests.org/en/latest/api/#requests.adapters.BaseAdapter.send for can be returned from
252        this method. Note that these options do not conflict with request-level options such as headers, request params, etc..
253        """
254        return {}
255
256    @abstractmethod
257    def parse_response(
258        self,
259        response: requests.Response,
260        *,
261        stream_state: Mapping[str, Any],
262        stream_slice: Optional[Mapping[str, Any]] = None,
263        next_page_token: Optional[Mapping[str, Any]] = None,
264    ) -> Iterable[Mapping[str, Any]]:
265        """
266        Parses the raw response object into a list of records.
267        By default, this returns an iterable containing the input. Override to parse differently.
268        :param response:
269        :param stream_state:
270        :param stream_slice:
271        :param next_page_token:
272        :return: An iterable containing the parsed response
273        """
274
275    def get_backoff_strategy(self) -> Optional[Union[BackoffStrategy, List[BackoffStrategy]]]:
276        """
277        Used to initialize Adapter to avoid breaking changes.
278        If Stream has a `backoff_time` method implementation, we know this stream uses old (pre-HTTPClient) backoff handlers and thus an adapter is needed.
279
280        Override to provide custom BackoffStrategy
281        :return Optional[BackoffStrategy]:
282        """
283        if hasattr(self, "backoff_time"):
284            return HttpStreamAdapterBackoffStrategy(self)
285        else:
286            return None
287
288    def get_error_handler(self) -> Optional[ErrorHandler]:
289        """
290        Used to initialize Adapter to avoid breaking changes.
291        If Stream has a `should_retry` method implementation, we know this stream uses old (pre-HTTPClient) error handlers and thus an adapter is needed.
292
293        Override to provide custom ErrorHandler
294        :return Optional[ErrorHandler]:
295        """
296        if hasattr(self, "should_retry"):
297            error_handler = HttpStreamAdapterHttpStatusErrorHandler(
298                stream=self,
299                logger=logging.getLogger(),
300                max_retries=self.max_retries,
301                max_time=timedelta(seconds=self.max_time or 0),
302            )
303            return error_handler
304        else:
305            return None
306
307    @classmethod
308    def _join_url(cls, url_base: str, path: str) -> str:
309        return urljoin(url_base, path)
310
311    @classmethod
312    def parse_response_error_message(cls, response: requests.Response) -> Optional[str]:
313        """
314        Parses the raw response object from a failed request into a user-friendly error message.
315        By default, this method tries to grab the error message from JSON responses by following common API patterns. Override to parse differently.
316
317        :param response:
318        :return: A user-friendly message that indicates the cause of the error
319        """
320
321        # default logic to grab error from common fields
322        def _try_get_error(value: Optional[JsonType]) -> Optional[str]:
323            if isinstance(value, str):
324                return value
325            elif isinstance(value, list):
326                errors_in_value = [_try_get_error(v) for v in value]
327                return ", ".join(v for v in errors_in_value if v is not None)
328            elif isinstance(value, dict):
329                new_value = (
330                    value.get("message")
331                    or value.get("messages")
332                    or value.get("error")
333                    or value.get("errors")
334                    or value.get("failures")
335                    or value.get("failure")
336                    or value.get("detail")
337                )
338                return _try_get_error(new_value)
339            return None
340
341        try:
342            body = response.json()
343            return _try_get_error(body)
344        except requests.exceptions.JSONDecodeError:
345            return None
346
347    def get_error_display_message(self, exception: BaseException) -> Optional[str]:
348        """
349        Retrieves the user-friendly display message that corresponds to an exception.
350        This will be called when encountering an exception while reading records from the stream, and used to build the AirbyteTraceMessage.
351
352        The default implementation of this method only handles HTTPErrors by passing the response to self.parse_response_error_message().
353        The method should be overriden as needed to handle any additional exception types.
354
355        :param exception: The exception that was raised
356        :return: A user-friendly message that indicates the cause of the error
357        """
358        if isinstance(exception, requests.HTTPError) and exception.response is not None:
359            return self.parse_response_error_message(exception.response)
360        return None
361
362    def read_records(
363        self,
364        sync_mode: SyncMode,
365        cursor_field: Optional[List[str]] = None,
366        stream_slice: Optional[Mapping[str, Any]] = None,
367        stream_state: Optional[Mapping[str, Any]] = None,
368    ) -> Iterable[StreamData]:
369        # A cursor_field indicates this is an incremental stream which offers better checkpointing than RFR enabled via the cursor
370        if self.cursor_field or not isinstance(self.get_cursor(), ResumableFullRefreshCursor):
371            yield from self._read_pages(
372                lambda req, res, state, _slice: self.parse_response(
373                    res, stream_slice=_slice, stream_state=state
374                ),
375                stream_slice,
376                stream_state,
377            )
378        else:
379            yield from self._read_single_page(
380                lambda req, res, state, _slice: self.parse_response(
381                    res, stream_slice=_slice, stream_state=state
382                ),
383                stream_slice,
384                stream_state,
385            )
386
387    @property
388    def state(self) -> MutableMapping[str, Any]:
389        cursor = self.get_cursor()
390        if cursor:
391            return cursor.get_stream_state()  # type: ignore
392        return self._state
393
394    @state.setter
395    def state(self, value: MutableMapping[str, Any]) -> None:
396        cursor = self.get_cursor()
397        if cursor:
398            cursor.set_initial_state(value)
399        self._state = value
400
401    def get_cursor(self) -> Optional[Cursor]:
402        # I don't love that this is semi-stateful but not sure what else to do. We don't know exactly what type of cursor to
403        # instantiate when creating the class. We can make a few assumptions like if there is a cursor_field which implies
404        # incremental, but we don't know until runtime if this is a substream. Ideally, a stream should explicitly define
405        # its cursor, but because we're trying to automatically apply RFR we're stuck with this logic where we replace the
406        # cursor at runtime once we detect this is a substream based on self.has_multiple_slices being reassigned
407        if self.has_multiple_slices and isinstance(self.cursor, ResumableFullRefreshCursor):
408            self.cursor = SubstreamResumableFullRefreshCursor()
409            return self.cursor
410        else:
411            return self.cursor
412
413    def _read_pages(
414        self,
415        records_generator_fn: Callable[
416            [
417                requests.PreparedRequest,
418                requests.Response,
419                Mapping[str, Any],
420                Optional[Mapping[str, Any]],
421            ],
422            Iterable[StreamData],
423        ],
424        stream_slice: Optional[Mapping[str, Any]] = None,
425        stream_state: Optional[Mapping[str, Any]] = None,
426    ) -> Iterable[StreamData]:
427        stream_state = stream_state or {}
428        pagination_complete = False
429        next_page_token = None
430        while not pagination_complete:
431            request, response = self._fetch_next_page(stream_slice, stream_state, next_page_token)
432            yield from records_generator_fn(request, response, stream_state, stream_slice)
433
434            next_page_token = self.next_page_token(response)
435            if not next_page_token:
436                pagination_complete = True
437
438        cursor = self.get_cursor()
439        if cursor and isinstance(cursor, SubstreamResumableFullRefreshCursor):
440            partition, _, _ = self._extract_slice_fields(stream_slice=stream_slice)
441            # Substreams checkpoint state by marking an entire parent partition as completed so that on the subsequent attempt
442            # after a failure, completed parents are skipped and the sync can make progress
443            cursor.close_slice(StreamSlice(cursor_slice={}, partition=partition))
444
445        # Always return an empty generator just in case no records were ever yielded
446        yield from []
447
448    def _read_single_page(
449        self,
450        records_generator_fn: Callable[
451            [
452                requests.PreparedRequest,
453                requests.Response,
454                Mapping[str, Any],
455                Optional[Mapping[str, Any]],
456            ],
457            Iterable[StreamData],
458        ],
459        stream_slice: Optional[Mapping[str, Any]] = None,
460        stream_state: Optional[Mapping[str, Any]] = None,
461    ) -> Iterable[StreamData]:
462        partition, cursor_slice, remaining_slice = self._extract_slice_fields(
463            stream_slice=stream_slice
464        )
465        stream_state = stream_state or {}
466        next_page_token = cursor_slice or None
467
468        request, response = self._fetch_next_page(remaining_slice, stream_state, next_page_token)
469        yield from records_generator_fn(request, response, stream_state, remaining_slice)
470
471        next_page_token = self.next_page_token(response) or {
472            "__ab_full_refresh_sync_complete": True
473        }
474
475        cursor = self.get_cursor()
476        if cursor:
477            cursor.close_slice(StreamSlice(cursor_slice=next_page_token, partition=partition))
478
479        # Always return an empty generator just in case no records were ever yielded
480        yield from []
481
482    @staticmethod
483    def _extract_slice_fields(
484        stream_slice: Optional[Mapping[str, Any]],
485    ) -> tuple[Mapping[str, Any], Mapping[str, Any], Mapping[str, Any]]:
486        if not stream_slice:
487            return {}, {}, {}
488
489        if isinstance(stream_slice, StreamSlice):
490            partition = stream_slice.partition
491            cursor_slice = stream_slice.cursor_slice
492            remaining = {k: v for k, v in stream_slice.items()}
493        else:
494            # RFR streams that implement stream_slices() to generate stream slices in the legacy mapping format are converted into a
495            # structured stream slice mapping by the LegacyCursorBasedCheckpointReader. The structured mapping object has separate
496            # fields for the partition and cursor_slice value
497            partition = stream_slice.get("partition", {})
498            cursor_slice = stream_slice.get("cursor_slice", {})
499            remaining = {
500                key: val
501                for key, val in stream_slice.items()
502                if key != "partition" and key != "cursor_slice"
503            }
504        return partition, cursor_slice, remaining
505
506    def _fetch_next_page(
507        self,
508        stream_slice: Optional[Mapping[str, Any]] = None,
509        stream_state: Optional[Mapping[str, Any]] = None,
510        next_page_token: Optional[Mapping[str, Any]] = None,
511    ) -> Tuple[requests.PreparedRequest, requests.Response]:
512        request, response = self._http_client.send_request(
513            http_method=self.http_method,
514            url=self._join_url(
515                self.url_base,
516                self.path(
517                    stream_state=stream_state,
518                    stream_slice=stream_slice,
519                    next_page_token=next_page_token,
520                ),
521            ),
522            request_kwargs=self.request_kwargs(
523                stream_state=stream_state,
524                stream_slice=stream_slice,
525                next_page_token=next_page_token,
526            ),
527            headers=self.request_headers(
528                stream_state=stream_state,
529                stream_slice=stream_slice,
530                next_page_token=next_page_token,
531            ),
532            params=self.request_params(
533                stream_state=stream_state,
534                stream_slice=stream_slice,
535                next_page_token=next_page_token,
536            ),
537            json=self.request_body_json(
538                stream_state=stream_state,
539                stream_slice=stream_slice,
540                next_page_token=next_page_token,
541            ),
542            data=self.request_body_data(
543                stream_state=stream_state,
544                stream_slice=stream_slice,
545                next_page_token=next_page_token,
546            ),
547            dedupe_query_params=True,
548            log_formatter=self.get_log_formatter(),
549            exit_on_rate_limit=self.exit_on_rate_limit,
550        )
551
552        return request, response
553
554    def get_log_formatter(self) -> Optional[Callable[[requests.Response], Any]]:
555        """
556
557        :return Optional[Callable[[requests.Response], Any]]: Function that will be used in logging inside HttpClient
558        """
559        return None

Base abstract class for an Airbyte Stream using the HTTP protocol. Basic building block for users building an Airbyte source for an HTTP API.

source_defined_cursor = True

Return False if the cursor can be configured by the user.

page_size: Optional[int] = None
exit_on_rate_limit: bool
82    @property
83    def exit_on_rate_limit(self) -> bool:
84        """
85        :return: False if the stream will retry endlessly when rate limited
86        """
87        return self._exit_on_rate_limit
Returns

False if the stream will retry endlessly when rate limited

cache_filename: str
93    @property
94    def cache_filename(self) -> str:
95        """
96        Override if needed. Return the name of cache file
97        Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache will be in-memory only.
98        """
99        return f"{self.name}.sqlite"

Override if needed. Return the name of the cache file. Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache will be in-memory only.

use_cache: bool
101    @property
102    def use_cache(self) -> bool:
103        """
104        Override if needed. If True, all records will be cached.
105        Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache will be in-memory only.
106        """
107        return False

Override if needed. If True, all records will be cached. Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache will be in-memory only.

url_base: str
109    @property
110    @abstractmethod
111    def url_base(self) -> str:
112        """
113        :return: URL base for the  API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/"
114        """
Returns

URL base for the API endpoint, e.g.: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/"

http_method: str
116    @property
117    def http_method(self) -> str:
118        """
119        Override if needed. See get_request_data/get_request_json if using POST/PUT/PATCH.
120        """
121        return "GET"

Override if needed. See get_request_data/get_request_json if using POST/PUT/PATCH.

raise_on_http_errors: bool
123    @property
124    @deprecated(
125        "Deprecated as of CDK version 3.0.0. "
126        "You should set error_handler explicitly in HttpStream.get_error_handler() instead."
127    )
128    def raise_on_http_errors(self) -> bool:
129        """
130        Override if needed. If set to False, allows opting-out of raising HTTP code exception.
131        """
132        return True

Override if needed. If set to False, allows opting-out of raising HTTP code exception.

max_retries: Optional[int]
134    @property
135    @deprecated(
136        "Deprecated as of CDK version 3.0.0. "
137        "You should set backoff_strategies explicitly in HttpStream.get_backoff_strategy() instead."
138    )
139    def max_retries(self) -> Union[int, None]:
140        """
141        Override if needed. Specifies maximum amount of retries for backoff policy. Return None for no limit.
142        """
143        return 5

Override if needed. Specifies the maximum number of retries for the backoff policy. Return None for no limit.

max_time: Optional[int]
145    @property
146    @deprecated(
147        "Deprecated as of CDK version 3.0.0. "
148        "You should set backoff_strategies explicitly in HttpStream.get_backoff_strategy() instead."
149    )
150    def max_time(self) -> Union[int, None]:
151        """
152        Override if needed. Specifies maximum total waiting time (in seconds) for backoff policy. Return None for no limit.
153        """
154        return 60 * 10

Override if needed. Specifies maximum total waiting time (in seconds) for backoff policy. Return None for no limit.

retry_factor: float
156    @property
157    @deprecated(
158        "Deprecated as of CDK version 3.0.0. "
159        "You should set backoff_strategies explicitly in HttpStream.get_backoff_strategy() instead."
160    )
161    def retry_factor(self) -> float:
162        """
163        Override if needed. Specifies factor for backoff policy.
164        """
165        return 5

Override if needed. Specifies factor for backoff policy.

@abstractmethod
def next_page_token(self, response: requests.models.Response) -> Optional[Mapping[str, Any]]:
167    @abstractmethod
168    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
169        """
170        Override this method to define a pagination strategy.
171
172        The value returned from this method is passed to most other methods in this class. Use it to form a request e.g: set headers or query params.
173
174        :return: The token for the next page from the input response object. Returning None means there are no more pages to read in this response.
175        """

Override this method to define a pagination strategy.

The value returned from this method is passed to most other methods in this class. Use it to form a request e.g: set headers or query params.

Returns

The token for the next page from the input response object. Returning None means there are no more pages to read in this response.

@abstractmethod
def path( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> str:
177    @abstractmethod
178    def path(
179        self,
180        *,
181        stream_state: Optional[Mapping[str, Any]] = None,
182        stream_slice: Optional[Mapping[str, Any]] = None,
183        next_page_token: Optional[Mapping[str, Any]] = None,
184    ) -> str:
185        """
186        Returns the URL path for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "some_entity"
187        """

Returns the URL path for the API endpoint, e.g.: if you wanted to hit https://myapi.com/v1/some_entity then this should return "some_entity"

def request_params( self, stream_state: Optional[Mapping[str, Any]], stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> MutableMapping[str, Any]:
189    def request_params(
190        self,
191        stream_state: Optional[Mapping[str, Any]],
192        stream_slice: Optional[Mapping[str, Any]] = None,
193        next_page_token: Optional[Mapping[str, Any]] = None,
194    ) -> MutableMapping[str, Any]:
195        """
196        Override this method to define the query parameters that should be set on an outgoing HTTP request given the inputs.
197
198        E.g: you might want to define query parameters for paging if next_page_token is not None.
199        """
200        return {}

Override this method to define the query parameters that should be set on an outgoing HTTP request given the inputs.

E.g: you might want to define query parameters for paging if next_page_token is not None.

def request_headers( self, stream_state: Optional[Mapping[str, Any]], stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
202    def request_headers(
203        self,
204        stream_state: Optional[Mapping[str, Any]],
205        stream_slice: Optional[Mapping[str, Any]] = None,
206        next_page_token: Optional[Mapping[str, Any]] = None,
207    ) -> Mapping[str, Any]:
208        """
209        Override to return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
210        """
211        return {}

Override to return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.

def request_body_data( self, stream_state: Optional[Mapping[str, Any]], stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Union[str, Mapping[str, Any], NoneType]:
213    def request_body_data(
214        self,
215        stream_state: Optional[Mapping[str, Any]],
216        stream_slice: Optional[Mapping[str, Any]] = None,
217        next_page_token: Optional[Mapping[str, Any]] = None,
218    ) -> Optional[Union[Mapping[str, Any], str]]:
219        """
220        Override when creating POST/PUT/PATCH requests to populate the body of the request with a non-JSON payload.
221
222        If returns a ready text that it will be sent as is.
223        If returns a dict that it will be converted to a urlencoded form.
224        E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
225
226        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
227        """
228        return None

Override when creating POST/PUT/PATCH requests to populate the body of the request with a non-JSON payload.

If it returns text, the text will be sent as-is. If it returns a dict, the dict will be converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

Only one of the 'request_body_data' and 'request_body_json' functions can be overridden at a time.

def request_body_json( self, stream_state: Optional[Mapping[str, Any]], stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Optional[Mapping[str, Any]]:
230    def request_body_json(
231        self,
232        stream_state: Optional[Mapping[str, Any]],
233        stream_slice: Optional[Mapping[str, Any]] = None,
234        next_page_token: Optional[Mapping[str, Any]] = None,
235    ) -> Optional[Mapping[str, Any]]:
236        """
237        Override when creating POST/PUT/PATCH requests to populate the body of the request with a JSON payload.
238
239        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
240        """
241        return None

Override when creating POST/PUT/PATCH requests to populate the body of the request with a JSON payload.

Only one of the 'request_body_data' and 'request_body_json' functions can be overridden at a time.

def request_kwargs( self, stream_state: Optional[Mapping[str, Any]], stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
243    def request_kwargs(
244        self,
245        stream_state: Optional[Mapping[str, Any]],
246        stream_slice: Optional[Mapping[str, Any]] = None,
247        next_page_token: Optional[Mapping[str, Any]] = None,
248    ) -> Mapping[str, Any]:
249        """
250        Override to return a mapping of keyword arguments to be used when creating the HTTP request.
251        Any option listed in https://docs.python-requests.org/en/latest/api/#requests.adapters.BaseAdapter.send for can be returned from
252        this method. Note that these options do not conflict with request-level options such as headers, request params, etc..
253        """
254        return {}

Override to return a mapping of keyword arguments to be used when creating the HTTP request. Any option listed in https://docs.python-requests.org/en/latest/api/#requests.adapters.BaseAdapter.send can be returned from this method. Note that these options do not conflict with request-level options such as headers, request params, etc.

@abstractmethod
def parse_response( self, response: requests.models.Response, *, stream_state: Mapping[str, Any], stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Iterable[Mapping[str, Any]]:
256    @abstractmethod
257    def parse_response(
258        self,
259        response: requests.Response,
260        *,
261        stream_state: Mapping[str, Any],
262        stream_slice: Optional[Mapping[str, Any]] = None,
263        next_page_token: Optional[Mapping[str, Any]] = None,
264    ) -> Iterable[Mapping[str, Any]]:
265        """
266        Parses the raw response object into a list of records.
267        By default, this returns an iterable containing the input. Override to parse differently.
268        :param response:
269        :param stream_state:
270        :param stream_slice:
271        :param next_page_token:
272        :return: An iterable containing the parsed response
273        """

Parses the raw response object into a list of records. By default, this returns an iterable containing the input. Override to parse differently.

Parameters
  • response:
  • stream_state:
  • stream_slice:
  • next_page_token:
Returns

An iterable containing the parsed response

def get_backoff_strategy( self) -> Union[airbyte_cdk.BackoffStrategy, List[airbyte_cdk.BackoffStrategy], NoneType]:
275    def get_backoff_strategy(self) -> Optional[Union[BackoffStrategy, List[BackoffStrategy]]]:
276        """
277        Used to initialize Adapter to avoid breaking changes.
278        If Stream has a `backoff_time` method implementation, we know this stream uses old (pre-HTTPClient) backoff handlers and thus an adapter is needed.
279
280        Override to provide custom BackoffStrategy
281        :return Optional[BackoffStrategy]:
282        """
283        if hasattr(self, "backoff_time"):
284            return HttpStreamAdapterBackoffStrategy(self)
285        else:
286            return None

Used to initialize Adapter to avoid breaking changes. If Stream has a backoff_time method implementation, we know this stream uses old (pre-HTTPClient) backoff handlers and thus an adapter is needed.

Override to provide custom BackoffStrategy

Returns
def get_error_handler( self) -> Optional[airbyte_cdk.sources.streams.http.error_handlers.ErrorHandler]:
288    def get_error_handler(self) -> Optional[ErrorHandler]:
289        """
290        Used to initialize Adapter to avoid breaking changes.
291        If Stream has a `should_retry` method implementation, we know this stream uses old (pre-HTTPClient) error handlers and thus an adapter is needed.
292
293        Override to provide custom ErrorHandler
294        :return Optional[ErrorHandler]:
295        """
296        if hasattr(self, "should_retry"):
297            error_handler = HttpStreamAdapterHttpStatusErrorHandler(
298                stream=self,
299                logger=logging.getLogger(),
300                max_retries=self.max_retries,
301                max_time=timedelta(seconds=self.max_time or 0),
302            )
303            return error_handler
304        else:
305            return None

Used to initialize Adapter to avoid breaking changes. If Stream has a should_retry method implementation, we know this stream uses old (pre-HTTPClient) error handlers and thus an adapter is needed.

Override to provide custom ErrorHandler

Returns
@classmethod
def parse_response_error_message(cls, response: requests.models.Response) -> Optional[str]:
311    @classmethod
312    def parse_response_error_message(cls, response: requests.Response) -> Optional[str]:
313        """
314        Parses the raw response object from a failed request into a user-friendly error message.
315        By default, this method tries to grab the error message from JSON responses by following common API patterns. Override to parse differently.
316
317        :param response:
318        :return: A user-friendly message that indicates the cause of the error
319        """
320
321        # default logic to grab error from common fields
322        def _try_get_error(value: Optional[JsonType]) -> Optional[str]:
323            if isinstance(value, str):
324                return value
325            elif isinstance(value, list):
326                errors_in_value = [_try_get_error(v) for v in value]
327                return ", ".join(v for v in errors_in_value if v is not None)
328            elif isinstance(value, dict):
329                new_value = (
330                    value.get("message")
331                    or value.get("messages")
332                    or value.get("error")
333                    or value.get("errors")
334                    or value.get("failures")
335                    or value.get("failure")
336                    or value.get("detail")
337                )
338                return _try_get_error(new_value)
339            return None
340
341        try:
342            body = response.json()
343            return _try_get_error(body)
344        except requests.exceptions.JSONDecodeError:
345            return None

Parses the raw response object from a failed request into a user-friendly error message. By default, this method tries to grab the error message from JSON responses by following common API patterns. Override to parse differently.

Parameters
  • response:
Returns

A user-friendly message that indicates the cause of the error

def get_error_display_message(self, exception: BaseException) -> Optional[str]:
347    def get_error_display_message(self, exception: BaseException) -> Optional[str]:
348        """
349        Retrieves the user-friendly display message that corresponds to an exception.
350        This will be called when encountering an exception while reading records from the stream, and used to build the AirbyteTraceMessage.
351
352        The default implementation of this method only handles HTTPErrors by passing the response to self.parse_response_error_message().
353        The method should be overriden as needed to handle any additional exception types.
354
355        :param exception: The exception that was raised
356        :return: A user-friendly message that indicates the cause of the error
357        """
358        if isinstance(exception, requests.HTTPError) and exception.response is not None:
359            return self.parse_response_error_message(exception.response)
360        return None

Retrieves the user-friendly display message that corresponds to an exception. This will be called when encountering an exception while reading records from the stream, and used to build the AirbyteTraceMessage.

The default implementation of this method only handles HTTPErrors by passing the response to self.parse_response_error_message(). The method should be overridden as needed to handle any additional exception types.

Parameters
  • exception: The exception that was raised
Returns

A user-friendly message that indicates the cause of the error

def read_records( self, sync_mode: airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode, cursor_field: Optional[List[str]] = None, stream_slice: Optional[Mapping[str, Any]] = None, stream_state: Optional[Mapping[str, Any]] = None) -> Iterable[Union[Mapping[str, Any], airbyte_cdk.AirbyteMessage]]:
362    def read_records(
363        self,
364        sync_mode: SyncMode,
365        cursor_field: Optional[List[str]] = None,
366        stream_slice: Optional[Mapping[str, Any]] = None,
367        stream_state: Optional[Mapping[str, Any]] = None,
368    ) -> Iterable[StreamData]:
369        # A cursor_field indicates this is an incremental stream which offers better checkpointing than RFR enabled via the cursor
370        if self.cursor_field or not isinstance(self.get_cursor(), ResumableFullRefreshCursor):
371            yield from self._read_pages(
372                lambda req, res, state, _slice: self.parse_response(
373                    res, stream_slice=_slice, stream_state=state
374                ),
375                stream_slice,
376                stream_state,
377            )
378        else:
379            yield from self._read_single_page(
380                lambda req, res, state, _slice: self.parse_response(
381                    res, stream_slice=_slice, stream_state=state
382                ),
383                stream_slice,
384                stream_state,
385            )

This method should be overridden by subclasses to read records based on the inputs

state: MutableMapping[str, Any]
387    @property
388    def state(self) -> MutableMapping[str, Any]:
389        cursor = self.get_cursor()
390        if cursor:
391            return cursor.get_stream_state()  # type: ignore
392        return self._state

State getter; it should return the state in a form that can be serialized to a string and sent to the output as a STATE AirbyteMessage.

A good example of a state is a cursor_value: { self.cursor_field: "cursor_value" }

State should be as small as possible, but at the same time descriptive enough to restore the syncing process from the point where it stopped.

def get_cursor(self) -> Optional[airbyte_cdk.sources.streams.checkpoint.Cursor]:
401    def get_cursor(self) -> Optional[Cursor]:
402        # I don't love that this is semi-stateful but not sure what else to do. We don't know exactly what type of cursor to
403        # instantiate when creating the class. We can make a few assumptions like if there is a cursor_field which implies
404        # incremental, but we don't know until runtime if this is a substream. Ideally, a stream should explicitly define
405        # its cursor, but because we're trying to automatically apply RFR we're stuck with this logic where we replace the
406        # cursor at runtime once we detect this is a substream based on self.has_multiple_slices being reassigned
407        if self.has_multiple_slices and isinstance(self.cursor, ResumableFullRefreshCursor):
408            self.cursor = SubstreamResumableFullRefreshCursor()
409            return self.cursor
410        else:
411            return self.cursor

A Cursor is an interface that a stream can implement to manage how its internal state is read and updated while reading records. Historically, Python connectors had no concept of a cursor to manage state. Python streams need to define a cursor implementation and override this method to manage state through a Cursor.

def get_log_formatter(self) -> Optional[Callable[[requests.models.Response], Any]]:
554    def get_log_formatter(self) -> Optional[Callable[[requests.Response], Any]]:
555        """
556
557        :return Optional[Callable[[requests.Response], Any]]: Function that will be used in logging inside HttpClient
558        """
559        return None
Returns

Function that will be used in logging inside HttpClient

class HttpSubStream(airbyte_cdk.sources.streams.http.HttpStream, abc.ABC):
562class HttpSubStream(HttpStream, ABC):
563    def __init__(self, parent: HttpStream, **kwargs: Any):
564        """
565        :param parent: should be the instance of HttpStream class
566        """
567        super().__init__(**kwargs)
568        self.parent = parent
569        self.has_multiple_slices = (
570            True  # Substreams are based on parent records which implies there are multiple slices
571        )
572
573        # There are three conditions that dictate if RFR should automatically be applied to a stream
574        # 1. Streams that explicitly initialize their own cursor should defer to it and not automatically apply RFR
575        # 2. Streams with at least one cursor_field are incremental and thus a superior sync to RFR.
576        # 3. Streams overriding read_records() do not guarantee that they will call the parent implementation which can perform
577        #    per-page checkpointing so RFR is only supported if a stream use the default `HttpStream.read_records()` method
578        if (
579            not self.cursor
580            and len(self.cursor_field) == 0
581            and type(self).read_records is HttpStream.read_records
582        ):
583            self.cursor = SubstreamResumableFullRefreshCursor()
584
585    def stream_slices(
586        self,
587        sync_mode: SyncMode,
588        cursor_field: Optional[List[str]] = None,
589        stream_state: Optional[Mapping[str, Any]] = None,
590    ) -> Iterable[Optional[Mapping[str, Any]]]:
591        # read_stateless() assumes the parent is not concurrent. This is currently okay since the concurrent CDK does
592        # not support either substreams or RFR, but something that needs to be considered once we do
593        for parent_record in self.parent.read_only_records(stream_state):
594            # Skip non-records (eg AirbyteLogMessage)
595            if isinstance(parent_record, AirbyteMessage):
596                if parent_record.type == MessageType.RECORD:
597                    parent_record = parent_record.record.data  # type: ignore [assignment, union-attr]  # Incorrect type for assignment
598                else:
599                    continue
600            elif isinstance(parent_record, Record):
601                parent_record = parent_record.data
602            yield {"parent": parent_record}

Base abstract class for an Airbyte Stream using the HTTP protocol. Basic building block for users building an Airbyte source for a HTTP API.

HttpSubStream( parent: HttpStream, **kwargs: Any)
563    def __init__(self, parent: HttpStream, **kwargs: Any):
564        """
565        :param parent: should be the instance of HttpStream class
566        """
567        super().__init__(**kwargs)
568        self.parent = parent
569        self.has_multiple_slices = (
570            True  # Substreams are based on parent records which implies there are multiple slices
571        )
572
573        # There are three conditions that dictate if RFR should automatically be applied to a stream
574        # 1. Streams that explicitly initialize their own cursor should defer to it and not automatically apply RFR
575        # 2. Streams with at least one cursor_field are incremental and thus a superior sync to RFR.
576        # 3. Streams overriding read_records() do not guarantee that they will call the parent implementation which can perform
577        #    per-page checkpointing so RFR is only supported if a stream use the default `HttpStream.read_records()` method
578        if (
579            not self.cursor
580            and len(self.cursor_field) == 0
581            and type(self).read_records is HttpStream.read_records
582        ):
583            self.cursor = SubstreamResumableFullRefreshCursor()
Parameters
  • parent: should be the instance of HttpStream class
parent
has_multiple_slices = False
def stream_slices( self, sync_mode: airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode, cursor_field: Optional[List[str]] = None, stream_state: Optional[Mapping[str, Any]] = None) -> Iterable[Optional[Mapping[str, Any]]]:
585    def stream_slices(
586        self,
587        sync_mode: SyncMode,
588        cursor_field: Optional[List[str]] = None,
589        stream_state: Optional[Mapping[str, Any]] = None,
590    ) -> Iterable[Optional[Mapping[str, Any]]]:
591        # read_stateless() assumes the parent is not concurrent. This is currently okay since the concurrent CDK does
592        # not support either substreams or RFR, but something that needs to be considered once we do
593        for parent_record in self.parent.read_only_records(stream_state):
594            # Skip non-records (eg AirbyteLogMessage)
595            if isinstance(parent_record, AirbyteMessage):
596                if parent_record.type == MessageType.RECORD:
597                    parent_record = parent_record.record.data  # type: ignore [assignment, union-attr]  # Incorrect type for assignment
598                else:
599                    continue
600            elif isinstance(parent_record, Record):
601                parent_record = parent_record.data
602            yield {"parent": parent_record}

Override to define the slices for this stream. See the stream slicing section of the docs for more information.

Parameters
  • sync_mode:
  • cursor_field:
  • stream_state:
Returns
class UserDefinedBackoffException(airbyte_cdk.sources.streams.http.exceptions.BaseBackoffException):
40class UserDefinedBackoffException(BaseBackoffException):
41    """
42    An exception that exposes how long it attempted to backoff
43    """
44
45    def __init__(
46        self,
47        backoff: Union[int, float],
48        request: requests.PreparedRequest,
49        response: Optional[Union[requests.Response, Exception]],
50        error_message: str = "",
51        failure_type: Optional[FailureType] = None,
52    ):
53        """
54        :param backoff: how long to backoff in seconds
55        :param request: the request that triggered this backoff exception
56        :param response: the response that triggered the backoff exception
57        """
58        self.backoff = backoff
59        super().__init__(
60            request=request,
61            response=response,
62            error_message=error_message,
63            failure_type=failure_type,
64        )

An exception that exposes how long it attempted to backoff

UserDefinedBackoffException( backoff: Union[int, float], request: requests.models.PreparedRequest, response: Union[requests.models.Response, Exception, NoneType], error_message: str = '', failure_type: Optional[airbyte_protocol_dataclasses.models.airbyte_protocol.FailureType] = None)
45    def __init__(
46        self,
47        backoff: Union[int, float],
48        request: requests.PreparedRequest,
49        response: Optional[Union[requests.Response, Exception]],
50        error_message: str = "",
51        failure_type: Optional[FailureType] = None,
52    ):
53        """
54        :param backoff: how long to backoff in seconds
55        :param request: the request that triggered this backoff exception
56        :param response: the response that triggered the backoff exception
57        """
58        self.backoff = backoff
59        super().__init__(
60            request=request,
61            response=response,
62            error_message=error_message,
63            failure_type=failure_type,
64        )
Parameters
  • backoff: how long to backoff in seconds
  • request: the request that triggered this backoff exception
  • response: the response that triggered the backoff exception
backoff