airbyte_cdk.sources.streams.http

 1#
 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 3#
 4
 5# Initialize Streams Package
 6from .exceptions import UserDefinedBackoffException
 7from .http import HttpStream, HttpSubStream
 8from .http_client import HttpClient
 9
10__all__ = ["HttpClient", "HttpStream", "HttpSubStream", "UserDefinedBackoffException"]
class HttpClient:
    # Fallback limits used when the error handler does not specify its own.
    _DEFAULT_MAX_RETRY: int = 5
    _DEFAULT_MAX_TIME: int = 60 * 10
    # Response actions for which the per-request attempt counter must be kept alive
    # (see _evict_key): the request may still be re-sent.
    _ACTIONS_TO_RETRY_ON = {ResponseAction.RETRY, ResponseAction.RATE_LIMITED}

    def __init__(
        self,
        name: str,
        logger: logging.Logger,
        error_handler: Optional[ErrorHandler] = None,
        api_budget: Optional[APIBudget] = None,
        session: Optional[Union[requests.Session, requests_cache.CachedSession]] = None,
        authenticator: Optional[AuthBase] = None,
        use_cache: bool = False,
        backoff_strategy: Optional[Union[BackoffStrategy, List[BackoffStrategy]]] = None,
        error_message_parser: Optional[ErrorMessageParser] = None,
        disable_retries: bool = False,
        message_repository: Optional[MessageRepository] = None,
    ):
        """
        HTTP client with rate limiting, optional response caching, retry/backoff handling
        and pluggable error interpretation.

        :param name: logical name of the client; also used for the sqlite cache filename
        :param logger: logger used for request/response debug output and error reporting
        :param error_handler: interprets responses/exceptions; defaults to HttpStatusErrorHandler
        :param api_budget: call-rate budget; defaults to an unlimited APIBudget
        :param session: pre-built session to use instead of creating one (mounted adapters
            and caching are then the caller's responsibility)
        :param authenticator: requests AuthBase applied to the session
        :param use_cache: cache responses in sqlite (only honored when no session is given)
        :param backoff_strategy: one strategy or an ordered list; first non-None backoff wins
        :param error_message_parser: extracts user-facing error messages from responses
        :param disable_retries: when True, _max_retries is forced to 0
        :param message_repository: sink for declarative-CDK request/response log messages
        """
        self._name = name
        self._api_budget: APIBudget = api_budget or APIBudget(policies=[])
        # Fix: record the caching preference unconditionally so the attribute always
        # exists. Previously it was only set on the no-session branch, so any later
        # read of self._use_cache with a caller-provided session raised AttributeError.
        self._use_cache = use_cache
        if session:
            self._session = session
        else:
            self._session = self._request_session()
            # Widen the connection pool so concurrent streams don't discard connections.
            self._session.mount(
                "https://",
                requests.adapters.HTTPAdapter(
                    pool_connections=MAX_CONNECTION_POOL_SIZE, pool_maxsize=MAX_CONNECTION_POOL_SIZE
                ),
            )
        if isinstance(authenticator, AuthBase):
            self._session.auth = authenticator
        self._logger = logger
        self._error_handler = error_handler or HttpStatusErrorHandler(self._logger)
        if backoff_strategy is None:
            self._backoff_strategies = [DefaultBackoffStrategy()]
        elif isinstance(backoff_strategy, list):
            self._backoff_strategies = backoff_strategy
        else:
            self._backoff_strategies = [backoff_strategy]
        self._error_message_parser = error_message_parser or JsonErrorMessageParser()
        # Retry bookkeeping: attempts per prepared request; evicted once a request is done.
        self._request_attempt_count: Dict[requests.PreparedRequest, int] = {}
        self._disable_retries = disable_retries
        self._message_repository = message_repository
