airbyte_cdk.sources.streams.http

 1#
 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 3#
 4
 5# Initialize Streams Package
 6from .exceptions import UserDefinedBackoffException
 7from .http import HttpStream, HttpSubStream
 8from .http_client import HttpClient
 9
10__all__ = ["HttpClient", "HttpStream", "HttpSubStream", "UserDefinedBackoffException"]
class HttpClient:
class HttpClient:
    """
    HTTP client that prepares requests, sends them through a `requests` session and
    turns the outcome into a response, a backoff/retry exception or a traced failure
    according to the configured error handler and backoff strategies.
    """

    # Number of retries used when the error handler does not define `max_retries`.
    _DEFAULT_MAX_RETRY: int = 5
    # Max retry time used when the error handler does not define `max_time` (presumably seconds — confirm).
    _DEFAULT_MAX_TIME: int = 60 * 10
    # Response actions for which a request's attempt counter is kept instead of being evicted.
    _ACTIONS_TO_RETRY_ON = {ResponseAction.RETRY, ResponseAction.RATE_LIMITED}
 81
 82    def __init__(
 83        self,
 84        name: str,
 85        logger: logging.Logger,
 86        error_handler: Optional[ErrorHandler] = None,
 87        api_budget: Optional[APIBudget] = None,
 88        session: Optional[Union[requests.Session, requests_cache.CachedSession]] = None,
 89        authenticator: Optional[AuthBase] = None,
 90        use_cache: bool = False,
 91        backoff_strategy: Optional[Union[BackoffStrategy, List[BackoffStrategy]]] = None,
 92        error_message_parser: Optional[ErrorMessageParser] = None,
 93        disable_retries: bool = False,
 94        message_repository: Optional[MessageRepository] = None,
 95    ):
 96        self._name = name
 97        self._api_budget: APIBudget = api_budget or APIBudget(policies=[])
 98        if session:
 99            self._session = session
100        else:
101            self._use_cache = use_cache
102            self._session = self._request_session()
103            self._session.mount(
104                "https://",
105                requests.adapters.HTTPAdapter(
106                    pool_connections=MAX_CONNECTION_POOL_SIZE, pool_maxsize=MAX_CONNECTION_POOL_SIZE
107                ),
108            )
109        if isinstance(authenticator, AuthBase):
110            self._session.auth = authenticator
111        self._logger = logger
112        self._error_handler = error_handler or HttpStatusErrorHandler(self._logger)
113        if backoff_strategy is not None:
114            if isinstance(backoff_strategy, list):
115                self._backoff_strategies = backoff_strategy
116            else:
117                self._backoff_strategies = [backoff_strategy]
118        else:
119            self._backoff_strategies = [DefaultBackoffStrategy()]
120        self._error_message_parser = error_message_parser or JsonErrorMessageParser()
121        self._request_attempt_count: Dict[requests.PreparedRequest, int] = {}
122        self._disable_retries = disable_retries
123        self._message_repository = message_repository
124
125    @property
126    def cache_filename(self) -> str:
127        """
128        Override if needed. Return the name of cache file
129        Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache will be in-memory only.
130        """
131        return f"{self._name}.sqlite"
132
133    def _request_session(self) -> requests.Session:
134        """
135        Session factory based on use_cache property and call rate limits (api_budget parameter)
136        :return: instance of request-based session
137        """
138        if self._use_cache:
139            cache_dir = os.getenv(ENV_REQUEST_CACHE_PATH)
140            # Use in-memory cache if cache_dir is not set
141            # This is a non-obvious interface, but it ensures we don't write sql files when running unit tests
142            # Use in-memory cache if cache_dir is not set
143            # This is a non-obvious interface, but it ensures we don't write sql files when running unit tests
144            sqlite_path = (
145                str(Path(cache_dir) / self.cache_filename)
146                if cache_dir
147                else "file::memory:?cache=shared"
148            )
149            # By using `PRAGMA synchronous=OFF` and `PRAGMA journal_mode=WAL`, we reduce the possible occurrences of `database table is locked` errors.
150            # Note that those were blindly added at the same time and one or the other might be sufficient to prevent the issues but we have seen good results with both. Feel free to revisit given more information.
151            # There are strong signals that `fast_save` might create problems but if the sync crashes, we start back from the beginning in terms of sqlite anyway so the impact should be minimal. Signals are:
152            # * https://github.com/requests-cache/requests-cache/commit/7fa89ffda300331c37d8fad7f773348a3b5b0236#diff-f43db4a5edf931647c32dec28ea7557aae4cae8444af4b26c8ecbe88d8c925aaR238
153            # * https://github.com/requests-cache/requests-cache/commit/7fa89ffda300331c37d8fad7f773348a3b5b0236#diff-2e7f95b7d7be270ff1a8118f817ea3e6663cdad273592e536a116c24e6d23c18R164-R168
154            # * `If the application running SQLite crashes, the data will be safe, but the database [might become corrupted](https://www.sqlite.org/howtocorrupt.html#cfgerr) if the operating system crashes or the computer loses power before that data has been written to the disk surface.` in [this description](https://www.sqlite.org/pragma.html#pragma_synchronous).
155            backend = requests_cache.SQLiteCache(sqlite_path, fast_save=True, wal=True)
156            return CachedLimiterSession(
157                sqlite_path, backend=backend, api_budget=self._api_budget, match_headers=True
158            )
159        else:
160            return LimiterSession(api_budget=self._api_budget)
161
162    def clear_cache(self) -> None:
163        """
164        Clear cached requests for current session, can be called any time
165        """
166        if isinstance(self._session, requests_cache.CachedSession):
167            self._session.cache.clear()  # type: ignore # cache.clear is not typed
168
169    def _dedupe_query_params(
170        self, url: str, params: Optional[Mapping[str, str]]
171    ) -> Mapping[str, str]:
172        """
173        Remove query parameters from params mapping if they are already encoded in the URL.
174        :param url: URL with
175        :param params:
176        :return:
177        """
178        if params is None:
179            params = {}
180        query_string = urllib.parse.urlparse(url).query
181        query_dict = {k: v[0] for k, v in urllib.parse.parse_qs(query_string).items()}
182
183        duplicate_keys_with_same_value = {
184            k for k in query_dict.keys() if str(params.get(k)) == str(query_dict[k])
185        }
186        return {k: v for k, v in params.items() if k not in duplicate_keys_with_same_value}
187
188    def _create_prepared_request(
189        self,
190        http_method: str,
191        url: str,
192        dedupe_query_params: bool = False,
193        headers: Optional[Mapping[str, str]] = None,
194        params: Optional[Mapping[str, str]] = None,
195        json: Optional[Mapping[str, Any]] = None,
196        data: Optional[Union[str, Mapping[str, Any]]] = None,
197    ) -> requests.PreparedRequest:
198        if dedupe_query_params:
199            query_params = self._dedupe_query_params(url, params)
200        else:
201            query_params = params or {}
202        args = {"method": http_method, "url": url, "headers": headers, "params": query_params}
203        if http_method.upper() in BODY_REQUEST_METHODS:
204            if json and data:
205                raise RequestBodyException(
206                    "At the same time only one of the 'request_body_data' and 'request_body_json' functions can return data"
207                )
208            elif json:
209                args["json"] = json
210            elif data:
211                args["data"] = data
212        prepared_request: requests.PreparedRequest = self._session.prepare_request(
213            requests.Request(**args)
214        )
215
216        return prepared_request
217
218    @property
219    def _max_retries(self) -> int:
220        """
221        Determines the max retries based on the provided error handler.
222        """
223        max_retries = None
224        if self._disable_retries:
225            max_retries = 0
226        else:
227            max_retries = self._error_handler.max_retries
228        return max_retries if max_retries is not None else self._DEFAULT_MAX_RETRY
229
230    @property
231    def _max_time(self) -> int:
232        """
233        Determines the max time based on the provided error handler.
234        """
235        return (
236            self._error_handler.max_time
237            if self._error_handler.max_time is not None
238            else self._DEFAULT_MAX_TIME
239        )
240
241    def _send_with_retry(
242        self,
243        request: requests.PreparedRequest,
244        request_kwargs: Mapping[str, Any],
245        log_formatter: Optional[Callable[[requests.Response], Any]] = None,
246        exit_on_rate_limit: Optional[bool] = False,
247    ) -> requests.Response:
248        """
249        Sends a request with retry logic.
250
251        Args:
252            request (requests.PreparedRequest): The prepared HTTP request to send.
253            request_kwargs (Mapping[str, Any]): Additional keyword arguments for the request.
254
255        Returns:
256            requests.Response: The HTTP response received from the server after retries.
257        """
258
259        max_retries = self._max_retries
260        max_tries = max(0, max_retries) + 1
261        max_time = self._max_time
262
263        user_backoff_handler = user_defined_backoff_handler(max_tries=max_tries, max_time=max_time)(
264            self._send
265        )
266        rate_limit_backoff_handler = rate_limit_default_backoff_handler(max_tries=max_tries)
267        backoff_handler = http_client_default_backoff_handler(
268            max_tries=max_tries, max_time=max_time
269        )
270        # backoff handlers wrap _send, so it will always return a response
271        response = backoff_handler(rate_limit_backoff_handler(user_backoff_handler))(
272            request,
273            request_kwargs,
274            log_formatter=log_formatter,
275            exit_on_rate_limit=exit_on_rate_limit,
276        )  # type: ignore # mypy can't infer that backoff_handler wraps _send
277
278        return response
279
    def _send(
        self,
        request: requests.PreparedRequest,
        request_kwargs: Mapping[str, Any],
        log_formatter: Optional[Callable[[requests.Response], Any]] = None,
        exit_on_rate_limit: Optional[bool] = False,
    ) -> requests.Response:
        """
        Perform a single send attempt and resolve its outcome.

        The attempt is counted, the request is sent through the session, the response
        (or the caught `requests.RequestException`) is interpreted by the error handler,
        and the resulting resolution is applied by `_handle_error_resolution`, which may
        raise.

        Args:
            request: The prepared request to send.
            request_kwargs: Keyword arguments forwarded to `session.send`.
            log_formatter: When given together with a message repository, used to
                format the response for a DEBUG-level repository log message.
            exit_on_rate_limit: Forwarded to `_handle_error_resolution`.

        Returns:
            The response, when handling the error resolution did not raise.
        """
        # Track how many times this exact prepared request has been attempted.
        if request not in self._request_attempt_count:
            self._request_attempt_count[request] = 1
        else:
            self._request_attempt_count[request] += 1
            # Repeated attempt: re-apply the session authenticator to the already
            # prepared request (presumably to pick up refreshed credentials — confirm).
            if hasattr(self._session, "auth") and isinstance(self._session.auth, AuthBase):
                self._session.auth(request)

        self._logger.debug(
            "Making outbound API request",
            extra={"headers": request.headers, "url": request.url, "request_body": request.body},
        )

        response: Optional[requests.Response] = None
        exc: Optional[requests.RequestException] = None

        # A request-level exception is captured rather than propagated so the error
        # handler can decide what to do with it.
        try:
            response = self._session.send(request, **request_kwargs)
        except requests.RequestException as e:
            exc = e

        error_resolution: ErrorResolution = self._error_handler.interpret_response(
            response if response is not None else exc
        )

        # Evaluation of response.text can be heavy, for example, if streaming a large response
        # Do it only in debug mode
        if self._logger.isEnabledFor(logging.DEBUG) and response is not None:
            if request_kwargs.get("stream"):
                self._logger.debug(
                    "Receiving response, but not logging it as the response is streamed",
                    extra={"headers": response.headers, "status": response.status_code},
                )
            else:
                self._logger.debug(
                    "Receiving response",
                    extra={
                        "headers": response.headers,
                        "status": response.status_code,
                        "body": response.text,
                    },
                )

        # Request/response logging for declarative cdk
        if (
            log_formatter is not None
            and response is not None
            and self._message_repository is not None
        ):
            formatter = log_formatter
            # The formatter call is passed as a lambda rather than invoked here.
            self._message_repository.log_message(
                Level.DEBUG,
                lambda: formatter(response),
            )

        # May raise (failure, backoff or HTTP error); falls through for other outcomes.
        self._handle_error_resolution(
            response=response,
            exc=exc,
            request=request,
            error_resolution=error_resolution,
            exit_on_rate_limit=exit_on_rate_limit,
        )

        return response  # type: ignore # will either return a valid response of type requests.Response or raise an exception
350
351    def _get_response_body(self, response: requests.Response) -> Optional[JsonType]:
352        """
353        Extracts and returns the body of an HTTP response.
354
355        This method attempts to parse the response body as JSON. If the response
356        body is not valid JSON, it falls back to decoding the response content
357        as a UTF-8 string. If both attempts fail, it returns None.
358
359        Args:
360            response (requests.Response): The HTTP response object.
361
362        Returns:
363            Optional[JsonType]: The parsed JSON object as a string, the decoded
364            response content as a string, or None if both parsing attempts fail.
365        """
366        try:
367            return str(response.json())
368        except requests.exceptions.JSONDecodeError:
369            try:
370                return response.content.decode("utf-8")
371            except Exception:
372                return "The Content of the Response couldn't be decoded."
373
374    def _evict_key(self, prepared_request: requests.PreparedRequest) -> None:
375        """
376        Addresses high memory consumption when enabling concurrency in https://github.com/airbytehq/oncall/issues/6821.
377
378        The `_request_attempt_count` attribute keeps growing as multiple requests are made using the same `http_client`.
379        To mitigate this issue, we evict keys for completed requests once we confirm that no further retries are needed.
380        This helps manage memory usage more efficiently while maintaining the necessary logic for retry attempts.
381        """
382        if prepared_request in self._request_attempt_count:
383            del self._request_attempt_count[prepared_request]
384
    def _handle_error_resolution(
        self,
        response: Optional[requests.Response],
        exc: Optional[requests.RequestException],
        request: requests.PreparedRequest,
        error_resolution: ErrorResolution,
        exit_on_rate_limit: Optional[bool] = False,
    ) -> None:
        """
        Apply the error handler's resolution for one send attempt.

        Depending on `error_resolution.response_action` this either returns normally,
        logs and returns (IGNORE), raises a traced error (FAIL), or raises one of the
        backoff exceptions (RETRY / RATE_LIMITED).

        Args:
            response: The response of the attempt, or None when sending raised.
            exc: The request exception of the attempt, or None when a response was received.
            request: The prepared request that was sent.
            error_resolution: The resolution produced by the error handler.
            exit_on_rate_limit: When falsy, a RATE_LIMITED resolution without a
                strategy-provided backoff time raises `RateLimitBackoffException`.

        Raises:
            MessageRepresentationAirbyteTracedErrors: for a FAIL resolution.
            UserDefinedBackoffException: for RETRY/RATE_LIMITED when a backoff strategy
                returns a backoff time.
            RateLimitBackoffException: for RATE_LIMITED without a backoff time when
                `exit_on_rate_limit` is falsy.
            DefaultBackoffException: for the remaining RETRY/RATE_LIMITED cases.
            requests.HTTPError: from `raise_for_status()` in the final branch.
        """
        # The attempt counter is only needed while the request may still be retried.
        if error_resolution.response_action not in self._ACTIONS_TO_RETRY_ON:
            self._evict_key(request)

        # Emit stream status RUNNING with the reason RATE_LIMITED to log that the rate limit has been reached
        if error_resolution.response_action == ResponseAction.RATE_LIMITED:
            # TODO: Update to handle with message repository when concurrent message repository is ready
            reasons = [AirbyteStreamStatusReason(type=AirbyteStreamStatusReasonType.RATE_LIMITED)]
            message = orjson.dumps(
                AirbyteMessageSerializer.dump(
                    stream_status_as_airbyte_message(
                        StreamDescriptor(name=self._name), AirbyteStreamStatus.RUNNING, reasons
                    )
                )
            ).decode()

            # Simply printing the stream status is a temporary solution and can cause future issues. Currently, the _send method is
            # wrapped with backoff decorators, and we can only emit messages by iterating record_iterator in the abstract source at the
            # end of the retry decorator behavior. This approach does not allow us to emit messages in the queue before exiting the
            # backoff retry loop. Adding `\n` to the message and ignore 'end' ensure that few messages are printed at the same time.
            print(f"{message}\n", end="", flush=True)

        if error_resolution.response_action == ResponseAction.FAIL:
            if response is not None:
                # Only the request/response dump is passed through filter_secrets.
                filtered_response_message = filter_secrets(
                    f"Request (body): '{str(request.body)}'. Response (body): '{self._get_response_body(response)}'. Response (headers): '{response.headers}'."
                )
                error_message = f"'{request.method}' request to '{request.url}' failed with status code '{response.status_code}' and error message: '{self._error_message_parser.parse_response_error_message(response)}'. {filtered_response_message}"
            else:
                error_message = (
                    f"'{request.method}' request to '{request.url}' failed with exception: '{exc}'"
                )

            # ensure the exception message is emitted before raised
            self._logger.error(error_message)

            raise MessageRepresentationAirbyteTracedErrors(
                internal_message=error_message,
                message=error_resolution.error_message or error_message,
                failure_type=error_resolution.failure_type,
            )

        elif error_resolution.response_action == ResponseAction.IGNORE:
            if response is not None:
                log_message = f"Ignoring response for '{request.method}' request to '{request.url}' with response code '{response.status_code}'"
            else:
                log_message = f"Ignoring response for '{request.method}' request to '{request.url}' with error '{exc}'"

            self._logger.info(error_resolution.error_message or log_message)

        # TODO: Consider dynamic retry count depending on subsequent error codes
        elif (
            error_resolution.response_action == ResponseAction.RETRY
            or error_resolution.response_action == ResponseAction.RATE_LIMITED
        ):
            # The first strategy that returns a truthy backoff time wins.
            user_defined_backoff_time = None
            for backoff_strategy in self._backoff_strategies:
                backoff_time = backoff_strategy.backoff_time(
                    response_or_exception=response if response is not None else exc,
                    attempt_count=self._request_attempt_count[request],
                )
                if backoff_time:
                    user_defined_backoff_time = backoff_time
                    break
            error_message = (
                error_resolution.error_message
                or f"Request to {request.url} failed with failure type {error_resolution.failure_type}, response action {error_resolution.response_action}."
            )

            retry_endlessly = (
                error_resolution.response_action == ResponseAction.RATE_LIMITED
                and not exit_on_rate_limit
            )

            # Precedence: strategy-provided backoff, then rate-limit backoff, then default backoff.
            if user_defined_backoff_time:
                raise UserDefinedBackoffException(
                    backoff=user_defined_backoff_time,
                    request=request,
                    response=(response if response is not None else exc),
                    error_message=error_message,
                )

            elif retry_endlessly:
                raise RateLimitBackoffException(
                    request=request,
                    response=(response if response is not None else exc),
                    error_message=error_message,
                )

            raise DefaultBackoffException(
                request=request,
                response=(response if response is not None else exc),
                error_message=error_message,
            )

        # NOTE(review): this tests the truthiness of the response, not `response is not None`
        # as the other branches do. `requests.Response` truthiness is understood to follow
        # `response.ok`, in which case `raise_for_status()` below can never raise and error
        # responses with any other action pass through silently — confirm the intent before
        # changing, since callers may rely on the current behaviour.
        elif response:
            try:
                response.raise_for_status()
            except requests.HTTPError as e:
                self._logger.error(response.text)
                raise e
493
494    @property
495    def name(self) -> str:
496        return self._name
497
498    def send_request(
499        self,
500        http_method: str,
501        url: str,
502        request_kwargs: Mapping[str, Any],
503        headers: Optional[Mapping[str, str]] = None,
504        params: Optional[Mapping[str, str]] = None,
505        json: Optional[Mapping[str, Any]] = None,
506        data: Optional[Union[str, Mapping[str, Any]]] = None,
507        dedupe_query_params: bool = False,
508        log_formatter: Optional[Callable[[requests.Response], Any]] = None,
509        exit_on_rate_limit: Optional[bool] = False,
510    ) -> Tuple[requests.PreparedRequest, requests.Response]:
511        """
512        Prepares and sends request and return request and response objects.
513        """
514
515        request: requests.PreparedRequest = self._create_prepared_request(
516            http_method=http_method,
517            url=url,
518            dedupe_query_params=dedupe_query_params,
519            headers=headers,
520            params=params,
521            json=json,
522            data=data,
523        )
524
525        response: requests.Response = self._send_with_retry(
526            request=request,
527            request_kwargs=request_kwargs,
528            log_formatter=log_formatter,
529            exit_on_rate_limit=exit_on_rate_limit,
530        )
531
532        return request, response
HttpClient( name: str, logger: logging.Logger, error_handler: Optional[airbyte_cdk.sources.streams.http.error_handlers.ErrorHandler] = None, api_budget: Optional[airbyte_cdk.sources.streams.call_rate.APIBudget] = None, session: Union[requests.sessions.Session, requests_cache.session.CachedSession, NoneType] = None, authenticator: Optional[requests.auth.AuthBase] = None, use_cache: bool = False, backoff_strategy: Union[airbyte_cdk.BackoffStrategy, List[airbyte_cdk.BackoffStrategy], NoneType] = None, error_message_parser: Optional[airbyte_cdk.sources.streams.http.error_handlers.ErrorMessageParser] = None, disable_retries: bool = False, message_repository: Optional[airbyte_cdk.MessageRepository] = None)
 82    def __init__(
 83        self,
 84        name: str,
 85        logger: logging.Logger,
 86        error_handler: Optional[ErrorHandler] = None,
 87        api_budget: Optional[APIBudget] = None,
 88        session: Optional[Union[requests.Session, requests_cache.CachedSession]] = None,
 89        authenticator: Optional[AuthBase] = None,
 90        use_cache: bool = False,
 91        backoff_strategy: Optional[Union[BackoffStrategy, List[BackoffStrategy]]] = None,
 92        error_message_parser: Optional[ErrorMessageParser] = None,
 93        disable_retries: bool = False,
 94        message_repository: Optional[MessageRepository] = None,
 95    ):
 96        self._name = name
 97        self._api_budget: APIBudget = api_budget or APIBudget(policies=[])
 98        if session:
 99            self._session = session
100        else:
101            self._use_cache = use_cache
102            self._session = self._request_session()
103            self._session.mount(
104                "https://",
105                requests.adapters.HTTPAdapter(
106                    pool_connections=MAX_CONNECTION_POOL_SIZE, pool_maxsize=MAX_CONNECTION_POOL_SIZE
107                ),
108            )
109        if isinstance(authenticator, AuthBase):
110            self._session.auth = authenticator
111        self._logger = logger
112        self._error_handler = error_handler or HttpStatusErrorHandler(self._logger)
113        if backoff_strategy is not None:
114            if isinstance(backoff_strategy, list):
115                self._backoff_strategies = backoff_strategy
116            else:
117                self._backoff_strategies = [backoff_strategy]
118        else:
119            self._backoff_strategies = [DefaultBackoffStrategy()]
120        self._error_message_parser = error_message_parser or JsonErrorMessageParser()
121        self._request_attempt_count: Dict[requests.PreparedRequest, int] = {}
122        self._disable_retries = disable_retries
123        self._message_repository = message_repository
cache_filename: str
125    @property
126    def cache_filename(self) -> str:
127        """
128        Override if needed. Return the name of cache file
129        Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache will be in-memory only.
130        """
131        return f"{self._name}.sqlite"

Override if needed. Returns the name of the cache file. Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache will be in-memory only.

def clear_cache(self) -> None:
162    def clear_cache(self) -> None:
163        """
164        Clear cached requests for current session, can be called any time
165        """
166        if isinstance(self._session, requests_cache.CachedSession):
167            self._session.cache.clear()  # type: ignore # cache.clear is not typed

Clears the cached requests for the current session; can be called at any time.

name: str
494    @property
495    def name(self) -> str:
496        return self._name
def send_request( self, http_method: str, url: str, request_kwargs: Mapping[str, Any], headers: Optional[Mapping[str, str]] = None, params: Optional[Mapping[str, str]] = None, json: Optional[Mapping[str, Any]] = None, data: Union[str, Mapping[str, Any], NoneType] = None, dedupe_query_params: bool = False, log_formatter: Optional[Callable[[requests.models.Response], Any]] = None, exit_on_rate_limit: Optional[bool] = False) -> Tuple[requests.models.PreparedRequest, requests.models.Response]:
498    def send_request(
499        self,
500        http_method: str,
501        url: str,
502        request_kwargs: Mapping[str, Any],
503        headers: Optional[Mapping[str, str]] = None,
504        params: Optional[Mapping[str, str]] = None,
505        json: Optional[Mapping[str, Any]] = None,
506        data: Optional[Union[str, Mapping[str, Any]]] = None,
507        dedupe_query_params: bool = False,
508        log_formatter: Optional[Callable[[requests.Response], Any]] = None,
509        exit_on_rate_limit: Optional[bool] = False,
510    ) -> Tuple[requests.PreparedRequest, requests.Response]:
511        """
512        Prepares and sends request and return request and response objects.
513        """
514
515        request: requests.PreparedRequest = self._create_prepared_request(
516            http_method=http_method,
517            url=url,
518            dedupe_query_params=dedupe_query_params,
519            headers=headers,
520            params=params,
521            json=json,
522            data=data,
523        )
524
525        response: requests.Response = self._send_with_retry(
526            request=request,
527            request_kwargs=request_kwargs,
528            log_formatter=log_formatter,
529            exit_on_rate_limit=exit_on_rate_limit,
530        )
531
532        return request, response

Prepares and sends the request, then returns the prepared request and the response objects.

 45class HttpStream(Stream, CheckpointMixin, ABC):
 46    """
 47    Base abstract class for an Airbyte Stream using the HTTP protocol. Basic building block for users building an Airbyte source for a HTTP API.
 48    """
 49
 50    source_defined_cursor = True  # Most HTTP streams use a source defined cursor (i.e: the user can't configure it like on a SQL table)
 51    page_size: Optional[int] = (
 52        None  # Use this variable to define page size for API http requests with pagination support
 53    )
 54
 55    def __init__(
 56        self, authenticator: Optional[AuthBase] = None, api_budget: Optional[APIBudget] = None
 57    ):
 58        self._exit_on_rate_limit: bool = False
 59        self._http_client = HttpClient(
 60            name=self.name,
 61            logger=self.logger,
 62            error_handler=self.get_error_handler(),
 63            api_budget=api_budget or APIBudget(policies=[]),
 64            authenticator=authenticator,
 65            use_cache=self.use_cache,
 66            backoff_strategy=self.get_backoff_strategy(),
 67            message_repository=InMemoryMessageRepository(),
 68        )
 69
 70        # There are three conditions that dictate if RFR should automatically be applied to a stream
 71        # 1. Streams that explicitly initialize their own cursor should defer to it and not automatically apply RFR
 72        # 2. Streams with at least one cursor_field are incremental and thus a superior sync to RFR.
 73        # 3. Streams overriding read_records() do not guarantee that they will call the parent implementation which can perform
 74        #    per-page checkpointing so RFR is only supported if a stream use the default `HttpStream.read_records()` method
 75        if (
 76            not self.cursor
 77            and len(self.cursor_field) == 0
 78            and type(self).read_records is HttpStream.read_records
 79        ):
 80            self.cursor = ResumableFullRefreshCursor()
 81
 82    @property
 83    def exit_on_rate_limit(self) -> bool:
 84        """
 85        :return: False if the stream will retry endlessly when rate limited
 86        """
 87        return self._exit_on_rate_limit
 88
 89    @exit_on_rate_limit.setter
 90    def exit_on_rate_limit(self, value: bool) -> None:
 91        self._exit_on_rate_limit = value
 92
 93    @property
 94    def cache_filename(self) -> str:
 95        """
 96        Override if needed. Return the name of cache file
 97        Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache will be in-memory only.
 98        """
 99        return f"{self.name}.sqlite"
100
101    @property
102    def use_cache(self) -> bool:
103        """
104        Override if needed. If True, all records will be cached.
105        Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache will be in-memory only.
106        """
107        return False
108
109    @property
110    @abstractmethod
111    def url_base(self) -> str:
112        """
113        :return: URL base for the  API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/"
114        """
115
116    @property
117    def http_method(self) -> str:
118        """
119        Override if needed. See get_request_data/get_request_json if using POST/PUT/PATCH.
120        """
121        return "GET"
122
123    @property
124    @deprecated(
125        "Deprecated as of CDK version 3.0.0. "
126        "You should set error_handler explicitly in HttpStream.get_error_handler() instead."
127    )
128    def raise_on_http_errors(self) -> bool:
129        """
130        Override if needed. If set to False, allows opting-out of raising HTTP code exception.
131        """
132        return True
133
134    @property
135    @deprecated(
136        "Deprecated as of CDK version 3.0.0. "
137        "You should set backoff_strategies explicitly in HttpStream.get_backoff_strategy() instead."
138    )
139    def max_retries(self) -> Union[int, None]:
140        """
141        Override if needed. Specifies maximum amount of retries for backoff policy. Return None for no limit.
142        """
143        return 5
144
145    @property
146    @deprecated(
147        "Deprecated as of CDK version 3.0.0. "
148        "You should set backoff_strategies explicitly in HttpStream.get_backoff_strategy() instead."
149    )
150    def max_time(self) -> Union[int, None]:
151        """
152        Override if needed. Specifies maximum total waiting time (in seconds) for backoff policy. Return None for no limit.
153        """
154        return 60 * 10
155
156    @property
157    @deprecated(
158        "Deprecated as of CDK version 3.0.0. "
159        "You should set backoff_strategies explicitly in HttpStream.get_backoff_strategy() instead."
160    )
161    def retry_factor(self) -> float:
162        """
163        Override if needed. Specifies factor for backoff policy.
164        """
165        return 5
166
    @abstractmethod
    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """
        Override this method to define a pagination strategy. Must be implemented by subclasses.

        The value returned from this method is passed to most other methods in this class (path, request_params, request_headers,
        request_body_data, request_body_json, request_kwargs). Use it to form a request e.g: set headers or query params.

        :param response: The response of the page that was just fetched
        :return: The token for the next page from the input response object. Returning None (or any falsy value) means there are no more pages to read in this response.
        """
176
    @abstractmethod
    def path(
        self,
        *,
        stream_state: Optional[Mapping[str, Any]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> str:
        """
        Returns the URL path for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "some_entity".
        Must be implemented by subclasses. The returned path is joined onto url_base to build the request URL.

        :param stream_state: The stream state the page is being requested with
        :param stream_slice: The slice being read, if any
        :param next_page_token: The pagination token for the page being requested, if any
        :return: The path of the endpoint, relative to url_base
        """
188
189    def request_params(
190        self,
191        stream_state: Optional[Mapping[str, Any]],
192        stream_slice: Optional[Mapping[str, Any]] = None,
193        next_page_token: Optional[Mapping[str, Any]] = None,
194    ) -> MutableMapping[str, Any]:
195        """
196        Override this method to define the query parameters that should be set on an outgoing HTTP request given the inputs.
197
198        E.g: you might want to define query parameters for paging if next_page_token is not None.
199        """
200        return {}
201
202    def request_headers(
203        self,
204        stream_state: Optional[Mapping[str, Any]],
205        stream_slice: Optional[Mapping[str, Any]] = None,
206        next_page_token: Optional[Mapping[str, Any]] = None,
207    ) -> Mapping[str, Any]:
208        """
209        Override to return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
210        """
211        return {}
212
213    def request_body_data(
214        self,
215        stream_state: Optional[Mapping[str, Any]],
216        stream_slice: Optional[Mapping[str, Any]] = None,
217        next_page_token: Optional[Mapping[str, Any]] = None,
218    ) -> Optional[Union[Mapping[str, Any], str]]:
219        """
220        Override when creating POST/PUT/PATCH requests to populate the body of the request with a non-JSON payload.
221
222        If returns a ready text that it will be sent as is.
223        If returns a dict that it will be converted to a urlencoded form.
224        E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
225
226        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
227        """
228        return None
229
230    def request_body_json(
231        self,
232        stream_state: Optional[Mapping[str, Any]],
233        stream_slice: Optional[Mapping[str, Any]] = None,
234        next_page_token: Optional[Mapping[str, Any]] = None,
235    ) -> Optional[Mapping[str, Any]]:
236        """
237        Override when creating POST/PUT/PATCH requests to populate the body of the request with a JSON payload.
238
239        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
240        """
241        return None
242
243    def request_kwargs(
244        self,
245        stream_state: Optional[Mapping[str, Any]],
246        stream_slice: Optional[Mapping[str, Any]] = None,
247        next_page_token: Optional[Mapping[str, Any]] = None,
248    ) -> Mapping[str, Any]:
249        """
250        Override to return a mapping of keyword arguments to be used when creating the HTTP request.
251        Any option listed in https://docs.python-requests.org/en/latest/api/#requests.adapters.BaseAdapter.send for can be returned from
252        this method. Note that these options do not conflict with request-level options such as headers, request params, etc..
253        """
254        return {}
255
    @abstractmethod
    def parse_response(
        self,
        response: requests.Response,
        *,
        stream_state: Mapping[str, Any],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Mapping[str, Any]]:
        """
        Parses the raw response object into an iterable of records.
        This method is abstract: there is no default parsing, so every subclass must implement it.

        :param response: The HTTP response of the page to parse
        :param stream_state: The stream state the page was requested with
        :param stream_slice: The slice the page was requested for, if any
        :param next_page_token: The pagination token the page was requested with, if any
        :return: An iterable containing the parsed response
        """
274
275    def get_backoff_strategy(self) -> Optional[Union[BackoffStrategy, List[BackoffStrategy]]]:
276        """
277        Used to initialize Adapter to avoid breaking changes.
278        If Stream has a `backoff_time` method implementation, we know this stream uses old (pre-HTTPClient) backoff handlers and thus an adapter is needed.
279
280        Override to provide custom BackoffStrategy
281        :return Optional[BackoffStrategy]:
282        """
283        if hasattr(self, "backoff_time"):
284            return HttpStreamAdapterBackoffStrategy(self)
285        else:
286            return None
287
288    def get_error_handler(self) -> Optional[ErrorHandler]:
289        """
290        Used to initialize Adapter to avoid breaking changes.
291        If Stream has a `should_retry` method implementation, we know this stream uses old (pre-HTTPClient) error handlers and thus an adapter is needed.
292
293        Override to provide custom ErrorHandler
294        :return Optional[ErrorHandler]:
295        """
296        if hasattr(self, "should_retry"):
297            error_handler = HttpStreamAdapterHttpStatusErrorHandler(
298                stream=self,
299                logger=logging.getLogger(),
300                max_retries=self.max_retries,
301                max_time=timedelta(seconds=self.max_time or 0),
302            )
303            return error_handler
304        else:
305            return None
306
307    @classmethod
308    def _join_url(cls, url_base: str, path: str) -> str:
309        return urljoin(url_base, path)
310
311    @classmethod
312    def parse_response_error_message(cls, response: requests.Response) -> Optional[str]:
313        """
314        Parses the raw response object from a failed request into a user-friendly error message.
315        By default, this method tries to grab the error message from JSON responses by following common API patterns. Override to parse differently.
316
317        :param response:
318        :return: A user-friendly message that indicates the cause of the error
319        """
320
321        # default logic to grab error from common fields
322        def _try_get_error(value: Optional[JsonType]) -> Optional[str]:
323            if isinstance(value, str):
324                return value
325            elif isinstance(value, list):
326                errors_in_value = [_try_get_error(v) for v in value]
327                return ", ".join(v for v in errors_in_value if v is not None)
328            elif isinstance(value, dict):
329                new_value = (
330                    value.get("message")
331                    or value.get("messages")
332                    or value.get("error")
333                    or value.get("errors")
334                    or value.get("failures")
335                    or value.get("failure")
336                    or value.get("detail")
337                )
338                return _try_get_error(new_value)
339            return None
340
341        try:
342            body = response.json()
343            return _try_get_error(body)
344        except requests.exceptions.JSONDecodeError:
345            return None
346
347    def get_error_display_message(self, exception: BaseException) -> Optional[str]:
348        """
349        Retrieves the user-friendly display message that corresponds to an exception.
350        This will be called when encountering an exception while reading records from the stream, and used to build the AirbyteTraceMessage.
351
352        The default implementation of this method only handles HTTPErrors by passing the response to self.parse_response_error_message().
353        The method should be overriden as needed to handle any additional exception types.
354
355        :param exception: The exception that was raised
356        :return: A user-friendly message that indicates the cause of the error
357        """
358        if isinstance(exception, requests.HTTPError) and exception.response is not None:
359            return self.parse_response_error_message(exception.response)
360        return None
361
362    def read_records(
363        self,
364        sync_mode: SyncMode,
365        cursor_field: Optional[List[str]] = None,
366        stream_slice: Optional[Mapping[str, Any]] = None,
367        stream_state: Optional[Mapping[str, Any]] = None,
368    ) -> Iterable[StreamData]:
369        # A cursor_field indicates this is an incremental stream which offers better checkpointing than RFR enabled via the cursor
370        if self.cursor_field or not isinstance(self.get_cursor(), ResumableFullRefreshCursor):
371            yield from self._read_pages(
372                lambda req, res, state, _slice: self.parse_response(
373                    res, stream_slice=_slice, stream_state=state
374                ),
375                stream_slice,
376                stream_state,
377            )
378        else:
379            yield from self._read_single_page(
380                lambda req, res, state, _slice: self.parse_response(
381                    res, stream_slice=_slice, stream_state=state
382                ),
383                stream_slice,
384                stream_state,
385            )
386
387    @property
388    def state(self) -> MutableMapping[str, Any]:
389        cursor = self.get_cursor()
390        if cursor:
391            return cursor.get_stream_state()  # type: ignore
392        return self._state
393
394    @state.setter
395    def state(self, value: MutableMapping[str, Any]) -> None:
396        cursor = self.get_cursor()
397        if cursor:
398            cursor.set_initial_state(value)
399        self._state = value
400
401    def get_cursor(self) -> Optional[Cursor]:
402        # I don't love that this is semi-stateful but not sure what else to do. We don't know exactly what type of cursor to
403        # instantiate when creating the class. We can make a few assumptions like if there is a cursor_field which implies
404        # incremental, but we don't know until runtime if this is a substream. Ideally, a stream should explicitly define
405        # its cursor, but because we're trying to automatically apply RFR we're stuck with this logic where we replace the
406        # cursor at runtime once we detect this is a substream based on self.has_multiple_slices being reassigned
407        if self.has_multiple_slices and isinstance(self.cursor, ResumableFullRefreshCursor):
408            self.cursor = SubstreamResumableFullRefreshCursor()
409            return self.cursor
410        else:
411            return self.cursor
412
413    def _read_pages(
414        self,
415        records_generator_fn: Callable[
416            [
417                requests.PreparedRequest,
418                requests.Response,
419                Mapping[str, Any],
420                Optional[Mapping[str, Any]],
421            ],
422            Iterable[StreamData],
423        ],
424        stream_slice: Optional[Mapping[str, Any]] = None,
425        stream_state: Optional[Mapping[str, Any]] = None,
426    ) -> Iterable[StreamData]:
427        stream_state = stream_state or {}
428        pagination_complete = False
429        next_page_token = None
430        while not pagination_complete:
431            request, response = self._fetch_next_page(stream_slice, stream_state, next_page_token)
432            yield from records_generator_fn(request, response, stream_state, stream_slice)
433
434            next_page_token = self.next_page_token(response)
435            if not next_page_token:
436                pagination_complete = True
437
438        cursor = self.get_cursor()
439        if cursor and isinstance(cursor, SubstreamResumableFullRefreshCursor):
440            partition, _, _ = self._extract_slice_fields(stream_slice=stream_slice)
441            # Substreams checkpoint state by marking an entire parent partition as completed so that on the subsequent attempt
442            # after a failure, completed parents are skipped and the sync can make progress
443            cursor.close_slice(StreamSlice(cursor_slice={}, partition=partition))
444
445        # Always return an empty generator just in case no records were ever yielded
446        yield from []
447
448    def _read_single_page(
449        self,
450        records_generator_fn: Callable[
451            [
452                requests.PreparedRequest,
453                requests.Response,
454                Mapping[str, Any],
455                Optional[Mapping[str, Any]],
456            ],
457            Iterable[StreamData],
458        ],
459        stream_slice: Optional[Mapping[str, Any]] = None,
460        stream_state: Optional[Mapping[str, Any]] = None,
461    ) -> Iterable[StreamData]:
462        partition, cursor_slice, remaining_slice = self._extract_slice_fields(
463            stream_slice=stream_slice
464        )
465        stream_state = stream_state or {}
466        next_page_token = cursor_slice or None
467
468        request, response = self._fetch_next_page(remaining_slice, stream_state, next_page_token)
469        yield from records_generator_fn(request, response, stream_state, remaining_slice)
470
471        next_page_token = self.next_page_token(response) or {
472            "__ab_full_refresh_sync_complete": True
473        }
474
475        cursor = self.get_cursor()
476        if cursor:
477            cursor.close_slice(StreamSlice(cursor_slice=next_page_token, partition=partition))
478
479        # Always return an empty generator just in case no records were ever yielded
480        yield from []
481
482    @staticmethod
483    def _extract_slice_fields(
484        stream_slice: Optional[Mapping[str, Any]],
485    ) -> tuple[Mapping[str, Any], Mapping[str, Any], Mapping[str, Any]]:
486        if not stream_slice:
487            return {}, {}, {}
488
489        if isinstance(stream_slice, StreamSlice):
490            partition = stream_slice.partition
491            cursor_slice = stream_slice.cursor_slice
492            remaining = {k: v for k, v in stream_slice.items()}
493        else:
494            # RFR streams that implement stream_slices() to generate stream slices in the legacy mapping format are converted into a
495            # structured stream slice mapping by the LegacyCursorBasedCheckpointReader. The structured mapping object has separate
496            # fields for the partition and cursor_slice value
497            partition = stream_slice.get("partition", {})
498            cursor_slice = stream_slice.get("cursor_slice", {})
499            remaining = {
500                key: val
501                for key, val in stream_slice.items()
502                if key != "partition" and key != "cursor_slice"
503            }
504        return partition, cursor_slice, remaining
505
506    def _fetch_next_page(
507        self,
508        stream_slice: Optional[Mapping[str, Any]] = None,
509        stream_state: Optional[Mapping[str, Any]] = None,
510        next_page_token: Optional[Mapping[str, Any]] = None,
511    ) -> Tuple[requests.PreparedRequest, requests.Response]:
512        request, response = self._http_client.send_request(
513            http_method=self.http_method,
514            url=self._join_url(
515                self.url_base,
516                self.path(
517                    stream_state=stream_state,
518                    stream_slice=stream_slice,
519                    next_page_token=next_page_token,
520                ),
521            ),
522            request_kwargs=self.request_kwargs(
523                stream_state=stream_state,
524                stream_slice=stream_slice,
525                next_page_token=next_page_token,
526            ),
527            headers=self.request_headers(
528                stream_state=stream_state,
529                stream_slice=stream_slice,
530                next_page_token=next_page_token,
531            ),
532            params=self.request_params(
533                stream_state=stream_state,
534                stream_slice=stream_slice,
535                next_page_token=next_page_token,
536            ),
537            json=self.request_body_json(
538                stream_state=stream_state,
539                stream_slice=stream_slice,
540                next_page_token=next_page_token,
541            ),
542            data=self.request_body_data(
543                stream_state=stream_state,
544                stream_slice=stream_slice,
545                next_page_token=next_page_token,
546            ),
547            dedupe_query_params=True,
548            log_formatter=self.get_log_formatter(),
549            exit_on_rate_limit=self.exit_on_rate_limit,
550        )
551
552        return request, response
553
554    def get_log_formatter(self) -> Optional[Callable[[requests.Response], Any]]:
555        """
556
557        :return Optional[Callable[[requests.Response], Any]]: Function that will be used in logging inside HttpClient
558        """
559        return None

Base abstract class for an Airbyte Stream using the HTTP protocol. Basic building block for users building an Airbyte source for an HTTP API.

source_defined_cursor = True

Return False if the cursor can be configured by the user.

page_size: Optional[int] = None
exit_on_rate_limit: bool
82    @property
83    def exit_on_rate_limit(self) -> bool:
84        """
85        :return: False if the stream will retry endlessly when rate limited
86        """
87        return self._exit_on_rate_limit
Returns

False if the stream will retry endlessly when rate limited

cache_filename: str
93    @property
94    def cache_filename(self) -> str:
95        """
96        Override if needed. Return the name of cache file
97        Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache will be in-memory only.
98        """
99        return f"{self.name}.sqlite"

Override if needed. Return the name of cache file Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache will be in-memory only.

use_cache: bool
101    @property
102    def use_cache(self) -> bool:
103        """
104        Override if needed. If True, all records will be cached.
105        Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache will be in-memory only.
106        """
107        return False

Override if needed. If True, all records will be cached. Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache will be in-memory only.

url_base: str
    @property
    @abstractmethod
    def url_base(self) -> str:
        """
        Base URL for the API endpoint. Must be implemented by subclasses.

        :return: URL base for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/"
        """
Returns

URL base for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/"

http_method: str
116    @property
117    def http_method(self) -> str:
118        """
119        Override if needed. See get_request_data/get_request_json if using POST/PUT/PATCH.
120        """
121        return "GET"

Override if needed. See request_body_data/request_body_json if using POST/PUT/PATCH.

raise_on_http_errors: bool
123    @property
124    @deprecated(
125        "Deprecated as of CDK version 3.0.0. "
126        "You should set error_handler explicitly in HttpStream.get_error_handler() instead."
127    )
128    def raise_on_http_errors(self) -> bool:
129        """
130        Override if needed. If set to False, allows opting-out of raising HTTP code exception.
131        """
132        return True

Override if needed. If set to False, allows opting-out of raising HTTP code exception.

max_retries: Optional[int]
134    @property
135    @deprecated(
136        "Deprecated as of CDK version 3.0.0. "
137        "You should set backoff_strategies explicitly in HttpStream.get_backoff_strategy() instead."
138    )
139    def max_retries(self) -> Union[int, None]:
140        """
141        Override if needed. Specifies maximum amount of retries for backoff policy. Return None for no limit.
142        """
143        return 5

Override if needed. Specifies maximum amount of retries for backoff policy. Return None for no limit.

max_time: Optional[int]
145    @property
146    @deprecated(
147        "Deprecated as of CDK version 3.0.0. "
148        "You should set backoff_strategies explicitly in HttpStream.get_backoff_strategy() instead."
149    )
150    def max_time(self) -> Union[int, None]:
151        """
152        Override if needed. Specifies maximum total waiting time (in seconds) for backoff policy. Return None for no limit.
153        """
154        return 60 * 10

Override if needed. Specifies maximum total waiting time (in seconds) for backoff policy. Return None for no limit.

retry_factor: float
156    @property
157    @deprecated(
158        "Deprecated as of CDK version 3.0.0. "
159        "You should set backoff_strategies explicitly in HttpStream.get_backoff_strategy() instead."
160    )
161    def retry_factor(self) -> float:
162        """
163        Override if needed. Specifies factor for backoff policy.
164        """
165        return 5

Override if needed. Specifies factor for backoff policy.

@abstractmethod
def next_page_token(self, response: requests.models.Response) -> Optional[Mapping[str, Any]]:
    @abstractmethod
    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """
        Override this method to define a pagination strategy. Must be implemented by subclasses.

        The value returned from this method is passed to most other methods in this class. Use it to form a request e.g: set headers or query params.

        :param response: The response of the page that was just fetched
        :return: The token for the next page from the input response object. Returning None means there are no more pages to read in this response.
        """

Override this method to define a pagination strategy.

The value returned from this method is passed to most other methods in this class. Use it to form a request e.g: set headers or query params.

Returns

The token for the next page from the input response object. Returning None means there are no more pages to read in this response.

@abstractmethod
def path( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> str:
    @abstractmethod
    def path(
        self,
        *,
        stream_state: Optional[Mapping[str, Any]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> str:
        """
        Returns the URL path for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "some_entity".
        Must be implemented by subclasses.

        :param stream_state: The stream state the page is being requested with
        :param stream_slice: The slice being read, if any
        :param next_page_token: The pagination token for the page being requested, if any
        :return: The path of the endpoint
        """

Returns the URL path for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "some_entity"

def request_params( self, stream_state: Optional[Mapping[str, Any]], stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> MutableMapping[str, Any]:
189    def request_params(
190        self,
191        stream_state: Optional[Mapping[str, Any]],
192        stream_slice: Optional[Mapping[str, Any]] = None,
193        next_page_token: Optional[Mapping[str, Any]] = None,
194    ) -> MutableMapping[str, Any]:
195        """
196        Override this method to define the query parameters that should be set on an outgoing HTTP request given the inputs.
197
198        E.g: you might want to define query parameters for paging if next_page_token is not None.
199        """
200        return {}

Override this method to define the query parameters that should be set on an outgoing HTTP request given the inputs.

E.g: you might want to define query parameters for paging if next_page_token is not None.

def request_headers( self, stream_state: Optional[Mapping[str, Any]], stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
202    def request_headers(
203        self,
204        stream_state: Optional[Mapping[str, Any]],
205        stream_slice: Optional[Mapping[str, Any]] = None,
206        next_page_token: Optional[Mapping[str, Any]] = None,
207    ) -> Mapping[str, Any]:
208        """
209        Override to return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
210        """
211        return {}

Override to return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.

def request_body_data( self, stream_state: Optional[Mapping[str, Any]], stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Union[str, Mapping[str, Any], NoneType]:
213    def request_body_data(
214        self,
215        stream_state: Optional[Mapping[str, Any]],
216        stream_slice: Optional[Mapping[str, Any]] = None,
217        next_page_token: Optional[Mapping[str, Any]] = None,
218    ) -> Optional[Union[Mapping[str, Any], str]]:
219        """
220        Override when creating POST/PUT/PATCH requests to populate the body of the request with a non-JSON payload.
221
222        If returns a ready text that it will be sent as is.
223        If returns a dict that it will be converted to a urlencoded form.
224        E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
225
226        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
227        """
228        return None

Override when creating POST/PUT/PATCH requests to populate the body of the request with a non-JSON payload.

If this returns a string, it will be sent as is. If this returns a dict, it will be converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

def request_body_json( self, stream_state: Optional[Mapping[str, Any]], stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Optional[Mapping[str, Any]]:
230    def request_body_json(
231        self,
232        stream_state: Optional[Mapping[str, Any]],
233        stream_slice: Optional[Mapping[str, Any]] = None,
234        next_page_token: Optional[Mapping[str, Any]] = None,
235    ) -> Optional[Mapping[str, Any]]:
236        """
237        Override when creating POST/PUT/PATCH requests to populate the body of the request with a JSON payload.
238
239        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
240        """
241        return None

Override when creating POST/PUT/PATCH requests to populate the body of the request with a JSON payload.

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

def request_kwargs( self, stream_state: Optional[Mapping[str, Any]], stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
243    def request_kwargs(
244        self,
245        stream_state: Optional[Mapping[str, Any]],
246        stream_slice: Optional[Mapping[str, Any]] = None,
247        next_page_token: Optional[Mapping[str, Any]] = None,
248    ) -> Mapping[str, Any]:
249        """
250        Override to return a mapping of keyword arguments to be used when creating the HTTP request.
251        Any option listed in https://docs.python-requests.org/en/latest/api/#requests.adapters.BaseAdapter.send for can be returned from
252        this method. Note that these options do not conflict with request-level options such as headers, request params, etc..
253        """
254        return {}

Override to return a mapping of keyword arguments to be used when creating the HTTP request. Any option listed in https://docs.python-requests.org/en/latest/api/#requests.adapters.BaseAdapter.send can be returned from this method. Note that these options do not conflict with request-level options such as headers, request params, etc.

@abstractmethod
def parse_response( self, response: requests.models.Response, *, stream_state: Mapping[str, Any], stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Iterable[Mapping[str, Any]]:
    @abstractmethod
    def parse_response(
        self,
        response: requests.Response,
        *,
        stream_state: Mapping[str, Any],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Mapping[str, Any]]:
        """
        Parses the raw response object into an iterable of records.
        This method is abstract: there is no default parsing, so every subclass must implement it.

        :param response: The HTTP response of the page to parse
        :param stream_state: The stream state the page was requested with
        :param stream_slice: The slice the page was requested for, if any
        :param next_page_token: The pagination token the page was requested with, if any
        :return: An iterable containing the parsed response
        """

Parses the raw response object into an iterable of records. This method is abstract: there is no default parsing, so every subclass must implement it.

Parameters
  • response:
  • stream_state:
  • stream_slice:
  • next_page_token:
Returns

An iterable containing the parsed response

def get_backoff_strategy( self) -> Union[airbyte_cdk.BackoffStrategy, List[airbyte_cdk.BackoffStrategy], NoneType]:
275    def get_backoff_strategy(self) -> Optional[Union[BackoffStrategy, List[BackoffStrategy]]]:
276        """
277        Used to initialize Adapter to avoid breaking changes.
278        If Stream has a `backoff_time` method implementation, we know this stream uses old (pre-HTTPClient) backoff handlers and thus an adapter is needed.
279
280        Override to provide custom BackoffStrategy
281        :return Optional[BackoffStrategy]:
282        """
283        if hasattr(self, "backoff_time"):
284            return HttpStreamAdapterBackoffStrategy(self)
285        else:
286            return None

Used to initialize Adapter to avoid breaking changes. If Stream has a backoff_time method implementation, we know this stream uses old (pre-HTTPClient) backoff handlers and thus an adapter is needed.

Override to provide custom BackoffStrategy

Returns
def get_error_handler( self) -> Optional[airbyte_cdk.sources.streams.http.error_handlers.ErrorHandler]:
288    def get_error_handler(self) -> Optional[ErrorHandler]:
289        """
290        Used to initialize Adapter to avoid breaking changes.
291        If Stream has a `should_retry` method implementation, we know this stream uses old (pre-HTTPClient) error handlers and thus an adapter is needed.
292
293        Override to provide custom ErrorHandler
294        :return: an HttpStreamAdapterHttpStatusErrorHandler built from this stream's `max_retries` and `max_time` when it has a `should_retry` attribute, otherwise None
295        """
296        if hasattr(self, "should_retry"):
297            error_handler = HttpStreamAdapterHttpStatusErrorHandler(
298                stream=self,
299                logger=logging.getLogger(),
300                max_retries=self.max_retries,
301                max_time=timedelta(seconds=self.max_time or 0),  # a falsy max_time (e.g. None) becomes 0 seconds
302            )
303            return error_handler
304        else:
305            return None

Used to initialize Adapter to avoid breaking changes. If Stream has a should_retry method implementation, we know this stream uses old (pre-HTTPClient) error handlers and thus an adapter is needed.

Override to provide custom ErrorHandler

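Returns

An HttpStreamAdapterHttpStatusErrorHandler built from this stream's max_retries and max_time when it has a should_retry attribute; otherwise None.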
@classmethod
def parse_response_error_message(cls, response: requests.models.Response) -> Optional[str]:
311    @classmethod
312    def parse_response_error_message(cls, response: requests.Response) -> Optional[str]:
313        """
314        Parses the raw response object from a failed request into a user-friendly error message.
315        By default, this method tries to grab the error message from JSON responses by following common API patterns. Override to parse differently.
316
317        :param response: the response of the failed request; its body is decoded with `response.json()`
318        :return: A user-friendly message that indicates the cause of the error, or None when the body cannot be decoded as JSON or no message is found under the known keys
319        """
320
321        # default logic: recursively search strings, lists and dicts for an error message under common field names
322        def _try_get_error(value: Optional[JsonType]) -> Optional[str]:
323            if isinstance(value, str):
324                return value
325            elif isinstance(value, list):
326                errors_in_value = [_try_get_error(v) for v in value]
327                return ", ".join(v for v in errors_in_value if v is not None)  # yields "" (not None) when no element produced a message
328            elif isinstance(value, dict):
329                new_value = (  # the first truthy value among these keys wins
330                    value.get("message")
331                    or value.get("messages")
332                    or value.get("error")
333                    or value.get("errors")
334                    or value.get("failures")
335                    or value.get("failure")
336                    or value.get("detail")
337                )
338                return _try_get_error(new_value)
339            return None
340
341        try:
342            body = response.json()
343            return _try_get_error(body)
344        except requests.exceptions.JSONDecodeError:  # non-JSON body: no message can be extracted
345            return None

Parses the raw response object from a failed request into a user-friendly error message. By default, this method tries to grab the error message from JSON responses by following common API patterns. Override to parse differently.

Parameters
  • response:
Returns

A user-friendly message that indicates the cause of the error

def get_error_display_message(self, exception: BaseException) -> Optional[str]:
347    def get_error_display_message(self, exception: BaseException) -> Optional[str]:
348        """
349        Retrieves the user-friendly display message that corresponds to an exception.
350        This will be called when encountering an exception while reading records from the stream, and used to build the AirbyteTraceMessage.
351
352        The default implementation of this method only handles HTTPErrors by passing the response to self.parse_response_error_message().
353        The method should be overridden as needed to handle any additional exception types.
354
355        :param exception: The exception that was raised
356        :return: A user-friendly message that indicates the cause of the error, or None when the exception is not an HTTPError carrying a response
357        """
358        if isinstance(exception, requests.HTTPError) and exception.response is not None:
359            return self.parse_response_error_message(exception.response)
360        return None

Retrieves the user-friendly display message that corresponds to an exception. This will be called when encountering an exception while reading records from the stream, and used to build the AirbyteTraceMessage.

The default implementation of this method only handles HTTPErrors by passing the response to self.parse_response_error_message(). The method should be overridden as needed to handle any additional exception types.

Parameters
  • exception: The exception that was raised
Returns

A user-friendly message that indicates the cause of the error

def read_records( self, sync_mode: airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode, cursor_field: Optional[List[str]] = None, stream_slice: Optional[Mapping[str, Any]] = None, stream_state: Optional[Mapping[str, Any]] = None) -> Iterable[Union[Mapping[str, Any], airbyte_cdk.AirbyteMessage]]:
362    def read_records(
363        self,
364        sync_mode: SyncMode,
365        cursor_field: Optional[List[str]] = None,
366        stream_slice: Optional[Mapping[str, Any]] = None,
367        stream_state: Optional[Mapping[str, Any]] = None,
368    ) -> Iterable[StreamData]:
369        # A cursor_field indicates this is an incremental stream which offers better checkpointing than RFR enabled via the cursor
370        if self.cursor_field or not isinstance(self.get_cursor(), ResumableFullRefreshCursor):
371            yield from self._read_pages(
372                lambda req, res, state, _slice: self.parse_response(  # req is accepted but not used
373                    res, stream_slice=_slice, stream_state=state
374                ),
375                stream_slice,
376                stream_state,
377            )
378        else:  # no cursor_field and the cursor is a ResumableFullRefreshCursor
379            yield from self._read_single_page(
380                lambda req, res, state, _slice: self.parse_response(  # req is accepted but not used
381                    res, stream_slice=_slice, stream_state=state
382                ),
383                stream_slice,
384                stream_state,
385            )

This method should be overridden by subclasses to read records based on the inputs

state: MutableMapping[str, Any]
387    @property
388    def state(self) -> MutableMapping[str, Any]:
389        cursor = self.get_cursor()
390        if cursor:  # when a cursor exists, the state is read from it
391            return cursor.get_stream_state()  # type: ignore
392        return self._state  # no cursor: fall back to the state stored on the stream

State getter, should return state in a form that can be serialized to a string and sent to the output as a STATE AirbyteMessage.

A good example of a state is a cursor_value: { self.cursor_field: "cursor_value" }

State should try to be as small as possible but at the same time descriptive enough to restore syncing process from the point where it stopped.

def get_cursor(self) -> Optional[airbyte_cdk.sources.streams.checkpoint.Cursor]:
401    def get_cursor(self) -> Optional[Cursor]:
402        # I don't love that this is semi-stateful but not sure what else to do. We don't know exactly what type of cursor to
403        # instantiate when creating the class. We can make a few assumptions like if there is a cursor_field which implies
404        # incremental, but we don't know until runtime if this is a substream. Ideally, a stream should explicitly define
405        # its cursor, but because we're trying to automatically apply RFR we're stuck with this logic where we replace the
406        # cursor at runtime once we detect this is a substream based on self.has_multiple_slices being reassigned
407        if self.has_multiple_slices and isinstance(self.cursor, ResumableFullRefreshCursor):
408            self.cursor = SubstreamResumableFullRefreshCursor()  # side effect: the replacement is stored on the instance
409            return self.cursor
410        else:
411            return self.cursor  # otherwise return the current cursor unchanged

A Cursor is an interface that a stream can implement to manage how its internal state is read and updated while reading records. Historically, Python connectors had no concept of a cursor to manage state. Python streams need to define a cursor implementation and override this method to manage state through a Cursor.

def get_log_formatter(self) -> Optional[Callable[[requests.models.Response], Any]]:
554    def get_log_formatter(self) -> Optional[Callable[[requests.Response], Any]]:
555        """
556        Provides the response formatter used for logging. This implementation returns None.
557        :return Optional[Callable[[requests.Response], Any]]: Function that will be used in logging inside HttpClient
558        """
559        return None
Returns

Function that will be used in logging inside HttpClient

class HttpSubStream(airbyte_cdk.sources.streams.http.HttpStream, abc.ABC):
562class HttpSubStream(HttpStream, ABC):
563    def __init__(self, parent: HttpStream, **kwargs: Any):
564        """
565        :param parent: an HttpStream instance; the remaining keyword arguments are forwarded to the superclass constructor
566        """
567        super().__init__(**kwargs)
568        self.parent = parent
569        self.has_multiple_slices = (
570            True  # Substreams are based on parent records which implies there are multiple slices
571        )
572
573        # There are three conditions that dictate if RFR should automatically be applied to a stream
574        # 1. Streams that explicitly initialize their own cursor should defer to it and not automatically apply RFR
575        # 2. Streams with at least one cursor_field are incremental and thus a superior sync to RFR.
576        # 3. Streams overriding read_records() do not guarantee that they will call the parent implementation which can perform
577        #    per-page checkpointing so RFR is only supported if a stream uses the default `HttpStream.read_records()` method
578        if (
579            not self.cursor
580            and len(self.cursor_field) == 0
581            and type(self).read_records is HttpStream.read_records
582        ):
583            self.cursor = SubstreamResumableFullRefreshCursor()
584
585    def stream_slices(
586        self,
587        sync_mode: SyncMode,
588        cursor_field: Optional[List[str]] = None,
589        stream_state: Optional[Mapping[str, Any]] = None,
590    ) -> Iterable[Optional[Mapping[str, Any]]]:
591        # The parent is read via read_only_records(), which assumes the parent is not concurrent. This is currently okay since the concurrent CDK does
592        # not support either substreams or RFR, but it is something that needs to be considered once it does
593        for parent_record in self.parent.read_only_records(stream_state):
594            # Skip non-records (eg AirbyteLogMessage)
595            if isinstance(parent_record, AirbyteMessage):
596                if parent_record.type == MessageType.RECORD:
597                    parent_record = parent_record.record.data  # type: ignore [assignment, union-attr]  # Incorrect type for assignment
598                else:
599                    continue
600            elif isinstance(parent_record, Record):
601                parent_record = parent_record.data  # unwrap Record objects to their underlying data
602            yield {"parent": parent_record}  # one slice per parent record, keyed by "parent"

Base abstract class for an Airbyte Stream using the HTTP protocol. Basic building block for users building an Airbyte source for a HTTP API.

HttpSubStream( parent: HttpStream, **kwargs: Any)
563    def __init__(self, parent: HttpStream, **kwargs: Any):
564        """
565        :param parent: an HttpStream instance; the remaining keyword arguments are forwarded to the superclass constructor
566        """
567        super().__init__(**kwargs)
568        self.parent = parent
569        self.has_multiple_slices = (
570            True  # Substreams are based on parent records which implies there are multiple slices
571        )
572
573        # There are three conditions that dictate if RFR should automatically be applied to a stream
574        # 1. Streams that explicitly initialize their own cursor should defer to it and not automatically apply RFR
575        # 2. Streams with at least one cursor_field are incremental and thus a superior sync to RFR.
576        # 3. Streams overriding read_records() do not guarantee that they will call the parent implementation which can perform
577        #    per-page checkpointing so RFR is only supported if a stream uses the default `HttpStream.read_records()` method
578        if (
579            not self.cursor
580            and len(self.cursor_field) == 0
581            and type(self).read_records is HttpStream.read_records
582        ):
583            self.cursor = SubstreamResumableFullRefreshCursor()
Parameters
  • parent: should be the instance of HttpStream class
parent
has_multiple_slices = False
def stream_slices( self, sync_mode: airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode, cursor_field: Optional[List[str]] = None, stream_state: Optional[Mapping[str, Any]] = None) -> Iterable[Optional[Mapping[str, Any]]]:
585    def stream_slices(
586        self,
587        sync_mode: SyncMode,
588        cursor_field: Optional[List[str]] = None,
589        stream_state: Optional[Mapping[str, Any]] = None,
590    ) -> Iterable[Optional[Mapping[str, Any]]]:
591        # The parent is read via read_only_records(), which assumes the parent is not concurrent. This is currently okay since the concurrent CDK does
592        # not support either substreams or RFR, but it is something that needs to be considered once it does
593        for parent_record in self.parent.read_only_records(stream_state):
594            # Skip non-records (eg AirbyteLogMessage)
595            if isinstance(parent_record, AirbyteMessage):
596                if parent_record.type == MessageType.RECORD:
597                    parent_record = parent_record.record.data  # type: ignore [assignment, union-attr]  # Incorrect type for assignment
598                else:
599                    continue
600            elif isinstance(parent_record, Record):
601                parent_record = parent_record.data  # unwrap Record objects to their underlying data
602            yield {"parent": parent_record}  # one slice per parent record, keyed by "parent"

Override to define the slices for this stream. See the stream slicing section of the docs for more information.

Parameters
  • sync_mode:
  • cursor_field:
  • stream_state:
Returns

An iterable of slices, one per parent record, each of the form {"parent": <parent record>}.
class UserDefinedBackoffException(airbyte_cdk.sources.streams.http.exceptions.BaseBackoffException):
36class UserDefinedBackoffException(BaseBackoffException):
37    """
38    An exception that exposes how long it attempted to back off
39    """
40
41    def __init__(
42        self,
43        backoff: Union[int, float],
44        request: requests.PreparedRequest,
45        response: Optional[Union[requests.Response, Exception]],
46        error_message: str = "",  # passed through unchanged to BaseBackoffException
47    ):
48        """
49        :param backoff: how long to back off, in seconds; stored on the instance as `self.backoff`
50        :param request: the request that triggered this backoff exception
51        :param response: the response (or exception) that triggered the backoff exception; may be None
52        """
53        self.backoff = backoff
54        super().__init__(request=request, response=response, error_message=error_message)

An exception that exposes how long it attempted to backoff

UserDefinedBackoffException( backoff: Union[int, float], request: requests.models.PreparedRequest, response: Union[requests.models.Response, Exception, NoneType], error_message: str = '')
41    def __init__(
42        self,
43        backoff: Union[int, float],
44        request: requests.PreparedRequest,
45        response: Optional[Union[requests.Response, Exception]],
46        error_message: str = "",  # passed through unchanged to BaseBackoffException
47    ):
48        """
49        :param backoff: how long to back off, in seconds; stored on the instance as `self.backoff`
50        :param request: the request that triggered this backoff exception
51        :param response: the response (or exception) that triggered the backoff exception; may be None
52        """
53        self.backoff = backoff
54        super().__init__(request=request, response=response, error_message=error_message)
Parameters
  • backoff: how long to backoff in seconds
  • request: the request that triggered this backoff exception
  • response: the response that triggered the backoff exception
backoff