
 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 5# Initialize Streams Package
 6from .exceptions import UserDefinedBackoffException
 7from .http import HttpStream, HttpSubStream
 8from .http_client import HttpClient
10__all__ = ["HttpClient", "HttpStream", "HttpSubStream", "UserDefinedBackoffException"]
 77class HttpClient:
 78    _DEFAULT_MAX_RETRY: int = 5
 79    _DEFAULT_MAX_TIME: int = 60 * 10
 80    _ACTIONS_TO_RETRY_ON = {ResponseAction.RETRY, ResponseAction.RATE_LIMITED}
 82    def __init__(
 83        self,
 84        name: str,
 85        logger: logging.Logger,
 86        error_handler: Optional[ErrorHandler] = None,
 87        api_budget: Optional[APIBudget] = None,
 88        session: Optional[Union[requests.Session, requests_cache.CachedSession]] = None,
 89        authenticator: Optional[AuthBase] = None,
 90        use_cache: bool = False,
 91        backoff_strategy: Optional[Union[BackoffStrategy, List[BackoffStrategy]]] = None,
 92        error_message_parser: Optional[ErrorMessageParser] = None,
 93        disable_retries: bool = False,
 94        message_repository: Optional[MessageRepository] = None,
 95    ):
 96        self._name = name
 97        self._api_budget: APIBudget = api_budget or APIBudget(policies=[])
 98        if session:
 99            self._session = session