146    @property
147    def cache_filename(self) -> str:
148        """
149        Override if needed. Return the name of cache file
150        Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache will be in-memory only.
151        """
152        return f"{self._name}.sqlite"
153
154    def _request_session(self) -> requests.Session:
155        """
156        Session factory based on use_cache property and call rate limits (api_budget parameter)
157        :return: instance of request-based session
158        """
159        if self._use_cache:
160            cache_dir = os.getenv(ENV_REQUEST_CACHE_PATH)
161            # Use in-memory cache if cache_dir is not set
162            # This is a non-obvious interface, but it ensures we don't write sql files when running unit tests
163            # Use in-memory cache if cache_dir is not set
164            # This is a non-obvious interface, but it ensures we don't write sql files when running unit tests
165            sqlite_path = (
166                str(Path(cache_dir) / self.cache_filename)
167                if cache_dir
168                else "file::memory:?cache=shared"
169            )
170            # By using `PRAGMA synchronous=OFF` and `PRAGMA journal_mode=WAL`, we reduce the possible occurrences of `database table is locked` errors.
171            # Note that those were blindly added at the same time and one or the other might be sufficient to prevent the issues but we have seen good results with both. Feel free to revisit given more information.
172            # There are strong signals that `fast_save` might create problems but if the sync crashes, we start back from the beginning in terms of sqlite anyway so the impact should be minimal. Signals are:
173            # * https://github.com/requests-cache/requests-cache/commit/7fa89ffda300331c37d8fad7f773348a3b5b0236#diff-f43db4a5edf931647c32dec28ea7557aae4cae8444af4b26c8ecbe88d8c925aaR238
174            # * https://github.com/requests-cache/requests-cache/commit/7fa89ffda300331c37d8fad7f773348a3b5b0236#diff-2e7f95b7d7be270ff1a8118f817ea3e6663cdad273592e536a116c24e6d23c18R164-R168
175            # * `If the application running SQLite crashes, the data will be safe, but the database [might become corrupted](https://www.sqlite.org/howtocorrupt.html#cfgerr) if the operating system crashes or the computer loses power before that data has been written to the disk surface.` in [this description](https://www.sqlite.org/pragma.html#pragma_synchronous).
176            backend = requests_cache.SQLiteCache(sqlite_path, fast_save=True, wal=True)
177            return CachedLimiterSession(
178                cache_name=sqlite_path,
179                backend=backend,
180                api_budget=self._api_budget,
181                match_headers=True,
182            )
183        else:
184            return LimiterSession(api_budget=self._api_budget)
185
186    def clear_cache(self) -> None:
187        """
188        Clear cached requests for current session, can be called any time
189        """
190        if isinstance(self._session, requests_cache.CachedSession):
191            self._session.cache.clear()  # type: ignore # cache.clear is not typed
192
193    def _dedupe_query_params(
194        self, url: str, params: Optional[Mapping[str, str]]
195    ) -> Mapping[str, str]:
196        """
197        Remove query parameters from params mapping if they are already encoded in the URL.
198        :param url: URL with
199        :param params:
200        :return:
201        """
202        if params is None:
203            params = {}
204        query_string = urllib.parse.urlparse(url).query
205        query_dict = {k: v[0] for k, v in urllib.parse.parse_qs(query_string).items()}
206
207        duplicate_keys_with_same_value = {
208            k for k in query_dict.keys() if str(params.get(k)) == str(query_dict[k])
209        }
210        return {k: v for k, v in params.items() if k not in duplicate_keys_with_same_value}
211
212    def _create_prepared_request(
213        self,
214        http_method: str,
215        url: str,
216        dedupe_query_params: bool = False,
217        headers: Optional[Mapping[str, str]] = None,
218        params: Optional[Mapping[str, str]] = None,
219        json: Optional[Mapping[str, Any]] = None,
220        data: Optional[Union[str, Mapping[str, Any]]] = None,
221    ) -> requests.PreparedRequest:
222        if dedupe_query_params:
223            query_params = self._dedupe_query_params(url, params)
224        else:
225            query_params = params or {}
226        args = {"method": http_method, "url": url, "headers": headers, "params": query_params}
227        if http_method.upper() in BODY_REQUEST_METHODS:
228            if json and data:
229                raise RequestBodyException(
230                    "At the same time only one of the 'request_body_data' and 'request_body_json' functions can return data"
231                )
232            elif json:
233                args["json"] = json
234            elif data:
235                args["data"] = data
236        prepared_request: requests.PreparedRequest = self._session.prepare_request(
237            requests.Request(**args)
238        )
239
240        return prepared_request
241
242    @property
243    def _max_retries(self) -> int:
244        """
245        Determines the max retries based on the provided error handler.
246        """
247        max_retries = None
248        if self._disable_retries:
249            max_retries = 0
250        else:
251            max_retries = self._error_handler.max_retries
252        return max_retries if max_retries is not None else self._DEFAULT_MAX_RETRY
253
254    @property
255    def _max_time(self) -> int:
256        """
257        Determines the max time based on the provided error handler.
258        """
259        return (
260            self._error_handler.max_time
261            if self._error_handler.max_time is not None
262            else self._DEFAULT_MAX_TIME
263        )
264
265    def _send_with_retry(
266        self,
267        request: requests.PreparedRequest,
268        request_kwargs: Mapping[str, Any],
269        log_formatter: Optional[Callable[[requests.Response], Any]] = None,
270        exit_on_rate_limit: Optional[bool] = False,
271    ) -> requests.Response:
272        """
273        Sends a request with retry logic.
274
275        Args:
276            request (requests.PreparedRequest): The prepared HTTP request to send.
277            request_kwargs (Mapping[str, Any]): Additional keyword arguments for the request.
278
279        Returns:
280            requests.Response: The HTTP response received from the server after retries.
281        """
282
283        max_retries = self._max_retries
284        max_tries = max(0, max_retries) + 1
285        max_time = self._max_time
286
287        user_backoff_handler = user_defined_backoff_handler(max_tries=max_tries, max_time=max_time)(
288            self._send
289        )
290        rate_limit_backoff_handler = rate_limit_default_backoff_handler(max_tries=max_tries)
291        backoff_handler = http_client_default_backoff_handler(
292            max_tries=max_tries, max_time=max_time
293        )
294        # backoff handlers wrap _send, so it will always return a response
295        response = backoff_handler(rate_limit_backoff_handler(user_backoff_handler))(
296            request,
297            request_kwargs,
298            log_formatter=log_formatter,
299            exit_on_rate_limit=exit_on_rate_limit,
300        )  # type: ignore # mypy can't infer that backoff_handler wraps _send
301
302        return response
303
304    def _send(
305        self,
306        request: requests.PreparedRequest,
307        request_kwargs: Mapping[str, Any],
308        log_formatter: Optional[Callable[[requests.Response], Any]] = None,
309        exit_on_rate_limit: Optional[bool] = False,
310    ) -> requests.Response:
311        if request not in self._request_attempt_count:
312            self._request_attempt_count[request] = 1
313        else:
314            self._request_attempt_count[request] += 1
315            if hasattr(self._session, "auth") and isinstance(self._session.auth, AuthBase):
316                self._session.auth(request)
317
318        self._logger.debug(
319            "Making outbound API request",
320            extra={"headers": request.headers, "url": request.url, "request_body": request.body},
321        )
322
323        response: Optional[requests.Response] = None
324        exc: Optional[requests.RequestException] = None
325
326        try:
327            response = self._session.send(request, **request_kwargs)
328        except requests.RequestException as e:
329            exc = e
330
331        error_resolution: ErrorResolution = self._error_handler.interpret_response(
332            response if response is not None else exc
333        )
334
335        # Evaluation of response.text can be heavy, for example, if streaming a large response
336        # Do it only in debug mode
337        if self._logger.isEnabledFor(logging.DEBUG) and response is not None:
338            if request_kwargs.get("stream"):
339                self._logger.debug(
340                    "Receiving response, but not logging it as the response is streamed",
341                    extra={"headers": response.headers, "status": response.status_code},
342                )
343            else:
344                self._logger.debug(
345                    "Receiving response",
346                    extra={
347                        "headers": response.headers,
348                        "status": response.status_code,
349                        "body": response.text,
350                    },
351                )
352
353        # Request/response logging for declarative cdk
354        if (
355            log_formatter is not None
356            and response is not None
357            and self._message_repository is not None
358        ):
359            formatter = log_formatter
360            self._message_repository.log_message(
361                Level.DEBUG,
362                lambda: formatter(response),
363            )
364
365        self._handle_error_resolution(
366            response=response,
367            exc=exc,
368            request=request,
369            error_resolution=error_resolution,
370            exit_on_rate_limit=exit_on_rate_limit,
371        )
372
373        return response  # type: ignore # will either return a valid response of type requests.Response or raise an exception
374
375    def _get_response_body(self, response: requests.Response) -> Optional[JsonType]:
376        """
377        Extracts and returns the body of an HTTP response.
378
379        This method attempts to parse the response body as JSON. If the response
380        body is not valid JSON, it falls back to decoding the response content
381        as a UTF-8 string. If both attempts fail, it returns None.
382
383        Args:
384            response (requests.Response): The HTTP response object.
385
386        Returns:
387            Optional[JsonType]: The parsed JSON object as a string, the decoded
388            response content as a string, or None if both parsing attempts fail.
389        """
390        try:
391            return str(response.json())
392        except requests.exceptions.JSONDecodeError:
393            try:
394                return response.content.decode("utf-8")
395            except Exception:
396                return "The Content of the Response couldn't be decoded."
397
398    def _evict_key(self, prepared_request: requests.PreparedRequest) -> None:
399        """
400        Addresses high memory consumption when enabling concurrency in https://github.com/airbytehq/oncall/issues/6821.
401
402        The `_request_attempt_count` attribute keeps growing as multiple requests are made using the same `http_client`.
403        To mitigate this issue, we evict keys for completed requests once we confirm that no further retries are needed.
404        This helps manage memory usage more efficiently while maintaining the necessary logic for retry attempts.
405        """
406        if prepared_request in self._request_attempt_count:
407            del self._request_attempt_count[prepared_request]
408
409    def _handle_error_resolution(
410        self,
411        response: Optional[requests.Response],
412        exc: Optional[requests.RequestException],
413        request: requests.PreparedRequest,
414        error_resolution: ErrorResolution,
415        exit_on_rate_limit: Optional[bool] = False,
416    ) -> None:
417        if error_resolution.response_action not in self._ACTIONS_TO_RETRY_ON:
418            self._evict_key(request)
419
420        # Emit stream status RUNNING with the reason RATE_LIMITED to log that the rate limit has been reached
421        if error_resolution.response_action == ResponseAction.RATE_LIMITED:
422            # TODO: Update to handle with message repository when concurrent message repository is ready
423            reasons = [AirbyteStreamStatusReason(type=AirbyteStreamStatusReasonType.RATE_LIMITED)]
424            message = orjson.dumps(
425                AirbyteMessageSerializer.dump(
426                    stream_status_as_airbyte_message(
427                        StreamDescriptor(name=self._name), AirbyteStreamStatus.RUNNING, reasons
428                    )
429                )
430            ).decode()
431
432            # Simply printing the stream status is a temporary solution and can cause future issues. Currently, the _send method is
433            # wrapped with backoff decorators, and we can only emit messages by iterating record_iterator in the abstract source at the
434            # end of the retry decorator behavior. This approach does not allow us to emit messages in the queue before exiting the
435            # backoff retry loop. Adding `\n` to the message and ignore 'end' ensure that few messages are printed at the same time.
436            print(f"{message}\n", end="", flush=True)
437
438        if error_resolution.response_action == ResponseAction.FAIL:
439            if response is not None:
440                filtered_response_message = filter_secrets(
441                    f"Request (body): '{str(request.body)}'. Response (body): '{self._get_response_body(response)}'. Response (headers): '{response.headers}'."
442                )
443                error_message = f"'{request.method}' request to '{request.url}' failed with status code '{response.status_code}' and error message: '{self._error_message_parser.parse_response_error_message(response)}'. {filtered_response_message}"
444            else:
445                error_message = (
446                    f"'{request.method}' request to '{request.url}' failed with exception: '{exc}'"
447                )
448
449            # ensure the exception message is emitted before raised
450            self._logger.error(error_message)
451
452            raise MessageRepresentationAirbyteTracedErrors(
453                internal_message=error_message,
454                message=error_resolution.error_message or error_message,
455                failure_type=error_resolution.failure_type,
456            )
457
458        elif error_resolution.response_action == ResponseAction.IGNORE:
459            if response is not None:
460                log_message = f"Ignoring response for '{request.method}' request to '{request.url}' with response code '{response.status_code}'"
461            else:
462                log_message = f"Ignoring response for '{request.method}' request to '{request.url}' with error '{exc}'"
463
464            self._logger.info(error_resolution.error_message or log_message)
465
466        # TODO: Consider dynamic retry count depending on subsequent error codes
467        elif (
468            error_resolution.response_action == ResponseAction.RETRY
469            or error_resolution.response_action == ResponseAction.RATE_LIMITED
470        ):
471            user_defined_backoff_time = None
472            for backoff_strategy in self._backoff_strategies:
473                backoff_time = backoff_strategy.backoff_time(
474                    response_or_exception=response if response is not None else exc,
475                    attempt_count=self._request_attempt_count[request],
476                )
477                if backoff_time:
478                    user_defined_backoff_time = backoff_time
479                    break
480            error_message = (
481                error_resolution.error_message
482                or f"Request to {request.url} failed with failure type {error_resolution.failure_type}, response action {error_resolution.response_action}."
483            )
484
485            retry_endlessly = (
486                error_resolution.response_action == ResponseAction.RATE_LIMITED
487                and not exit_on_rate_limit
488            )
489
490            if user_defined_backoff_time:
491                raise UserDefinedBackoffException(
492                    backoff=user_defined_backoff_time,
493                    request=request,
494                    response=(response if response is not None else exc),
495                    error_message=error_message,
496                )
497
498            elif retry_endlessly:
499                raise RateLimitBackoffException(
500                    request=request,
501                    response=(response if response is not None else exc),
502                    error_message=error_message,
503                )
504
505            raise DefaultBackoffException(
506                request=request,
507                response=(response if response is not None else exc),
508                error_message=error_message,
509            )
510
511        elif response:
512            try:
513                response.raise_for_status()
514            except requests.HTTPError as e:
515                self._logger.error(response.text)
516                raise e
517
518    @property
519    def name(self) -> str:
520        return self._name
521
522    def send_request(
523        self,
524        http_method: str,
525        url: str,
526        request_kwargs: Mapping[str, Any],
527        headers: Optional[Mapping[str, str]] = None,
528        params: Optional[Mapping[str, str]] = None,
529        json: Optional[Mapping[str, Any]] = None,
530        data: Optional[Union[str, Mapping[str, Any]]] = None,
531        dedupe_query_params: bool = False,
532        log_formatter: Optional[Callable[[requests.Response], Any]] = None,
533        exit_on_rate_limit: Optional[bool] = False,
534    ) -> Tuple[requests.PreparedRequest, requests.Response]:
535        """
536        Prepares and sends request and return request and response objects.
537        """
538
539        request: requests.PreparedRequest = self._create_prepared_request(
540            http_method=http_method,
541            url=url,
542            dedupe_query_params=dedupe_query_params,
543            headers=headers,
544            params=params,
545            json=json,
546            data=data,
547        )
548
549        response: requests.Response = self._send_with_retry(
550            request=request,
551            request_kwargs=request_kwargs,
552            log_formatter=log_formatter,
553            exit_on_rate_limit=exit_on_rate_limit,
554        )
555
556        return request, response
HttpClient( name: str, logger: logging.Logger, error_handler: Optional[airbyte_cdk.sources.streams.http.error_handlers.ErrorHandler] = None, api_budget: Optional[airbyte_cdk.sources.streams.call_rate.APIBudget] = None, session: Union[requests.sessions.Session, requests_cache.session.CachedSession, NoneType] = None, authenticator: Optional[requests.auth.AuthBase] = None, use_cache: bool = False, backoff_strategy: Union[airbyte_cdk.BackoffStrategy, List[airbyte_cdk.BackoffStrategy], NoneType] = None, error_message_parser: Optional[airbyte_cdk.sources.streams.http.error_handlers.ErrorMessageParser] = None, disable_retries: bool = False, message_repository: Optional[airbyte_cdk.MessageRepository] = None)
103    def __init__(
104        self,
105        name: str,
106        logger: logging.Logger,
107        error_handler: Optional[ErrorHandler] = None,
108        api_budget: Optional[APIBudget] = None,
109        session: Optional[Union[requests.Session, requests_cache.CachedSession]] = None,
110        authenticator: Optional[AuthBase] = None,
111        use_cache: bool = False,
112        backoff_strategy: Optional[Union[BackoffStrategy, List[BackoffStrategy]]] = None,
113        error_message_parser: Optional[ErrorMessageParser] = None,
114        disable_retries: bool = False,
115        message_repository: Optional[MessageRepository] = None,
116    ):
117        self._name = name
118        self._api_budget: APIBudget = api_budget or APIBudget(policies=[])
119        if session:
120            self._session = session
121        else:
122            self._use_cache = use_cache
123            self._session = self._request_session()
124            self._session.mount(
125                "https://",
126                requests.adapters.HTTPAdapter(
127                    pool_connections=MAX_CONNECTION_POOL_SIZE, pool_maxsize=MAX_CONNECTION_POOL_SIZE
128                ),
129            )
130        if isinstance(authenticator, AuthBase):
131            self._session.auth = authenticator
132        self._logger = logger
133        self._error_handler = error_handler or HttpStatusErrorHandler(self._logger)
134        if backoff_strategy is not None:
135            if isinstance(backoff_strategy, list):
136                self._backoff_strategies = backoff_strategy
137            else:
138                self._backoff_strategies = [backoff_strategy]
139        else:
140            self._backoff_strategies = [DefaultBackoffStrategy()]
141        self._error_message_parser = error_message_parser or JsonErrorMessageParser()
142        self._request_attempt_count: Dict[requests.PreparedRequest, int] = {}
143        self._disable_retries = disable_retries
144        self._message_repository = message_repository
cache_filename: str
146    @property
147    def cache_filename(self) -> str:
148        """
149        Override if needed. Return the name of cache file
150        Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache will be in-memory only.
151        """
152        return f"{self._name}.sqlite"

Override if needed. Returns the name of the cache file. Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache will be in-memory only.

def clear_cache(self) -> None:
186    def clear_cache(self) -> None:
187        """
188        Clear cached requests for current session, can be called any time
189        """
190        if isinstance(self._session, requests_cache.CachedSession):
191            self._session.cache.clear()  # type: ignore # cache.clear is not typed

Clear cached requests for current session, can be called any time

name: str
518    @property
519    def name(self) -> str:
520        return self._name
def send_request( self, http_method: str, url: str, request_kwargs: Mapping[str, Any], headers: Optional[Mapping[str, str]] = None, params: Optional[Mapping[str, str]] = None, json: Optional[Mapping[str, Any]] = None, data: Union[str, Mapping[str, Any], NoneType] = None, dedupe_query_params: bool = False, log_formatter: Optional[Callable[[requests.models.Response], Any]] = None, exit_on_rate_limit: Optional[bool] = False) -> Tuple[requests.models.PreparedRequest, requests.models.Response]:
522    def send_request(
523        self,
524        http_method: str,
525        url: str,
526        request_kwargs: Mapping[str, Any],
527        headers: Optional[Mapping[str, str]] = None,
528        params: Optional[Mapping[str, str]] = None,
529        json: Optional[Mapping[str, Any]] = None,
530        data: Optional[Union[str, Mapping[str, Any]]] = None,
531        dedupe_query_params: bool = False,
532        log_formatter: Optional[Callable[[requests.Response], Any]] = None,
533        exit_on_rate_limit: Optional[bool] = False,
534    ) -> Tuple[requests.PreparedRequest, requests.Response]:
535        """
536        Prepares and sends request and return request and response objects.
537        """
538
539        request: requests.PreparedRequest = self._create_prepared_request(
540            http_method=http_method,
541            url=url,
542            dedupe_query_params=dedupe_query_params,
543            headers=headers,
544            params=params,
545            json=json,
546            data=data,
547        )
548
549        response: requests.Response = self._send_with_retry(
550            request=request,
551            request_kwargs=request_kwargs,
552            log_formatter=log_formatter,
553            exit_on_rate_limit=exit_on_rate_limit,
554        )
555
556        return request, response