100        else:
101            self._use_cache = use_cache
102            self._session = self._request_session()
103            self._session.mount(
104                "https://",
105                requests.adapters.HTTPAdapter(
106                    pool_connections=MAX_CONNECTION_POOL_SIZE, pool_maxsize=MAX_CONNECTION_POOL_SIZE
107                ),
108            )
109        if isinstance(authenticator, AuthBase):
110            self._session.auth = authenticator
111        self._logger = logger
112        self._error_handler = error_handler or HttpStatusErrorHandler(self._logger)
113        if backoff_strategy is not None:
114            if isinstance(backoff_strategy, list):
115                self._backoff_strategies = backoff_strategy
116            else:
117                self._backoff_strategies = [backoff_strategy]
118        else:
119            self._backoff_strategies = [DefaultBackoffStrategy()]
120        self._error_message_parser = error_message_parser or JsonErrorMessageParser()
121        self._request_attempt_count: Dict[requests.PreparedRequest, int] = {}
122        self._disable_retries = disable_retries
123        self._message_repository = message_repository
125    @property
126    def cache_filename(self) -> str:
127        """
128        Override if needed. Return the name of cache file
129        Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache will be in-memory only.
130        """
131        return f"{self._name}.sqlite"
133    def _request_session(self) -> requests.Session:
134        """
135        Session factory based on use_cache property and call rate limits (api_budget parameter)
136        :return: instance of request-based session
137        """
138        if self._use_cache:
139            cache_dir = os.getenv(ENV_REQUEST_CACHE_PATH)
140            # Use in-memory cache if cache_dir is not set
141            # This is a non-obvious interface, but it ensures we don't write sql files when running unit tests
142            # Use in-memory cache if cache_dir is not set
143            # This is a non-obvious interface, but it ensures we don't write sql files when running unit tests
144            sqlite_path = (
145                str(Path(cache_dir) / self.cache_filename)
146                if cache_dir
147                else "file::memory:?cache=shared"
148            )
149            # By using `PRAGMA synchronous=OFF` and `PRAGMA journal_mode=WAL`, we reduce the possible occurrences of `database table is locked` errors.
150            # Note that those were blindly added at the same time and one or the other might be sufficient to prevent the issues but we have seen good results with both. Feel free to revisit given more information.
151            # There are strong signals that `fast_save` might create problems but if the sync crashes, we start back from the beginning in terms of sqlite anyway so the impact should be minimal. Signals are:
152            # *
153            # *
154            # * `If the application running SQLite crashes, the data will be safe, but the database [might become corrupted]( if the operating system crashes or the computer loses power before that data has been written to the disk surface.` in [this description](
155            backend = requests_cache.SQLiteCache(sqlite_path, fast_save=True, wal=True)
156            return CachedLimiterSession(
157                sqlite_path, backend=backend, api_budget=self._api_budget, match_headers=True
158            )
159        else:
160            return LimiterSession(api_budget=self._api_budget)
162    def clear_cache(self) -> None:
163        """
164        Clear cached requests for current session, can be called any time
165        """
166        if isinstance(self._session, requests_cache.CachedSession):
167            self._session.cache.clear()  # type: ignore # cache.clear is not typed
169    def _dedupe_query_params(
170        self, url: str, params: Optional[Mapping[str, str]]
171    ) -> Mapping[str, str]:
172        """
173        Remove query parameters from params mapping if they are already encoded in the URL.
174        :param url: URL with
175        :param params:
176        :return:
177        """
178        if params is None:
179            params = {}
180        query_string = urllib.parse.urlparse(url).query
181        query_dict = {k: v[0] for k, v in urllib.parse.parse_qs(query_string).items()}
183        duplicate_keys_with_same_value = {
184            k for k in query_dict.keys() if str(params.get(k)) == str(query_dict[k])
185        }
186        return {k: v for k, v in params.items() if k not in duplicate_keys_with_same_value}
188    def _create_prepared_request(
189        self,
190        http_method: str,
191        url: str,
192        dedupe_query_params: bool = False,
193        headers: Optional[Mapping[str, str]] = None,
194        params: Optional[Mapping[str, str]] = None,
195        json: Optional[Mapping[str, Any]] = None,
196        data: Optional[Union[str, Mapping[str, Any]]] = None,
197    ) -> requests.PreparedRequest:
198        if dedupe_query_params:
199            query_params = self._dedupe_query_params(url, params)
200        else:
201            query_params = params or {}
202        args = {"method": http_method, "url": url, "headers": headers, "params": query_params}
203        if http_method.upper() in BODY_REQUEST_METHODS:
204            if json and data:
205                raise RequestBodyException(
206                    "At the same time only one of the 'request_body_data' and 'request_body_json' functions can return data"
207                )
208            elif json:
209                args["json"] = json
210            elif data:
211                args["data"] = data
212        prepared_request: requests.PreparedRequest = self._session.prepare_request(
213            requests.Request(**args)
214        )
216        return prepared_request
218    @property
219    def _max_retries(self) -> int:
220        """
221        Determines the max retries based on the provided error handler.
222        """
223        max_retries = None
224        if self._disable_retries:
225            max_retries = 0
226        else:
227            max_retries = self._error_handler.max_retries
228        return max_retries if max_retries is not None else self._DEFAULT_MAX_RETRY
230    @property
231    def _max_time(self) -> int:
232        """
233        Determines the max time based on the provided error handler.
234        """
235        return (
236            self._error_handler.max_time
237            if self._error_handler.max_time is not None
238            else self._DEFAULT_MAX_TIME
239        )
241    def _send_with_retry(
242        self,
243        request: requests.PreparedRequest,
244        request_kwargs: Mapping[str, Any],
245        log_formatter: Optional[Callable[[requests.Response], Any]] = None,
246        exit_on_rate_limit: Optional[bool] = False,
247    ) -> requests.Response:
248        """
249        Sends a request with retry logic.
251        Args:
252            request (requests.PreparedRequest): The prepared HTTP request to send.
253            request_kwargs (Mapping[str, Any]): Additional keyword arguments for the request.
255        Returns:
256            requests.Response: The HTTP response received from the server after retries.
257        """
259        max_retries = self._max_retries
260        max_tries = max(0, max_retries) + 1
261        max_time = self._max_time
263        user_backoff_handler = user_defined_backoff_handler(max_tries=max_tries, max_time=max_time)(
264            self._send
265        )
266        rate_limit_backoff_handler = rate_limit_default_backoff_handler(max_tries=max_tries)
267        backoff_handler = http_client_default_backoff_handler(
268            max_tries=max_tries, max_time=max_time
269        )
270        # backoff handlers wrap _send, so it will always return a response
271        response = backoff_handler(rate_limit_backoff_handler(user_backoff_handler))(
272            request,
273            request_kwargs,
274            log_formatter=log_formatter,
275            exit_on_rate_limit=exit_on_rate_limit,
276        )  # type: ignore # mypy can't infer that backoff_handler wraps _send
278        return response
280    def _send(
281        self,
282        request: requests.PreparedRequest,
283        request_kwargs: Mapping[str, Any],
284        log_formatter: Optional[Callable[[requests.Response], Any]] = None,
285        exit_on_rate_limit: Optional[bool] = False,
286    ) -> requests.Response:
287        if request not in self._request_attempt_count:
288            self._request_attempt_count[request] = 1
289        else:
290            self._request_attempt_count[request] += 1
291            if hasattr(self._session, "auth") and isinstance(self._session.auth, AuthBase):
292                self._session.auth(request)
294        self._logger.debug(
295            "Making outbound API request",
296            extra={"headers": request.headers, "url": request.url, "request_body": request.body},
297        )
299        response: Optional[requests.Response] = None
300        exc: Optional[requests.RequestException] = None
302        try:
303            response = self._session.send(request, **request_kwargs)
304        except requests.RequestException as e:
305            exc = e
307        error_resolution: ErrorResolution = self._error_handler.interpret_response(
308            response if response is not None else exc
309        )
311        # Evaluation of response.text can be heavy, for example, if streaming a large response
312        # Do it only in debug mode
313        if self._logger.isEnabledFor(logging.DEBUG) and response is not None:
314            if request_kwargs.get("stream"):
315                self._logger.debug(
316                    "Receiving response, but not logging it as the response is streamed",
317                    extra={"headers": response.headers, "status": response.status_code},
318                )
319            else:
320                self._logger.debug(
321                    "Receiving response",
322                    extra={
323                        "headers": response.headers,
324                        "status": response.status_code,
325                        "body": response.text,
326                    },
327                )
329        # Request/response logging for declarative cdk
330        if (
331            log_formatter is not None
332            and response is not None
333            and self._message_repository is not None
334        ):
335            formatter = log_formatter
336            self._message_repository.log_message(
337                Level.DEBUG,
338                lambda: formatter(response),
339            )
341        self._handle_error_resolution(
342            response=response,
343            exc=exc,
344            request=request,
345            error_resolution=error_resolution,
346            exit_on_rate_limit=exit_on_rate_limit,
347        )
349        return response  # type: ignore # will either return a valid response of type requests.Response or raise an exception
351    def _get_response_body(self, response: requests.Response) -> Optional[JsonType]:
352        """
353        Extracts and returns the body of an HTTP response.
355        This method attempts to parse the response body as JSON. If the response
356        body is not valid JSON, it falls back to decoding the response content
357        as a UTF-8 string. If both attempts fail, it returns None.
359        Args:
360            response (requests.Response): The HTTP response object.
362        Returns:
363            Optional[JsonType]: The parsed JSON object as a string, the decoded
364            response content as a string, or None if both parsing attempts fail.
365        """
366        try:
367            return str(response.json())
368        except requests.exceptions.JSONDecodeError:
369            try:
370                return response.content.decode("utf-8")
371            except Exception:
372                return "The Content of the Response couldn't be decoded."
374    def _evict_key(self, prepared_request: requests.PreparedRequest) -> None:
375        """
376        Addresses high memory consumption when enabling concurrency in
378        The `_request_attempt_count` attribute keeps growing as multiple requests are made using the same `http_client`.
379        To mitigate this issue, we evict keys for completed requests once we confirm that no further retries are needed.
380        This helps manage memory usage more efficiently while maintaining the necessary logic for retry attempts.
381        """
382        if prepared_request in self._request_attempt_count:
383            del self._request_attempt_count[prepared_request]
385    def _handle_error_resolution(
386        self,
387        response: Optional[requests.Response],
388        exc: Optional[requests.RequestException],
389        request: requests.PreparedRequest,
390        error_resolution: ErrorResolution,
391        exit_on_rate_limit: Optional[bool] = False,
392    ) -> None:
393        if error_resolution.response_action not in self._ACTIONS_TO_RETRY_ON:
394            self._evict_key(request)
396        # Emit stream status RUNNING with the reason RATE_LIMITED to log that the rate limit has been reached
397        if error_resolution.response_action == ResponseAction.RATE_LIMITED:
398            # TODO: Update to handle with message repository when concurrent message repository is ready
399            reasons = [AirbyteStreamStatusReason(type=AirbyteStreamStatusReasonType.RATE_LIMITED)]
400            message = orjson.dumps(
401                AirbyteMessageSerializer.dump(
402                    stream_status_as_airbyte_message(
403                        StreamDescriptor(name=self._name), AirbyteStreamStatus.RUNNING, reasons
404                    )
405                )
406            ).decode()
408            # Simply printing the stream status is a temporary solution and can cause future issues. Currently, the _send method is
409            # wrapped with backoff decorators, and we can only emit messages by iterating record_iterator in the abstract source at the
410            # end of the retry decorator behavior. This approach does not allow us to emit messages in the queue before exiting the
411            # backoff retry loop. Adding `\n` to the message and ignore 'end' ensure that few messages are printed at the same time.
412            print(f"{message}\n", end="", flush=True)
414        if error_resolution.response_action == ResponseAction.FAIL:
415            if response is not None:
416                filtered_response_message = filter_secrets(
417                    f"Request (body): '{str(request.body)}'. Response (body): '{self._get_response_body(response)}'. Response (headers): '{response.headers}'."
418                )
419                error_message = f"'{request.method}' request to '{request.url}' failed with status code '{response.status_code}' and error message: '{self._error_message_parser.parse_response_error_message(response)}'. {filtered_response_message}"
420            else:
421                error_message = (
422                    f"'{request.method}' request to '{request.url}' failed with exception: '{exc}'"
423                )
425            # ensure the exception message is emitted before raised
426            self._logger.error(error_message)
428            raise MessageRepresentationAirbyteTracedErrors(
429                internal_message=error_message,
430                message=error_resolution.error_message or error_message,
431                failure_type=error_resolution.failure_type,
432            )
434        elif error_resolution.response_action == ResponseAction.IGNORE:
435            if response is not None:
436                log_message = f"Ignoring response for '{request.method}' request to '{request.url}' with response code '{response.status_code}'"
437            else:
438                log_message = f"Ignoring response for '{request.method}' request to '{request.url}' with error '{exc}'"
440   or log_message)
442        # TODO: Consider dynamic retry count depending on subsequent error codes
443        elif (
444            error_resolution.response_action == ResponseAction.RETRY
445            or error_resolution.response_action == ResponseAction.RATE_LIMITED
446        ):
447            user_defined_backoff_time = None
448            for backoff_strategy in self._backoff_strategies:
449                backoff_time = backoff_strategy.backoff_time(
450                    response_or_exception=response if response is not None else exc,
451                    attempt_count=self._request_attempt_count[request],
452                )
453                if backoff_time:
454                    user_defined_backoff_time = backoff_time
455                    break
456            error_message = (
457                error_resolution.error_message
458                or f"Request to {request.url} failed with failure type {error_resolution.failure_type}, response action {error_resolution.response_action}."
459            )
461            retry_endlessly = (
462                error_resolution.response_action == ResponseAction.RATE_LIMITED
463                and not exit_on_rate_limit
464            )
466            if user_defined_backoff_time:
467                raise UserDefinedBackoffException(
468                    backoff=user_defined_backoff_time,
469                    request=request,
470                    response=(response if response is not None else exc),
471                    error_message=error_message,
472                )
474            elif retry_endlessly:
475                raise RateLimitBackoffException(
476                    request=request,
477                    response=(response if response is not None else exc),
478                    error_message=error_message,
479                )
481            raise DefaultBackoffException(
482                request=request,
483                response=(response if response is not None else exc),
484                error_message=error_message,
485            )
487        elif response:
488            try:
489                response.raise_for_status()
490            except requests.HTTPError as e:
491                self._logger.error(response.text)
492                raise e
    @property
    def name(self) -> str:
        """
        :return: the name this client was created with
        """
        return self._name