Prepares and sends request and return request and response objects.

 45class HttpStream(Stream, CheckpointMixin, ABC):
 46    """
 47    Base abstract class for an Airbyte Stream using the HTTP protocol. Basic building block for users building an Airbyte source for a HTTP API.
 48    """
 49
 50    source_defined_cursor = True  # Most HTTP streams use a source defined cursor (i.e: the user can't configure it like on a SQL table)
 51    page_size: Optional[int] = (
 52        None  # Use this variable to define page size for API http requests with pagination support
 53    )
 54
 55    def __init__(
 56        self, authenticator: Optional[AuthBase] = None, api_budget: Optional[APIBudget] = None
 57    ):
 58        self._exit_on_rate_limit: bool = False
 59        self._http_client = HttpClient(
 60            name=self.name,
 61            logger=self.logger,
 62            error_handler=self.get_error_handler(),
 63            api_budget=api_budget or APIBudget(policies=[]),
 64            authenticator=authenticator,
 65            use_cache=self.use_cache,
 66            backoff_strategy=self.get_backoff_strategy(),
 67            message_repository=InMemoryMessageRepository(),
 68        )
 69
 70        # There are three conditions that dictate if RFR should automatically be applied to a stream
 71        # 1. Streams that explicitly initialize their own cursor should defer to it and not automatically apply RFR
 72        # 2. Streams with at least one cursor_field are incremental and thus a superior sync to RFR.
 73        # 3. Streams overriding read_records() do not guarantee that they will call the parent implementation which can perform
 74        #    per-page checkpointing so RFR is only supported if a stream use the default `HttpStream.read_records()` method
 75        if (
 76            not self.cursor
 77            and len(self.cursor_field) == 0
 78            and type(self).read_records is HttpStream.read_records
 79        ):
 80            self.cursor = ResumableFullRefreshCursor()
 81
 82    @property
 83    def exit_on_rate_limit(self) -> bool:
 84        """
 85        :return: False if the stream will retry endlessly when rate limited
 86        """
 87        return self._exit_on_rate_limit
 88
 89    @exit_on_rate_limit.setter
 90    def exit_on_rate_limit(self, value: bool) -> None:
 91        self._exit_on_rate_limit = value
 92
 93    @property
 94    def cache_filename(self) -> str:
 95        """
 96        Override if needed. Return the name of cache file
 97        Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache will be in-memory only.
 98        """
 99        return f"{self.name}.sqlite"
100
101    @property
102    def use_cache(self) -> bool:
103        """
104        Override if needed. If True, all records will be cached.
105        Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache will be in-memory only.
106        """
107        return False
108
109    @property
110    @abstractmethod
111    def url_base(self) -> str:
112        """
113        :return: URL base for the  API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/"
114        """
115
116    @property
117    def http_method(self) -> str:
118        """
119        Override if needed. See get_request_data/get_request_json if using POST/PUT/PATCH.
120        """
121        return "GET"
122
123    @property
124    @deprecated(
125        "Deprecated as of CDK version 3.0.0. "
126        "You should set error_handler explicitly in HttpStream.get_error_handler() instead."
127    )
128    def raise_on_http_errors(self) -> bool:
129        """
130        Override if needed. If set to False, allows opting-out of raising HTTP code exception.
131        """
132        return True
133
134    @property
135    @deprecated(
136        "Deprecated as of CDK version 3.0.0. "
137        "You should set backoff_strategies explicitly in HttpStream.get_backoff_strategy() instead."
138    )
139    def max_retries(self) -> Union[int, None]:
140        """
141        Override if needed. Specifies maximum amount of retries for backoff policy. Return None for no limit.
142        """
143        return 5
144
    @property
    @deprecated(
        "Deprecated as of CDK version 3.0.0. "
        "You should set backoff_strategies explicitly in HttpStream.get_backoff_strategy() instead."
    )
    def max_time(self) -> Union[int, None]:
        """
        Maximum total waiting time (in seconds) for the backoff policy. Override if needed; return None for no limit.
        """
        return 60 * 10
155
    @property
    @deprecated(
        "Deprecated as of CDK version 3.0.0. "
        "You should set backoff_strategies explicitly in HttpStream.get_backoff_strategy() instead."
    )
    def retry_factor(self) -> float:
        """
        Multiplicative factor for the backoff policy. Override if needed.
        """
        return 5
166
    @abstractmethod
    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """
        Override this method to define a pagination strategy.

        The value returned from this method is passed to most other methods in this class.
        Use it to form a request, e.g. to set headers or query params.

        :param response: the most recent HTTP response
        :return: The token for the next page from the input response object. Returning None means there are no more pages to read in this response.
        """
176
    @abstractmethod
    def path(
        self,
        *,
        stream_state: Optional[Mapping[str, Any]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> str:
        """
        Returns the URL path for the API endpoint, e.g. to hit https://myapi.com/v1/some_entity this should return "some_entity".

        :param stream_state: current stream state
        :param stream_slice: slice being read, if any
        :param next_page_token: pagination token from next_page_token(), if any
        """
188
189    def request_params(
190        self,
191        stream_state: Optional[Mapping[str, Any]],
192        stream_slice: Optional[Mapping[str, Any]] = None,
193        next_page_token: Optional[Mapping[str, Any]] = None,
194    ) -> MutableMapping[str, Any]:
195        """
196        Override this method to define the query parameters that should be set on an outgoing HTTP request given the inputs.
197
198        E.g: you might want to define query parameters for paging if next_page_token is not None.
199        """
200        return {}
201
202    def request_headers(
203        self,
204        stream_state: Optional[Mapping[str, Any]],
205        stream_slice: Optional[Mapping[str, Any]] = None,
206        next_page_token: Optional[Mapping[str, Any]] = None,
207    ) -> Mapping[str, Any]:
208        """
209        Override to return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
210        """
211        return {}
212
213    def request_body_data(
214        self,
215        stream_state: Optional[Mapping[str, Any]],
216        stream_slice: Optional[Mapping[str, Any]] = None,
217        next_page_token: Optional[Mapping[str, Any]] = None,
218    ) -> Optional[Union[Mapping[str, Any], str]]:
219        """
220        Override when creating POST/PUT/PATCH requests to populate the body of the request with a non-JSON payload.
221
222        If returns a ready text that it will be sent as is.
223        If returns a dict that it will be converted to a urlencoded form.
224        E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
225
226        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
227        """
228        return None
229
230    def request_body_json(
231        self,
232        stream_state: Optional[Mapping[str, Any]],
233        stream_slice: Optional[Mapping[str, Any]] = None,
234        next_page_token: Optional[Mapping[str, Any]] = None,
235    ) -> Optional[Mapping[str, Any]]:
236        """
237        Override when creating POST/PUT/PATCH requests to populate the body of the request with a JSON payload.
238
239        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
240        """
241        return None
242
243    def request_kwargs(
244        self,
245        stream_state: Optional[Mapping[str, Any]],
246        stream_slice: Optional[Mapping[str, Any]] = None,
247        next_page_token: Optional[Mapping[str, Any]] = None,
248    ) -> Mapping[str, Any]:
249        """
250        Override to return a mapping of keyword arguments to be used when creating the HTTP request.
251        Any option listed in https://docs.python-requests.org/en/latest/api/#requests.adapters.BaseAdapter.send for can be returned from
252        this method. Note that these options do not conflict with request-level options such as headers, request params, etc..
253        """
254        return {}
255
    @abstractmethod
    def parse_response(
        self,
        response: requests.Response,
        *,
        stream_state: Mapping[str, Any],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Mapping[str, Any]]:
        """
        Parses the raw response object into an iterable of records.

        :param response: the HTTP response to parse
        :param stream_state: current stream state
        :param stream_slice: slice being read, if any
        :param next_page_token: pagination token, if any
        :return: An iterable containing the parsed records
        """
274
275    def get_backoff_strategy(self) -> Optional[Union[BackoffStrategy, List[BackoffStrategy]]]:
276        """
277        Used to initialize Adapter to avoid breaking changes.
278        If Stream has a `backoff_time` method implementation, we know this stream uses old (pre-HTTPClient) backoff handlers and thus an adapter is needed.
279
280        Override to provide custom BackoffStrategy
281        :return Optional[BackoffStrategy]:
282        """
283        if hasattr(self, "backoff_time"):
284            return HttpStreamAdapterBackoffStrategy(self)
285        else:
286            return None
287
288    def get_error_handler(self) -> Optional[ErrorHandler]:
289        """
290        Used to initialize Adapter to avoid breaking changes.
291        If Stream has a `should_retry` method implementation, we know this stream uses old (pre-HTTPClient) error handlers and thus an adapter is needed.
292
293        Override to provide custom ErrorHandler
294        :return Optional[ErrorHandler]:
295        """
296        if hasattr(self, "should_retry"):
297            error_handler = HttpStreamAdapterHttpStatusErrorHandler(
298                stream=self,
299                logger=logging.getLogger(),
300                max_retries=self.max_retries,
301                max_time=timedelta(seconds=self.max_time or 0),
302            )
303            return error_handler
304        else:
305            return None
306
    @classmethod
    def _join_url(cls, url_base: str, path: str) -> str:
        # urljoin resolves path relative to url_base, so url_base should normally
        # end with "/" and path should not start with "/" to get simple concatenation.
        return urljoin(url_base, path)
310
    @classmethod
    def parse_response_error_message(cls, response: requests.Response) -> Optional[str]:
        """
        Parses the raw response object from a failed request into a user-friendly error message.
        By default, this method tries to grab the error message from JSON responses by following common API patterns. Override to parse differently.

        :param response: the failed HTTP response
        :return: A user-friendly message that indicates the cause of the error, or None if none could be extracted
        """

        # default logic to grab error from common fields
        def _try_get_error(value: Optional[JsonType]) -> Optional[str]:
            if isinstance(value, str):
                return value
            elif isinstance(value, list):
                # Recurse into every element and join the non-None results.
                errors_in_value = [_try_get_error(v) for v in value]
                return ", ".join(v for v in errors_in_value if v is not None)
            elif isinstance(value, dict):
                # First truthy value among common error keys, checked in priority order.
                # NOTE: `or` also skips falsy-but-present values such as "" or [];
                # if every key is falsy the last operand (value.get("detail")) is recursed.
                new_value = (
                    value.get("message")
                    or value.get("messages")
                    or value.get("error")
                    or value.get("errors")
                    or value.get("failures")
                    or value.get("failure")
                    or value.get("detail")
                )
                return _try_get_error(new_value)
            return None

        try:
            body = response.json()
            return _try_get_error(body)
        except requests.exceptions.JSONDecodeError:
            # Non-JSON bodies yield no message.
            return None
346
347    def get_error_display_message(self, exception: BaseException) -> Optional[str]:
348        """
349        Retrieves the user-friendly display message that corresponds to an exception.
350        This will be called when encountering an exception while reading records from the stream, and used to build the AirbyteTraceMessage.
351
352        The default implementation of this method only handles HTTPErrors by passing the response to self.parse_response_error_message().
353        The method should be overriden as needed to handle any additional exception types.
354
355        :param exception: The exception that was raised
356        :return: A user-friendly message that indicates the cause of the error
357        """
358        if isinstance(exception, requests.HTTPError) and exception.response is not None:
359            return self.parse_response_error_message(exception.response)
360        return None
361
    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[StreamData]:
        """
        Read the records for one slice.

        Streams with a cursor_field, or with any cursor other than resumable full
        refresh, read every page in one call; resumable-full-refresh streams read
        a single page per call so each page can be checkpointed.
        """
        # A cursor_field indicates this is an incremental stream which offers better checkpointing than RFR enabled via the cursor
        if self.cursor_field or not isinstance(self.get_cursor(), ResumableFullRefreshCursor):
            yield from self._read_pages(
                lambda req, res, state, _slice: self.parse_response(
                    res, stream_slice=_slice, stream_state=state
                ),
                stream_slice,
                stream_state,
            )
        else:
            yield from self._read_single_page(
                lambda req, res, state, _slice: self.parse_response(
                    res, stream_slice=_slice, stream_state=state
                ),
                stream_slice,
                stream_state,
            )
386
387    @property
388    def state(self) -> MutableMapping[str, Any]:
389        cursor = self.get_cursor()
390        if cursor:
391            return cursor.get_stream_state()  # type: ignore
392        return self._state
393
    @state.setter
    def state(self, value: MutableMapping[str, Any]) -> None:
        # Seed the cursor (when one exists) with the incoming state, and keep a
        # local copy so the getter can fall back to it when there is no cursor.
        cursor = self.get_cursor()
        if cursor:
            cursor.set_initial_state(value)
        self._state = value
400
401    def get_cursor(self) -> Optional[Cursor]:
402        # I don't love that this is semi-stateful but not sure what else to do. We don't know exactly what type of cursor to
403        # instantiate when creating the class. We can make a few assumptions like if there is a cursor_field which implies
404        # incremental, but we don't know until runtime if this is a substream. Ideally, a stream should explicitly define
405        # its cursor, but because we're trying to automatically apply RFR we're stuck with this logic where we replace the
406        # cursor at runtime once we detect this is a substream based on self.has_multiple_slices being reassigned
407        if self.has_multiple_slices and isinstance(self.cursor, ResumableFullRefreshCursor):
408            self.cursor = SubstreamResumableFullRefreshCursor()
409            return self.cursor
410        else:
411            return self.cursor
412
    def _read_pages(
        self,
        records_generator_fn: Callable[
            [
                requests.PreparedRequest,
                requests.Response,
                Mapping[str, Any],
                Optional[Mapping[str, Any]],
            ],
            Iterable[StreamData],
        ],
        stream_slice: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[StreamData]:
        """
        Fetch pages until next_page_token() returns a falsy value, yielding the
        records produced by records_generator_fn for each (request, response) pair.

        :param records_generator_fn: invoked as (request, response, stream_state, stream_slice);
            yields the records parsed from one response
        :param stream_slice: slice being read, passed through to request construction
        :param stream_state: state mapping, passed through to request construction
        :return: generator over the records of every page of the slice
        """
        stream_state = stream_state or {}
        pagination_complete = False
        next_page_token = None
        while not pagination_complete:
            request, response = self._fetch_next_page(stream_slice, stream_state, next_page_token)
            yield from records_generator_fn(request, response, stream_state, stream_slice)

            next_page_token = self.next_page_token(response)
            if not next_page_token:
                pagination_complete = True

        cursor = self.get_cursor()
        if cursor and isinstance(cursor, SubstreamResumableFullRefreshCursor):
            partition, _, _ = self._extract_slice_fields(stream_slice=stream_slice)
            # Substreams checkpoint state by marking an entire parent partition as completed so that on the subsequent attempt
            # after a failure, completed parents are skipped and the sync can make progress
            cursor.close_slice(StreamSlice(cursor_slice={}, partition=partition))

        # Always return an empty generator just in case no records were ever yielded
        yield from []
447
    def _read_single_page(
        self,
        records_generator_fn: Callable[
            [
                requests.PreparedRequest,
                requests.Response,
                Mapping[str, Any],
                Optional[Mapping[str, Any]],
            ],
            Iterable[StreamData],
        ],
        stream_slice: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[StreamData]:
        """
        Fetch exactly one page for a resumable-full-refresh stream and checkpoint
        the resulting page token on the cursor.

        The slice's cursor_slice component is used as this request's page token.
        After reading, either the next page token or the
        "__ab_full_refresh_sync_complete" sentinel (when pagination is exhausted)
        is closed onto the cursor so the next read resumes from the right page.
        """
        partition, cursor_slice, remaining_slice = self._extract_slice_fields(
            stream_slice=stream_slice
        )
        stream_state = stream_state or {}
        # An empty cursor_slice means this is the first page of the slice.
        next_page_token = cursor_slice or None

        request, response = self._fetch_next_page(remaining_slice, stream_state, next_page_token)
        yield from records_generator_fn(request, response, stream_state, remaining_slice)

        next_page_token = self.next_page_token(response) or {
            "__ab_full_refresh_sync_complete": True
        }

        cursor = self.get_cursor()
        if cursor:
            cursor.close_slice(StreamSlice(cursor_slice=next_page_token, partition=partition))

        # Always return an empty generator just in case no records were ever yielded
        yield from []
481
482    @staticmethod
483    def _extract_slice_fields(
484        stream_slice: Optional[Mapping[str, Any]],
485    ) -> tuple[Mapping[str, Any], Mapping[str, Any], Mapping[str, Any]]:
486        if not stream_slice:
487            return {}, {}, {}
488
489        if isinstance(stream_slice, StreamSlice):
490            partition = stream_slice.partition
491            cursor_slice = stream_slice.cursor_slice
492            remaining = {k: v for k, v in stream_slice.items()}
493        else:
494            # RFR streams that implement stream_slices() to generate stream slices in the legacy mapping format are converted into a
495            # structured stream slice mapping by the LegacyCursorBasedCheckpointReader. The structured mapping object has separate
496            # fields for the partition and cursor_slice value
497            partition = stream_slice.get("partition", {})
498            cursor_slice = stream_slice.get("cursor_slice", {})
499            remaining = {
500                key: val
501                for key, val in stream_slice.items()
502                if key != "partition" and key != "cursor_slice"
503            }
504        return partition, cursor_slice, remaining
505
    def _fetch_next_page(
        self,
        stream_slice: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Tuple[requests.PreparedRequest, requests.Response]:
        """
        Build and send a single HTTP request via the underlying HttpClient.

        The URL, kwargs, headers, params, and body are all assembled from this
        stream's request_* overrides using the given slice/state/page token.

        :return: the prepared request and its response
        """
        request, response = self._http_client.send_request(
            http_method=self.http_method,
            url=self._join_url(
                self.url_base,
                self.path(
                    stream_state=stream_state,
                    stream_slice=stream_slice,
                    next_page_token=next_page_token,
                ),
            ),
            request_kwargs=self.request_kwargs(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            headers=self.request_headers(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            params=self.request_params(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            json=self.request_body_json(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            data=self.request_body_data(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            dedupe_query_params=True,
            log_formatter=self.get_log_formatter(),
            exit_on_rate_limit=self.exit_on_rate_limit,
        )

        return request, response
553
554    def get_log_formatter(self) -> Optional[Callable[[requests.Response], Any]]:
555        """
556
557        :return Optional[Callable[[requests.Response], Any]]: Function that will be used in logging inside HttpClient
558        """
559        return None

Base abstract class for an Airbyte Stream using the HTTP protocol. Basic building block for users building an Airbyte source for an HTTP API.

source_defined_cursor = True

Return False if the cursor can be configured by the user.

page_size: Optional[int] = None
exit_on_rate_limit: bool
82    @property
83    def exit_on_rate_limit(self) -> bool:
84        """
85        :return: False if the stream will retry endlessly when rate limited
86        """
87        return self._exit_on_rate_limit
Returns

False if the stream will retry endlessly when rate limited

cache_filename: str
93    @property
94    def cache_filename(self) -> str:
95        """
96        Override if needed. Return the name of cache file
97        Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache will be in-memory only.
98        """
99        return f"{self.name}.sqlite"

Override if needed. Return the name of cache file Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache will be in-memory only.

use_cache: bool
101    @property
102    def use_cache(self) -> bool:
103        """
104        Override if needed. If True, all records will be cached.
105        Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache will be in-memory only.
106        """
107        return False

Override if needed. If True, all records will be cached. Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache will be in-memory only.

url_base: str
109    @property
110    @abstractmethod
111    def url_base(self) -> str:
112        """
113        :return: URL base for the  API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/"
114        """
Returns

URL base for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/"

http_method: str
116    @property
117    def http_method(self) -> str:
118        """
119        Override if needed. See get_request_data/get_request_json if using POST/PUT/PATCH.
120        """
121        return "GET"

Override if needed. See get_request_data/get_request_json if using POST/PUT/PATCH.

raise_on_http_errors: bool
123    @property
124    @deprecated(
125        "Deprecated as of CDK version 3.0.0. "
126        "You should set error_handler explicitly in HttpStream.get_error_handler() instead."
127    )
128    def raise_on_http_errors(self) -> bool:
129        """
130        Override if needed. If set to False, allows opting-out of raising HTTP code exception.
131        """
132        return True

Override if needed. If set to False, allows opting-out of raising HTTP code exception.

max_retries: Optional[int]
134    @property
135    @deprecated(
136        "Deprecated as of CDK version 3.0.0. "
137        "You should set backoff_strategies explicitly in HttpStream.get_backoff_strategy() instead."
138    )
139    def max_retries(self) -> Union[int, None]:
140        """
141        Override if needed. Specifies maximum amount of retries for backoff policy. Return None for no limit.
142        """
143        return 5

Override if needed. Specifies maximum amount of retries for backoff policy. Return None for no limit.

max_time: Optional[int]
145    @property
146    @deprecated(
147        "Deprecated as of CDK version 3.0.0. "
148        "You should set backoff_strategies explicitly in HttpStream.get_backoff_strategy() instead."
149    )
150    def max_time(self) -> Union[int, None]:
151        """
152        Override if needed. Specifies maximum total waiting time (in seconds) for backoff policy. Return None for no limit.
153        """
154        return 60 * 10

Override if needed. Specifies maximum total waiting time (in seconds) for backoff policy. Return None for no limit.

retry_factor: float
156    @property
157    @deprecated(
158        "Deprecated as of CDK version 3.0.0. "
159        "You should set backoff_strategies explicitly in HttpStream.get_backoff_strategy() instead."
160    )
161    def retry_factor(self) -> float:
162        """
163        Override if needed. Specifies factor for backoff policy.
164        """
165        return 5

Override if needed. Specifies factor for backoff policy.

@abstractmethod
def next_page_token(self, response: requests.models.Response) -> Optional[Mapping[str, Any]]:
167    @abstractmethod
168    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
169        """
170        Override this method to define a pagination strategy.
171
172        The value returned from this method is passed to most other methods in this class. Use it to form a request e.g: set headers or query params.
173
174        :return: The token for the next page from the input response object. Returning None means there are no more pages to read in this response.
175        """

Override this method to define a pagination strategy.

The value returned from this method is passed to most other methods in this class. Use it to form a request e.g: set headers or query params.

Returns

The token for the next page from the input response object. Returning None means there are no more pages to read in this response.

@abstractmethod
def path( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> str:
177    @abstractmethod
178    def path(
179        self,
180        *,
181        stream_state: Optional[Mapping[str, Any]] = None,
182        stream_slice: Optional[Mapping[str, Any]] = None,
183        next_page_token: Optional[Mapping[str, Any]] = None,
184    ) -> str:
185        """
186        Returns the URL path for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "some_entity"
187        """

Returns the URL path for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "some_entity"

def request_params( self, stream_state: Optional[Mapping[str, Any]], stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> MutableMapping[str, Any]:
189    def request_params(
190        self,
191        stream_state: Optional[Mapping[str, Any]],
192        stream_slice: Optional[Mapping[str, Any]] = None,
193        next_page_token: Optional[Mapping[str, Any]] = None,
194    ) -> MutableMapping[str, Any]:
195        """
196        Override this method to define the query parameters that should be set on an outgoing HTTP request given the inputs.
197
198        E.g: you might want to define query parameters for paging if next_page_token is not None.
199        """
200        return {}

Override this method to define the query parameters that should be set on an outgoing HTTP request given the inputs.

E.g: you might want to define query parameters for paging if next_page_token is not None.

def request_headers( self, stream_state: Optional[Mapping[str, Any]], stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
202    def request_headers(
203        self,
204        stream_state: Optional[Mapping[str, Any]],
205        stream_slice: Optional[Mapping[str, Any]] = None,
206        next_page_token: Optional[Mapping[str, Any]] = None,
207    ) -> Mapping[str, Any]:
208        """
209        Override to return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
210        """
211        return {}

Override to return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.

def request_body_data( self, stream_state: Optional[Mapping[str, Any]], stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Union[str, Mapping[str, Any], NoneType]:
213    def request_body_data(
214        self,
215        stream_state: Optional[Mapping[str, Any]],
216        stream_slice: Optional[Mapping[str, Any]] = None,
217        next_page_token: Optional[Mapping[str, Any]] = None,
218    ) -> Optional[Union[Mapping[str, Any], str]]:
219        """
220        Override when creating POST/PUT/PATCH requests to populate the body of the request with a non-JSON payload.
221
222        If returns a ready text that it will be sent as is.
223        If returns a dict that it will be converted to a urlencoded form.
224        E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
225
226        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
227        """
228        return None

Override when creating POST/PUT/PATCH requests to populate the body of the request with a non-JSON payload.

If it returns plain text, it will be sent as is. If it returns a dict, it will be converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

def request_body_json( self, stream_state: Optional[Mapping[str, Any]], stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Optional[Mapping[str, Any]]:
230    def request_body_json(
231        self,
232        stream_state: Optional[Mapping[str, Any]],
233        stream_slice: Optional[Mapping[str, Any]] = None,
234        next_page_token: Optional[Mapping[str, Any]] = None,
235    ) -> Optional[Mapping[str, Any]]:
236        """
237        Override when creating POST/PUT/PATCH requests to populate the body of the request with a JSON payload.
238
239        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
240        """
241        return None

Override when creating POST/PUT/PATCH requests to populate the body of the request with a JSON payload.

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

def request_kwargs( self, stream_state: Optional[Mapping[str, Any]], stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
243    def request_kwargs(
244        self,
245        stream_state: Optional[Mapping[str, Any]],
246        stream_slice: Optional[Mapping[str, Any]] = None,
247        next_page_token: Optional[Mapping[str, Any]] = None,
248    ) -> Mapping[str, Any]:
249        """
250        Override to return a mapping of keyword arguments to be used when creating the HTTP request.
251        Any option listed in https://docs.python-requests.org/en/latest/api/#requests.adapters.BaseAdapter.send for can be returned from
252        this method. Note that these options do not conflict with request-level options such as headers, request params, etc..
253        """
254        return {}

Override to return a mapping of keyword arguments to be used when creating the HTTP request. Any option listed in https://docs.python-requests.org/en/latest/api/#requests.adapters.BaseAdapter.send can be returned from this method. Note that these options do not conflict with request-level options such as headers, request params, etc..

@abstractmethod
def parse_response( self, response: requests.models.Response, *, stream_state: Mapping[str, Any], stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Iterable[Mapping[str, Any]]:
256    @abstractmethod
257    def parse_response(
258        self,
259        response: requests.Response,
260        *,
261        stream_state: Mapping[str, Any],
262        stream_slice: Optional[Mapping[str, Any]] = None,
263        next_page_token: Optional[Mapping[str, Any]] = None,
264    ) -> Iterable[Mapping[str, Any]]:
265        """
266        Parses the raw response object into a list of records.
267        By default, this returns an iterable containing the input. Override to parse differently.
268        :param response:
269        :param stream_state:
270        :param stream_slice:
271        :param next_page_token:
272        :return: An iterable containing the parsed response
273        """

Parses the raw response object into a list of records. By default, this returns an iterable containing the input. Override to parse differently.

Parameters
  • response:
  • stream_state:
  • stream_slice:
  • next_page_token:
Returns

An iterable containing the parsed response

def get_backoff_strategy( self) -> Union[airbyte_cdk.BackoffStrategy, List[airbyte_cdk.BackoffStrategy], NoneType]:
275    def get_backoff_strategy(self) -> Optional[Union[BackoffStrategy, List[BackoffStrategy]]]:
276        """
277        Used to initialize Adapter to avoid breaking changes.
278        If Stream has a `backoff_time` method implementation, we know this stream uses old (pre-HTTPClient) backoff handlers and thus an adapter is needed.
279
280        Override to provide custom BackoffStrategy
281        :return Optional[BackoffStrategy]:
282        """
283        if hasattr(self, "backoff_time"):
284            return HttpStreamAdapterBackoffStrategy(self)
285        else:
286            return None

Used to initialize Adapter to avoid breaking changes. If Stream has a backoff_time method implementation, we know this stream uses old (pre-HTTPClient) backoff handlers and thus an adapter is needed.

Override to provide custom BackoffStrategy

Returns
def get_error_handler( self) -> Optional[airbyte_cdk.sources.streams.http.error_handlers.ErrorHandler]:
288    def get_error_handler(self) -> Optional[ErrorHandler]:
289        """
290        Used to initialize Adapter to avoid breaking changes.
291        If Stream has a `should_retry` method implementation, we know this stream uses old (pre-HTTPClient) error handlers and thus an adapter is needed.
292
293        Override to provide custom ErrorHandler
294        :return Optional[ErrorHandler]:
295        """
296        if hasattr(self, "should_retry"):
297            error_handler = HttpStreamAdapterHttpStatusErrorHandler(
298                stream=self,
299                logger=logging.getLogger(),
300                max_retries=self.max_retries,
301                max_time=timedelta(seconds=self.max_time or 0),
302            )
303            return error_handler
304        else:
305            return None

Used to initialize Adapter to avoid breaking changes. If Stream has a should_retry method implementation, we know this stream uses old (pre-HTTPClient) error handlers and thus an adapter is needed.

Override to provide custom ErrorHandler

Returns

An ErrorHandler adapter when the stream implements `should_retry`; otherwise None.
@classmethod
def parse_response_error_message(cls, response: requests.models.Response) -> Optional[str]:
311    @classmethod
312    def parse_response_error_message(cls, response: requests.Response) -> Optional[str]:
313        """
314        Parses the raw response object from a failed request into a user-friendly error message.
315        By default, this method tries to grab the error message from JSON responses by following common API patterns. Override to parse differently.
316
317        :param response:
318        :return: A user-friendly message that indicates the cause of the error
319        """
320
321        # default logic to grab error from common fields
322        def _try_get_error(value: Optional[JsonType]) -> Optional[str]:
323            if isinstance(value, str):
324                return value
325            elif isinstance(value, list):
326                errors_in_value = [_try_get_error(v) for v in value]
327                return ", ".join(v for v in errors_in_value if v is not None)
328            elif isinstance(value, dict):
329                new_value = (
330                    value.get("message")
331                    or value.get("messages")
332                    or value.get("error")
333                    or value.get("errors")
334                    or value.get("failures")
335                    or value.get("failure")
336                    or value.get("detail")
337                )
338                return _try_get_error(new_value)
339            return None
340
341        try:
342            body = response.json()
343            return _try_get_error(body)
344        except requests.exceptions.JSONDecodeError:
345            return None

Parses the raw response object from a failed request into a user-friendly error message. By default, this method tries to grab the error message from JSON responses by following common API patterns. Override to parse differently.

Parameters
  • response:
Returns

A user-friendly message that indicates the cause of the error

def get_error_display_message(self, exception: BaseException) -> Optional[str]:
347    def get_error_display_message(self, exception: BaseException) -> Optional[str]:
348        """
349        Retrieves the user-friendly display message that corresponds to an exception.
350        This will be called when encountering an exception while reading records from the stream, and used to build the AirbyteTraceMessage.
351
352        The default implementation of this method only handles HTTPErrors by passing the response to self.parse_response_error_message().
353        The method should be overridden as needed to handle any additional exception types.
354
355        :param exception: The exception that was raised
356        :return: A user-friendly message that indicates the cause of the error
357        """
358        if isinstance(exception, requests.HTTPError) and exception.response is not None:
359            return self.parse_response_error_message(exception.response)
360        return None

Retrieves the user-friendly display message that corresponds to an exception. This will be called when encountering an exception while reading records from the stream, and used to build the AirbyteTraceMessage.

The default implementation of this method only handles HTTPErrors by passing the response to self.parse_response_error_message(). The method should be overridden as needed to handle any additional exception types.

Parameters
  • exception: The exception that was raised
Returns

A user-friendly message that indicates the cause of the error

def read_records( self, sync_mode: airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode, cursor_field: Optional[List[str]] = None, stream_slice: Optional[Mapping[str, Any]] = None, stream_state: Optional[Mapping[str, Any]] = None) -> Iterable[Union[Mapping[str, Any], airbyte_cdk.AirbyteMessage]]:
362    def read_records(
363        self,
364        sync_mode: SyncMode,
365        cursor_field: Optional[List[str]] = None,
366        stream_slice: Optional[Mapping[str, Any]] = None,
367        stream_state: Optional[Mapping[str, Any]] = None,
368    ) -> Iterable[StreamData]:
369        # A cursor_field indicates this is an incremental stream which offers better checkpointing than RFR enabled via the cursor
370        if self.cursor_field or not isinstance(self.get_cursor(), ResumableFullRefreshCursor):
371            yield from self._read_pages(
372                lambda req, res, state, _slice: self.parse_response(
373                    res, stream_slice=_slice, stream_state=state
374                ),
375                stream_slice,
376                stream_state,
377            )
378        else:
379            yield from self._read_single_page(
380                lambda req, res, state, _slice: self.parse_response(
381                    res, stream_slice=_slice, stream_state=state
382                ),
383                stream_slice,
384                stream_state,
385            )

This method should be overridden by subclasses to read records based on the inputs.

state: MutableMapping[str, Any]
387    @property
388    def state(self) -> MutableMapping[str, Any]:
389        cursor = self.get_cursor()
390        if cursor:
391            return cursor.get_stream_state()  # type: ignore
392        return self._state

State getter; should return state in a form that can be serialized to a string and sent to the output as a STATE AirbyteMessage.

A good example of a state is a cursor_value: { self.cursor_field: "cursor_value" }

State should try to be as small as possible but at the same time descriptive enough to restore syncing process from the point where it stopped.

def get_cursor(self) -> Optional[airbyte_cdk.sources.streams.checkpoint.Cursor]:
401    def get_cursor(self) -> Optional[Cursor]:
402        # I don't love that this is semi-stateful but not sure what else to do. We don't know exactly what type of cursor to
403        # instantiate when creating the class. We can make a few assumptions like if there is a cursor_field which implies
404        # incremental, but we don't know until runtime if this is a substream. Ideally, a stream should explicitly define
405        # its cursor, but because we're trying to automatically apply RFR we're stuck with this logic where we replace the
406        # cursor at runtime once we detect this is a substream based on self.has_multiple_slices being reassigned
407        if self.has_multiple_slices and isinstance(self.cursor, ResumableFullRefreshCursor):
408            self.cursor = SubstreamResumableFullRefreshCursor()
409            return self.cursor
410        else:
411            return self.cursor

A Cursor is an interface that a stream can implement to manage how its internal state is read and updated while reading records. Historically, Python connectors had no concept of a cursor to manage state. Python streams need to define a cursor implementation and override this method to manage state through a Cursor.

def get_log_formatter(self) -> Optional[Callable[[requests.models.Response], Any]]:
554    def get_log_formatter(self) -> Optional[Callable[[requests.Response], Any]]:
555        """
556
557        :return Optional[Callable[[requests.Response], Any]]: Function that will be used in logging inside HttpClient
558        """
559        return None
Returns

Function that will be used in logging inside HttpClient

class HttpSubStream(airbyte_cdk.sources.streams.http.HttpStream, abc.ABC):
562class HttpSubStream(HttpStream, ABC):
563    def __init__(self, parent: HttpStream, **kwargs: Any):
564        """
565        :param parent: should be the instance of HttpStream class
566        """
567        super().__init__(**kwargs)
568        self.parent = parent
569        self.has_multiple_slices = (
570            True  # Substreams are based on parent records which implies there are multiple slices
571        )
572
573        # There are three conditions that dictate if RFR should automatically be applied to a stream
574        # 1. Streams that explicitly initialize their own cursor should defer to it and not automatically apply RFR
575        # 2. Streams with at least one cursor_field are incremental and thus a superior sync to RFR.
576        # 3. Streams overriding read_records() do not guarantee that they will call the parent implementation which can perform
577    #    per-page checkpointing so RFR is only supported if a stream uses the default `HttpStream.read_records()` method
578        if (
579            not self.cursor
580            and len(self.cursor_field) == 0
581            and type(self).read_records is HttpStream.read_records
582        ):
583            self.cursor = SubstreamResumableFullRefreshCursor()
584
585    def stream_slices(
586        self,
587        sync_mode: SyncMode,
588        cursor_field: Optional[List[str]] = None,
589        stream_state: Optional[Mapping[str, Any]] = None,
590    ) -> Iterable[Optional[Mapping[str, Any]]]:
591        # read_stateless() assumes the parent is not concurrent. This is currently okay since the concurrent CDK does
592        # not support either substreams or RFR, but something that needs to be considered once we do
593        for parent_record in self.parent.read_only_records(stream_state):
594            # Skip non-records (eg AirbyteLogMessage)
595            if isinstance(parent_record, AirbyteMessage):
596                if parent_record.type == MessageType.RECORD:
597                    parent_record = parent_record.record.data  # type: ignore [assignment, union-attr]  # Incorrect type for assignment
598                else:
599                    continue
600            elif isinstance(parent_record, Record):
601                parent_record = parent_record.data
602            yield {"parent": parent_record}

Base abstract class for an Airbyte Stream using the HTTP protocol. Basic building block for users building an Airbyte source for an HTTP API.

HttpSubStream( parent: HttpStream, **kwargs: Any)
563    def __init__(self, parent: HttpStream, **kwargs: Any):
564        """
565        :param parent: should be the instance of HttpStream class
566        """
567        super().__init__(**kwargs)
568        self.parent = parent
569        self.has_multiple_slices = (
570            True  # Substreams are based on parent records which implies there are multiple slices
571        )
572
573        # There are three conditions that dictate if RFR should automatically be applied to a stream
574        # 1. Streams that explicitly initialize their own cursor should defer to it and not automatically apply RFR
575        # 2. Streams with at least one cursor_field are incremental and thus a superior sync to RFR.
576        # 3. Streams overriding read_records() do not guarantee that they will call the parent implementation which can perform
577    #    per-page checkpointing so RFR is only supported if a stream uses the default `HttpStream.read_records()` method
578        if (
579            not self.cursor
580            and len(self.cursor_field) == 0
581            and type(self).read_records is HttpStream.read_records
582        ):
583            self.cursor = SubstreamResumableFullRefreshCursor()
Parameters
  • parent: should be the instance of HttpStream class
parent
has_multiple_slices = False
def stream_slices( self, sync_mode: airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode, cursor_field: Optional[List[str]] = None, stream_state: Optional[Mapping[str, Any]] = None) -> Iterable[Optional[Mapping[str, Any]]]:
585    def stream_slices(
586        self,
587        sync_mode: SyncMode,
588        cursor_field: Optional[List[str]] = None,
589        stream_state: Optional[Mapping[str, Any]] = None,
590    ) -> Iterable[Optional[Mapping[str, Any]]]:
591        # read_stateless() assumes the parent is not concurrent. This is currently okay since the concurrent CDK does
592        # not support either substreams or RFR, but something that needs to be considered once we do
593        for parent_record in self.parent.read_only_records(stream_state):
594            # Skip non-records (eg AirbyteLogMessage)
595            if isinstance(parent_record, AirbyteMessage):
596                if parent_record.type == MessageType.RECORD:
597                    parent_record = parent_record.record.data  # type: ignore [assignment, union-attr]  # Incorrect type for assignment
598                else:
599                    continue
600            elif isinstance(parent_record, Record):
601                parent_record = parent_record.data
602            yield {"parent": parent_record}

Override to define the slices for this stream. See the stream slicing section of the docs for more information.

Parameters
  • sync_mode:
  • cursor_field:
  • stream_state:
Returns

An iterable of stream slices, one per parent record, each of the form {"parent": <parent record data>}.
class UserDefinedBackoffException(airbyte_cdk.sources.streams.http.exceptions.BaseBackoffException):
36class UserDefinedBackoffException(BaseBackoffException):
37    """
38    An exception that exposes how long it attempted to backoff
39    """
40
41    def __init__(
42        self,
43        backoff: Union[int, float],
44        request: requests.PreparedRequest,
45        response: Optional[Union[requests.Response, Exception]],
46        error_message: str = "",
47    ):
48        """
49        :param backoff: how long to backoff in seconds
50        :param request: the request that triggered this backoff exception
51        :param response: the response that triggered the backoff exception
52        """
53        self.backoff = backoff
54        super().__init__(request=request, response=response, error_message=error_message)

An exception that exposes how long it attempted to backoff

UserDefinedBackoffException( backoff: Union[int, float], request: requests.models.PreparedRequest, response: Union[requests.models.Response, Exception, NoneType], error_message: str = '')
41    def __init__(
42        self,
43        backoff: Union[int, float],
44        request: requests.PreparedRequest,
45        response: Optional[Union[requests.Response, Exception]],
46        error_message: str = "",
47    ):
48        """
49        :param backoff: how long to backoff in seconds
50        :param request: the request that triggered this backoff exception
51        :param response: the response that triggered the backoff exception
52        """
53        self.backoff = backoff
54        super().__init__(request=request, response=response, error_message=error_message)
Parameters
  • backoff: how long to backoff in seconds
  • request: the request that triggered this backoff exception
  • response: the response that triggered the backoff exception
backoff