498    def send_request(
499        self,
500        http_method: str,
501        url: str,
502        request_kwargs: Mapping[str, Any],
503        headers: Optional[Mapping[str, str]] = None,
504        params: Optional[Mapping[str, str]] = None,
505        json: Optional[Mapping[str, Any]] = None,
506        data: Optional[Union[str, Mapping[str, Any]]] = None,
507        dedupe_query_params: bool = False,
508        log_formatter: Optional[Callable[[requests.Response], Any]] = None,
509        exit_on_rate_limit: Optional[bool] = False,
510    ) -> Tuple[requests.PreparedRequest, requests.Response]:
511        """
512        Prepares and sends request and return request and response objects.
513        """
515        request: requests.PreparedRequest = self._create_prepared_request(
516            http_method=http_method,
517            url=url,
518            dedupe_query_params=dedupe_query_params,
519            headers=headers,
520            params=params,
521            json=json,
522            data=data,
523        )
525        response: requests.Response = self._send_with_retry(
526            request=request,
527            request_kwargs=request_kwargs,
528            log_formatter=log_formatter,
529            exit_on_rate_limit=exit_on_rate_limit,
530        )
532        return request, response
 45class HttpStream(Stream, CheckpointMixin, ABC):
 46    """
 47    Base abstract class for an Airbyte Stream using the HTTP protocol. Basic building block for users building an Airbyte source for a HTTP API.
 48    """
 50    source_defined_cursor = True  # Most HTTP streams use a source defined cursor (i.e: the user can't configure it like on a SQL table)
 51    page_size: Optional[int] = (
 52        None  # Use this variable to define page size for API http requests with pagination support
 53    )
 55    def __init__(
 56        self, authenticator: Optional[AuthBase] = None, api_budget: Optional[APIBudget] = None
 57    ):
 58        self._exit_on_rate_limit: bool = False
 59        self._http_client = HttpClient(
 60  ,
 61            logger=self.logger,
 62            error_handler=self.get_error_handler(),
 63            api_budget=api_budget or APIBudget(policies=[]),
 64            authenticator=authenticator,
 65            use_cache=self.use_cache,
 66            backoff_strategy=self.get_backoff_strategy(),
 67            message_repository=InMemoryMessageRepository(),
 68        )
 70        # There are three conditions that dictate if RFR should automatically be applied to a stream
 71        # 1. Streams that explicitly initialize their own cursor should defer to it and not automatically apply RFR
 72        # 2. Streams with at least one cursor_field are incremental and thus a superior sync to RFR.
 73        # 3. Streams overriding read_records() do not guarantee that they will call the parent implementation which can perform
 74        #    per-page checkpointing so RFR is only supported if a stream use the default `HttpStream.read_records()` method
 75        if (
 76            not self.cursor
 77            and len(self.cursor_field) == 0
 78            and type(self).read_records is HttpStream.read_records
 79        ):
 80            self.cursor = ResumableFullRefreshCursor()
    @property
    def exit_on_rate_limit(self) -> bool:
        """
        Flag stored by the stream; initialized to False in `__init__`.

        :return: False if the stream will retry endlessly when rate limited
        """
        return self._exit_on_rate_limit

    @exit_on_rate_limit.setter
    def exit_on_rate_limit(self, value: bool) -> None:
        """
        :param value: new value of the flag
        """
        self._exit_on_rate_limit = value
 93    @property
 94    def cache_filename(self) -> str:
 95        """
 96        Override if needed. Return the name of cache file
 97        Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache will be in-memory only.
 98        """
 99        return f"{}.sqlite"
    @property
    def use_cache(self) -> bool:
        """
        Override if needed. The value is passed as `use_cache` to the stream's `HttpClient`; if True, the client is created
        with a caching session.
        Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache will be in-memory only.
        """
        return False
    @property
    @abstractmethod
    def url_base(self) -> str:
        """
        :return: URL base for the API endpoint, e.g. if you wanted to hit `https://api.example.com/v1/some_entity` then this
            should return `https://api.example.com/v1/`
        """
    @property
    def http_method(self) -> str:
        """
        Override if needed. See request_body_data/request_body_json if using POST/PUT/PATCH.

        :return: the HTTP verb used by the stream; "GET" by default
        """
        return "GET"
    @property
    @deprecated(
        "Deprecated as of CDK version 3.0.0. "
        "You should set error_handler explicitly in HttpStream.get_error_handler() instead."
    )
    def raise_on_http_errors(self) -> bool:
        """
        Deprecated. Override if needed. If set to False, allows opting-out of raising HTTP code exception.

        :return: True by default
        """
        return True
    @property
    @deprecated(
        "Deprecated as of CDK version 3.0.0. "
        "You should set backoff_strategies explicitly in HttpStream.get_backoff_strategy() instead."
    )
    def max_retries(self) -> Union[int, None]:
        """
        Deprecated. Override if needed. Specifies maximum amount of retries for backoff policy. Return None for no limit.

        The value is handed to the adapter error handler built by `get_error_handler()`.
        NOTE(review): `HttpClient._max_retries` falls back to its default when the handler reports None, so None may not
        mean "no limit" end to end — confirm against the adapter error handler.

        :return: 5 by default
        """
        return 5
    @property
    @deprecated(
        "Deprecated as of CDK version 3.0.0. "
        "You should set backoff_strategies explicitly in HttpStream.get_backoff_strategy() instead."
    )
    def max_time(self) -> Union[int, None]:
        """
        Deprecated. Override if needed. Specifies maximum total waiting time (in seconds) for backoff policy. Return None for no limit.

        NOTE(review): `get_error_handler()` converts the value with `timedelta(seconds=self.max_time or 0)`, so None is
        passed on as zero seconds — confirm that this is the intended meaning of "no limit".

        :return: 600 seconds (10 minutes) by default
        """
        return 60 * 10
    @property
    @deprecated(
        "Deprecated as of CDK version 3.0.0. "
        "You should set backoff_strategies explicitly in HttpStream.get_backoff_strategy() instead."
    )
    def retry_factor(self) -> float:
        """
        Deprecated. Override if needed. Specifies factor for backoff policy.

        :return: 5 by default
        """
        return 5
    @abstractmethod
    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """
        Override this method to define a pagination strategy.

        The value returned from this method is passed to most other methods in this class. Use it to form a request e.g: set headers or query params.

        :param response: the response of the page that was just read
        :return: The token for the next page from the input response object. Returning None means there are no more pages to read in this response.
        """
    @abstractmethod
    def path(
        self,
        *,
        stream_state: Optional[Mapping[str, Any]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> str:
        """
        Returns the URL path for the API endpoint, e.g. if you wanted to hit `https://api.example.com/v1/some_entity` then
        this should return "some_entity"

        :param stream_state: current state of the stream, if any
        :param stream_slice: slice being read, if any
        :param next_page_token: token returned by `next_page_token` for the previous page, if any
        """
    def request_params(
        self,
        stream_state: Optional[Mapping[str, Any]],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> MutableMapping[str, Any]:
        """
        Override this method to define the query parameters that should be set on an outgoing HTTP request given the inputs.

        E.g: you might want to define query parameters for paging if next_page_token is not None.

        :return: the query parameters; an empty mapping by default
        """
        return {}
    def request_headers(
        self,
        stream_state: Optional[Mapping[str, Any]],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """
        Override to return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.

        :return: the request headers; an empty mapping by default
        """
        return {}
    def request_body_data(
        self,
        stream_state: Optional[Mapping[str, Any]],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Optional[Union[Mapping[str, Any], str]]:
        """
        Override when creating POST/PUT/PATCH requests to populate the body of the request with a non-JSON payload.

        If a string is returned, it is sent as is.
        If a dict is returned, it is converted to a urlencoded form.
        E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

        Only one of 'request_body_data' and 'request_body_json' may return data for a given request.

        :return: the body payload; None by default
        """
        return None
    def request_body_json(
        self,
        stream_state: Optional[Mapping[str, Any]],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Optional[Mapping[str, Any]]:
        """
        Override when creating POST/PUT/PATCH requests to populate the body of the request with a JSON payload.

        Only one of 'request_body_data' and 'request_body_json' may return data for a given request.

        :return: the JSON payload; None by default
        """
        return None
    def request_kwargs(
        self,
        stream_state: Optional[Mapping[str, Any]],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """
        Override to return a mapping of keyword arguments to be used when creating the HTTP request.
        Any option accepted by the `send` method of the requests session/adapter (e.g. `stream`, `timeout`, `verify`) can be
        returned from this method. Note that these options do not conflict with request-level options such as headers,
        request params, etc.

        :return: the keyword arguments; an empty mapping by default
        """
        return {}
    @abstractmethod
    def parse_response(
        self,
        response: requests.Response,
        *,
        stream_state: Mapping[str, Any],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Mapping[str, Any]]:
        """
        Parses the raw response object into an iterable of records.
        This method is abstract: every concrete stream must implement it.

        :param response: the raw response to parse
        :param stream_state: current state of the stream
        :param stream_slice: slice being read, if any
        :param next_page_token: token of the page being read, if any
        :return: An iterable containing the parsed response
        """
275    def get_backoff_strategy(self) -> Optional[Union[BackoffStrategy, List[BackoffStrategy]]]:
276        """
277        Used to initialize Adapter to avoid breaking changes.
278        If Stream has a `backoff_time` method implementation, we know this stream uses old (pre-HTTPClient) backoff handlers and thus an adapter is needed.
280        Override to provide custom BackoffStrategy
281        :return Optional[BackoffStrategy]:
282        """
283        if hasattr(self, "backoff_time"):
284            return HttpStreamAdapterBackoffStrategy(self)
285        else:
286            return None
288    def get_error_handler(self) -> Optional[ErrorHandler]:
289        """
290        Used to initialize Adapter to avoid breaking changes.
291        If Stream has a `should_retry` method implementation, we know this stream uses old (pre-HTTPClient) error handlers and thus an adapter is needed.
293        Override to provide custom ErrorHandler
294        :return Optional[ErrorHandler]:
295        """
296        if hasattr(self, "should_retry"):
297            error_handler = HttpStreamAdapterHttpStatusErrorHandler(
298                stream=self,
299                logger=logging.getLogger(),
300                max_retries=self.max_retries,
301                max_time=timedelta(seconds=self.max_time or 0),
302            )
303            return error_handler
304        else:
305            return None
307    @classmethod
308    def _join_url(cls, url_base: str, path: str) -> str:
309        return urljoin(url_base, path)
311    @classmethod
312    def parse_response_error_message(cls, response: requests.Response) -> Optional[str]:
313        """
314        Parses the raw response object from a failed request into a user-friendly error message.
315        By default, this method tries to grab the error message from JSON responses by following common API patterns. Override to parse differently.
317        :param response:
318        :return: A user-friendly message that indicates the cause of the error
319        """
321        # default logic to grab error from common fields
322        def _try_get_error(value: Optional[JsonType]) -> Optional[str]:
323            if isinstance(value, str):
324                return value
325            elif isinstance(value, list):
326                errors_in_value = [_try_get_error(v) for v in value]
327                return ", ".join(v for v in errors_in_value if v is not None)
328            elif isinstance(value, dict):
329                new_value = (
330                    value.get("message")
331                    or value.get("messages")
332                    or value.get("error")
333                    or value.get("errors")
334                    or value.get("failures")
335                    or value.get("failure")
336                    or value.get("detail")
337                )
338                return _try_get_error(new_value)
339            return None
341        try:
342            body = response.json()
343            return _try_get_error(body)
344        except requests.exceptions.JSONDecodeError:
345            return None
347    def get_error_display_message(self, exception: BaseException) -> Optional[str]:
348        """
349        Retrieves the user-friendly display message that corresponds to an exception.
350        This will be called when encountering an exception while reading records from the stream, and used to build the AirbyteTraceMessage.
352        The default implementation of this method only handles HTTPErrors by passing the response to self.parse_response_error_message().
353        The method should be overriden as needed to handle any additional exception types.
355        :param exception: The exception that was raised
356        :return: A user-friendly message that indicates the cause of the error
357        """
358        if isinstance(exception, requests.HTTPError) and exception.response is not None:
359            return self.parse_response_error_message(exception.response)
360        return None
362    def read_records(
363        self,
364        sync_mode: SyncMode,
365        cursor_field: Optional[List[str]] = None,
366        stream_slice: Optional[Mapping[str, Any]] = None,
367        stream_state: Optional[Mapping[str, Any]] = None,
368    ) -> Iterable[StreamData]:
369        # A cursor_field indicates this is an incremental stream which offers better checkpointing than RFR enabled via the cursor
370        if self.cursor_field or not isinstance(self.get_cursor(), ResumableFullRefreshCursor):
371            yield from self._read_pages(
372                lambda req, res, state, _slice: self.parse_response(
373                    res, stream_slice=_slice, stream_state=state
374                ),
375                stream_slice,
376                stream_state,
377            )
378        else:
379            yield from self._read_single_page(
380                lambda req, res, state, _slice: self.parse_response(
381                    res, stream_slice=_slice, stream_state=state
382                ),
383                stream_slice,
384                stream_state,
385            )
387    @property
388    def state(self) -> MutableMapping[str, Any]:
389        cursor = self.get_cursor()
390        if cursor:
391            return cursor.get_stream_state()  # type: ignore
392        return self._state
394    @state.setter
395    def state(self, value: MutableMapping[str, Any]) -> None:
396        cursor = self.get_cursor()
397        if cursor:
398            cursor.set_initial_state(value)
399        self._state = value
401    def get_cursor(self) -> Optional[Cursor]:
402        # I don't love that this is semi-stateful but not sure what else to do. We don't know exactly what type of cursor to
403        # instantiate when creating the class. We can make a few assumptions like if there is a cursor_field which implies
404        # incremental, but we don't know until runtime if this is a substream. Ideally, a stream should explicitly define
405        # its cursor, but because we're trying to automatically apply RFR we're stuck with this logic where we replace the
406        # cursor at runtime once we detect this is a substream based on self.has_multiple_slices being reassigned
407        if self.has_multiple_slices and isinstance(self.cursor, ResumableFullRefreshCursor):
408            self.cursor = SubstreamResumableFullRefreshCursor()
409            return self.cursor
410        else:
411            return self.cursor
413    def _read_pages(
414        self,
415        records_generator_fn: Callable[
416            [
417                requests.PreparedRequest,
418                requests.Response,
419                Mapping[str, Any],
420                Optional[Mapping[str, Any]],
421            ],
422            Iterable[StreamData],
423        ],
424        stream_slice: Optional[Mapping[str, Any]] = None,
425        stream_state: Optional[Mapping[str, Any]] = None,
426    ) -> Iterable[StreamData]:
427        stream_state = stream_state or {}
428        pagination_complete = False
429        next_page_token = None
430        while not pagination_complete:
431            request, response = self._fetch_next_page(stream_slice, stream_state, next_page_token)
432            yield from records_generator_fn(request, response, stream_state, stream_slice)
434            next_page_token = self.next_page_token(response)
435            if not next_page_token:
436                pagination_complete = True
438        cursor = self.get_cursor()
439        if cursor and isinstance(cursor, SubstreamResumableFullRefreshCursor):
440            partition, _, _ = self._extract_slice_fields(stream_slice=stream_slice)
441            # Substreams checkpoint state by marking an entire parent partition as completed so that on the subsequent attempt
442            # after a failure, completed parents are skipped and the sync can make progress
443            cursor.close_slice(StreamSlice(cursor_slice={}, partition=partition))
445        # Always return an empty generator just in case no records were ever yielded
446        yield from []
448    def _read_single_page(
449        self,
450        records_generator_fn: Callable[
451            [
452                requests.PreparedRequest,
453                requests.Response,
454                Mapping[str, Any],
455                Optional[Mapping[str, Any]],
456            ],
457            Iterable[StreamData],
458        ],
459        stream_slice: Optional[Mapping[str, Any]] = None,
460        stream_state: Optional[Mapping[str, Any]] = None,
461    ) -> Iterable[StreamData]:
462        partition, cursor_slice, remaining_slice = self._extract_slice_fields(
463            stream_slice=stream_slice
464        )
465        stream_state = stream_state or {}
466        next_page_token = cursor_slice or None
468        request, response = self._fetch_next_page(remaining_slice, stream_state, next_page_token)
469        yield from records_generator_fn(request, response, stream_state, remaining_slice)
471        next_page_token = self.next_page_token(response) or {
472            "__ab_full_refresh_sync_complete": True
473        }
475        cursor = self.get_cursor()
476        if cursor:
477            cursor.close_slice(StreamSlice(cursor_slice=next_page_token, partition=partition))
479        # Always return an empty generator just in case no records were ever yielded
480        yield from []
482    @staticmethod
483    def _extract_slice_fields(
484        stream_slice: Optional[Mapping[str, Any]],
485    ) -> tuple[Mapping[str, Any], Mapping[str, Any], Mapping[str, Any]]:
486        if not stream_slice:
487            return {}, {}, {}
489        if isinstance(stream_slice, StreamSlice):
490            partition = stream_slice.partition
491            cursor_slice = stream_slice.cursor_slice
492            remaining = {k: v for k, v in stream_slice.items()}
493        else:
494            # RFR streams that implement stream_slices() to generate stream slices in the legacy mapping format are converted into a
495            # structured stream slice mapping by the LegacyCursorBasedCheckpointReader. The structured mapping object has separate
496            # fields for the partition and cursor_slice value
497            partition = stream_slice.get("partition", {})
498            cursor_slice = stream_slice.get("cursor_slice", {})
499            remaining = {
500                key: val
501                for key, val in stream_slice.items()
502                if key != "partition" and key != "cursor_slice"
503            }
504        return partition, cursor_slice, remaining
506    def _fetch_next_page(
507        self,
508        stream_slice: Optional[Mapping[str, Any]] = None,
509        stream_state: Optional[Mapping[str, Any]] = None,
510        next_page_token: Optional[Mapping[str, Any]] = None,
511    ) -> Tuple[requests.PreparedRequest, requests.Response]:
512        request, response = self._http_client.send_request(
513            http_method=self.http_method,
514            url=self._join_url(
515                self.url_base,
516                self.path(
517                    stream_state=stream_state,
518                    stream_slice=stream_slice,
519                    next_page_token=next_page_token,
520                ),
521            ),
522            request_kwargs=self.request_kwargs(
523                stream_state=stream_state,
524                stream_slice=stream_slice,
525                next_page_token=next_page_token,
526            ),
527            headers=self.request_headers(
528                stream_state=stream_state,
529                stream_slice=stream_slice,
530                next_page_token=next_page_token,
531            ),
532            params=self.request_params(
533                stream_state=stream_state,
534                stream_slice=stream_slice,
535                next_page_token=next_page_token,
536            ),
537            json=self.request_body_json(
538                stream_state=stream_state,
539                stream_slice=stream_slice,
540                next_page_token=next_page_token,
541            ),
542            data=self.request_body_data(
543                stream_state=stream_state,
544                stream_slice=stream_slice,
545                next_page_token=next_page_token,
546            ),
547            dedupe_query_params=True,
548            log_formatter=self.get_log_formatter(),
549            exit_on_rate_limit=self.exit_on_rate_limit,
550        )
552        return request, response
    def get_log_formatter(self) -> Optional[Callable[[requests.Response], Any]]:
        """
        Provides the optional formatter passed to the HTTP client as `log_formatter` when a page is fetched.
        The default implementation returns None; override to supply a callable that accepts the response.

        :return Optional[Callable[[requests.Response], Any]]: Function that will be used in logging inside HttpClient
        """
        return None

class HttpSubStream(airbyte_cdk.sources.streams.http.HttpStream, abc.ABC):
class HttpSubStream(HttpStream, ABC):
    """
    An HttpStream whose slices are built from the records of a parent stream: every parent record
    becomes one slice of the form {"parent": <record>}.
    """

    def __init__(self, parent: HttpStream, **kwargs: Any):
        """
        :param parent: should be an instance of the HttpStream class
        :param kwargs: forwarded unchanged to the HttpStream initializer
        """
        super().__init__(**kwargs)
        self.parent = parent
        # Substreams are based on parent records which implies there are multiple slices
        self.has_multiple_slices = True

        # There are three conditions that dictate if RFR should automatically be applied to a stream
        # 1. Streams that explicitly initialize their own cursor should defer to it and not automatically apply RFR
        # 2. Streams with at least one cursor_field are incremental and thus a superior sync to RFR.
        # 3. Streams overriding read_records() do not guarantee that they will call the parent implementation which can perform
        #    per-page checkpointing so RFR is only supported if a stream use the default `HttpStream.read_records()` method
        if (
            not self.cursor
            and len(self.cursor_field) == 0
            and type(self).read_records is HttpStream.read_records
        ):
            self.cursor = SubstreamResumableFullRefreshCursor()

    def stream_slices(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Optional[Mapping[str, Any]]]:
        """
        Yields one slice per record read from the parent stream.

        :param sync_mode: not used by this implementation
        :param cursor_field: not used by this implementation
        :param stream_state: passed to the parent's read_only_records()
        :return: an iterable of {"parent": <parent record>} mappings
        """
        # Reading the parent here assumes the parent is not concurrent. This is currently okay since the concurrent CDK does
        # not support either substreams or RFR, but something that needs to be considered once we do
        for parent_record in self.parent.read_only_records(stream_state):
            # NOTE(review): the right-hand sides of the two assignments below were missing in this copy of
            # the source; they are restored as `.record.data` and `.data` - confirm against upstream.
            if isinstance(parent_record, AirbyteMessage):
                # Skip non-records (eg AirbyteLogMessage)
                if parent_record.type != MessageType.RECORD:
                    continue
                parent_record = parent_record.record.data  # type: ignore [assignment, union-attr]  # Incorrect type for assignment
            elif isinstance(parent_record, Record):
                parent_record = parent_record.data
            yield {"parent": parent_record}

HttpSubStream( parent: HttpStream, **kwargs: Any)
563    def __init__(self, parent: HttpStream, **kwargs: Any):
564        """
565        :param parent: should be the instance of HttpStream class
566        """
567        super().__init__(**kwargs)
568        self.parent = parent
569        self.has_multiple_slices = (
570            True  # Substreams are based on parent records which implies there are multiple slices
571        )
573        # There are three conditions that dictate if RFR should automatically be applied to a stream
574        # 1. Streams that explicitly initialize their own cursor should defer to it and not automatically apply RFR
575        # 2. Streams with at least one cursor_field are incremental and thus a superior sync to RFR.
576        # 3. Streams overriding read_records() do not guarantee that they will call the parent implementation which can perform
577        #    per-page checkpointing so RFR is only supported if a stream use the default `HttpStream.read_records()` method
578        if (
579            not self.cursor
580            and len(self.cursor_field) == 0
581            and type(self).read_records is HttpStream.read_records
582        ):
583            self.cursor = SubstreamResumableFullRefreshCursor()
  • parent: should be an instance of the HttpStream class
def stream_slices( self, sync_mode: airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode, cursor_field: Optional[List[str]] = None, stream_state: Optional[Mapping[str, Any]] = None) -> Iterable[Optional[Mapping[str, Any]]]:
585    def stream_slices(
586        self,
587        sync_mode: SyncMode,
588        cursor_field: Optional[List[str]] = None,
589        stream_state: Optional[Mapping[str, Any]] = None,
590    ) -> Iterable[Optional[Mapping[str, Any]]]:
591        # read_stateless() assumes the parent is not concurrent. This is currently okay since the concurrent CDK does
592        # not support either substreams or RFR, but something that needs to be considered once we do
593        for parent_record in self.parent.read_only_records(stream_state):
594            # Skip non-records (eg AirbyteLogMessage)
595            if isinstance(parent_record, AirbyteMessage):
596                if parent_record.type == MessageType.RECORD:
597                    parent_record =  # type: ignore [assignment, union-attr]  # Incorrect type for assignment
598                else:
599                    continue
600            elif isinstance(parent_record, Record):
601                parent_record =
602            yield {"parent": parent_record}

Override to define the slices for this stream. See the stream slicing section of the docs for more information.

  • sync_mode:
  • cursor_field:
  • stream_state:
class UserDefinedBackoffException(airbyte_cdk.sources.streams.http.exceptions.BaseBackoffException):
class UserDefinedBackoffException(BaseBackoffException):
    """
    An exception that exposes how long it attempted to backoff

    The delay is available on the instance as the `backoff` attribute.
    """

    def __init__(
        self,
        backoff: Union[int, float],
        request: requests.PreparedRequest,
        response: Optional[Union[requests.Response, Exception]],
        error_message: str = "",
    ):
        """
        :param backoff: how long to backoff in seconds
        :param request: the request that triggered this backoff exception
        :param response: the response that triggered the backoff exception
        :param error_message: message forwarded to the base exception initializer (empty by default)
        """
        # Stored on the instance before delegating to the base initializer
        self.backoff = backoff
        super().__init__(request=request, response=response, error_message=error_message)

An exception that exposes how long it attempted to backoff

UserDefinedBackoffException( backoff: Union[int, float], request: requests.models.PreparedRequest, response: Union[requests.models.Response, Exception, NoneType], error_message: str = '')
    def __init__(
        self,
        backoff: Union[int, float],
        request: requests.PreparedRequest,
        response: Optional[Union[requests.Response, Exception]],
        error_message: str = "",
    ):
        """
        :param backoff: how long to backoff in seconds
        :param request: the request that triggered this backoff exception
        :param response: the response that triggered the backoff exception
        :param error_message: message forwarded to the base exception initializer (empty by default)
        """
        # Stored on the instance before delegating to the base initializer
        self.backoff = backoff
        super().__init__(request=request, response=response, error_message=error_message)
  • backoff: how long to backoff in seconds
  • request: the request that triggered this backoff exception
  • response: the response that triggered the backoff exception