airbyte_cdk.sources.streams.http
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

# Initialize Streams Package
from .exceptions import UserDefinedBackoffException
from .http import HttpStream, HttpSubStream
from .http_client import HttpClient

__all__ = ["HttpClient", "HttpStream", "HttpSubStream", "UserDefinedBackoffException"]
class HttpClient:
    _DEFAULT_MAX_RETRY: int = 5
    _DEFAULT_MAX_TIME: int = 60 * 10
    _ACTIONS_TO_RETRY_ON = {
        ResponseAction.RETRY,
        ResponseAction.RATE_LIMITED,
        ResponseAction.REFRESH_TOKEN_THEN_RETRY,
    }

    def __init__(
        self,
        name: str,
        logger: logging.Logger,
        error_handler: Optional[ErrorHandler] = None,
        api_budget: Optional[APIBudget] = None,
        session: Optional[Union[requests.Session, requests_cache.CachedSession]] = None,
        authenticator: Optional[AuthBase] = None,
        use_cache: bool = False,
        backoff_strategy: Optional[Union[BackoffStrategy, List[BackoffStrategy]]] = None,
        error_message_parser: Optional[ErrorMessageParser] = None,
        disable_retries: bool = False,
        message_repository: Optional[MessageRepository] = None,
    ):
        self._name = name
        self._api_budget: APIBudget = api_budget or APIBudget(policies=[])
        if session:
            self._session = session
        else:
            self._use_cache = use_cache
            self._session = self._request_session()
            self._session.mount(
                "https://",
                requests.adapters.HTTPAdapter(
                    pool_connections=MAX_CONNECTION_POOL_SIZE, pool_maxsize=MAX_CONNECTION_POOL_SIZE
                ),
            )
        if isinstance(authenticator, AuthBase):
            self._session.auth = authenticator
        self._logger = logger
        self._error_handler = error_handler or HttpStatusErrorHandler(self._logger)
        if backoff_strategy is not None:
            if isinstance(backoff_strategy, list):
                self._backoff_strategies = backoff_strategy
            else:
                self._backoff_strategies = [backoff_strategy]
        else:
            self._backoff_strategies = [DefaultBackoffStrategy()]
        self._error_message_parser = error_message_parser or JsonErrorMessageParser()
        self._request_attempt_count: Dict[requests.PreparedRequest, int] = {}
        self._disable_retries = disable_retries
        self._message_repository = message_repository

    @property
    def cache_filename(self) -> str:
        """
        Override if needed. Returns the name of the cache file.
        Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache will be in-memory only.
        """
        return f"{self._name}.sqlite"

    def _request_session(self) -> requests.Session:
        """
        Session factory based on the use_cache property and call rate limits (api_budget parameter).
        :return: instance of a requests-based session
        """
        if self._use_cache:
            cache_dir = os.getenv(ENV_REQUEST_CACHE_PATH)
            # Use in-memory cache if cache_dir is not set
            # This is a non-obvious interface, but it ensures we don't write sql files when running unit tests
            sqlite_path = (
                str(Path(cache_dir) / self.cache_filename)
                if cache_dir
                else "file::memory:?cache=shared"
            )
            # By using `PRAGMA synchronous=OFF` and `PRAGMA journal_mode=WAL`, we reduce the possible occurrences of `database table is locked` errors.
            # Note that those were blindly added at the same time and one or the other might be sufficient to prevent the issues but we have seen good results with both. Feel free to revisit given more information.
            # There are strong signals that `fast_save` might create problems but if the sync crashes, we start back from the beginning in terms of sqlite anyway so the impact should be minimal. Signals are:
            # * https://github.com/requests-cache/requests-cache/commit/7fa89ffda300331c37d8fad7f773348a3b5b0236#diff-f43db4a5edf931647c32dec28ea7557aae4cae8444af4b26c8ecbe88d8c925aaR238
            # * https://github.com/requests-cache/requests-cache/commit/7fa89ffda300331c37d8fad7f773348a3b5b0236#diff-2e7f95b7d7be270ff1a8118f817ea3e6663cdad273592e536a116c24e6d23c18R164-R168
            # * `If the application running SQLite crashes, the data will be safe, but the database [might become corrupted](https://www.sqlite.org/howtocorrupt.html#cfgerr) if the operating system crashes or the computer loses power before that data has been written to the disk surface.` in [this description](https://www.sqlite.org/pragma.html#pragma_synchronous).
            backend = requests_cache.SQLiteCache(sqlite_path, fast_save=True, wal=True)
            return CachedLimiterSession(
                cache_name=sqlite_path,
                backend=backend,
                api_budget=self._api_budget,
                match_headers=True,
            )
        else:
            return LimiterSession(api_budget=self._api_budget)

    def clear_cache(self) -> None:
        """
        Clear cached requests for the current session; can be called at any time.
        """
        if isinstance(self._session, requests_cache.CachedSession):
            self._session.cache.clear()  # type: ignore # cache.clear is not typed

    def _dedupe_query_params(
        self, url: str, params: Optional[Mapping[str, str]]
    ) -> Mapping[str, str]:
        """
        Remove query parameters from the params mapping if they are already encoded in the URL.
        :param url: URL that may already contain query parameters
        :param params: query parameters to deduplicate against the URL
        :return: the deduplicated query parameters
        """
        if params is None:
            params = {}
        query_string = urllib.parse.urlparse(url).query
        query_dict = {k: v[0] for k, v in urllib.parse.parse_qs(query_string).items()}

        duplicate_keys_with_same_value = {
            k for k in query_dict.keys() if str(params.get(k)) == str(query_dict[k])
        }
        return {k: v for k, v in params.items() if k not in duplicate_keys_with_same_value}

    def _create_prepared_request(
        self,
        http_method: str,
        url: str,
        dedupe_query_params: bool = False,
        headers: Optional[Mapping[str, str]] = None,
        params: Optional[Mapping[str, str]] = None,
        json: Optional[Mapping[str, Any]] = None,
        data: Optional[Union[str, Mapping[str, Any]]] = None,
    ) -> requests.PreparedRequest:
        if dedupe_query_params:
            query_params = self._dedupe_query_params(url, params)
        else:
            query_params = params or {}
        args = {"method": http_method, "url": url, "headers": headers, "params": query_params}
        if http_method.upper() in BODY_REQUEST_METHODS:
            if json and data:
                raise RequestBodyException(
                    "Only one of the 'request_body_data' and 'request_body_json' functions can return data at the same time"
                )
            elif json:
                args["json"] = json
            elif data:
                args["data"] = data
        prepared_request: requests.PreparedRequest = self._session.prepare_request(
            requests.Request(**args)
        )

        return prepared_request

    @property
    def _max_retries(self) -> int:
        """
        Determines the max retries based on the provided error handler.
        """
        max_retries = None
        if self._disable_retries:
            max_retries = 0
        else:
            max_retries = self._error_handler.max_retries
        return max_retries if max_retries is not None else self._DEFAULT_MAX_RETRY

    @property
    def _max_time(self) -> int:
        """
        Determines the max time based on the provided error handler.
        """
        return (
            self._error_handler.max_time
            if self._error_handler.max_time is not None
            else self._DEFAULT_MAX_TIME
        )

    def _send_with_retry(
        self,
        request: requests.PreparedRequest,
        request_kwargs: Mapping[str, Any],
        log_formatter: Optional[Callable[[requests.Response], Any]] = None,
        exit_on_rate_limit: Optional[bool] = False,
    ) -> requests.Response:
        """
        Sends a request with retry logic.

        Args:
            request (requests.PreparedRequest): The prepared HTTP request to send.
            request_kwargs (Mapping[str, Any]): Additional keyword arguments for the request.

        Returns:
            requests.Response: The HTTP response received from the server after retries.
        """

        max_retries = self._max_retries
        max_tries = max(0, max_retries) + 1
        max_time = self._max_time

        user_backoff_handler = user_defined_backoff_handler(max_tries=max_tries, max_time=max_time)(
            self._send
        )
        rate_limit_backoff_handler = rate_limit_default_backoff_handler(max_tries=max_tries)
        backoff_handler = http_client_default_backoff_handler(
            max_tries=max_tries, max_time=max_time
        )
        # backoff handlers wrap _send, so it will always return a response -- except when all retries are exhausted
        try:
            response = backoff_handler(rate_limit_backoff_handler(user_backoff_handler))(
                request,
                request_kwargs,
                log_formatter=log_formatter,
                exit_on_rate_limit=exit_on_rate_limit,
            )  # type: ignore # mypy can't infer that backoff_handler wraps _send

            return response
        except BaseBackoffException as e:
            self._logger.error("Retries exhausted with backoff exception.", exc_info=True)
            raise MessageRepresentationAirbyteTracedErrors(
                internal_message=f"Exhausted available request attempts. Exception: {e}",
                message=f"Exhausted available request attempts. Please see logs for more details. Exception: {e}",
                failure_type=e.failure_type or FailureType.system_error,
                exception=e,
                stream_descriptor=StreamDescriptor(name=self._name),
            )

    def _send(
        self,
        request: requests.PreparedRequest,
        request_kwargs: Mapping[str, Any],
        log_formatter: Optional[Callable[[requests.Response], Any]] = None,
        exit_on_rate_limit: Optional[bool] = False,
    ) -> requests.Response:
        if request not in self._request_attempt_count:
            self._request_attempt_count[request] = 1
        else:
            self._request_attempt_count[request] += 1
            if hasattr(self._session, "auth") and isinstance(self._session.auth, AuthBase):
                self._session.auth(request)

        self._logger.debug(
            "Making outbound API request",
            extra={"headers": request.headers, "url": request.url, "request_body": request.body},
        )

        response: Optional[requests.Response] = None
        exc: Optional[requests.RequestException] = None

        try:
            response = self._session.send(request, **request_kwargs)
        except requests.RequestException as e:
            exc = e

        error_resolution: ErrorResolution = self._error_handler.interpret_response(
            response if response is not None else exc
        )

        # Evaluation of response.text can be heavy, for example, if streaming a large response
        # Do it only in debug mode
        if self._logger.isEnabledFor(logging.DEBUG) and response is not None:
            if request_kwargs.get("stream"):
                self._logger.debug(
                    "Receiving response, but not logging it as the response is streamed",
                    extra={"headers": response.headers, "status": response.status_code},
                )
            else:
                self._logger.debug(
                    "Receiving response",
                    extra={
                        "headers": response.headers,
                        "status": response.status_code,
                        "body": response.text,
                    },
                )

        # Request/response logging for declarative cdk
        if (
            log_formatter is not None
            and response is not None
            and self._message_repository is not None
        ):
            formatter = log_formatter
            self._message_repository.log_message(
                Level.DEBUG,
                lambda: formatter(response),
            )

        self._handle_error_resolution(
            response=response,
            exc=exc,
            request=request,
            error_resolution=error_resolution,
            exit_on_rate_limit=exit_on_rate_limit,
        )

        return response  # type: ignore # will either return a valid response of type requests.Response or raise an exception

    def _get_response_body(self, response: requests.Response) -> Optional[JsonType]:
        """
        Extracts and returns the body of an HTTP response.

        This method attempts to parse the response body as JSON. If the response
        body is not valid JSON, it falls back to decoding the response content
        as a UTF-8 string. If both attempts fail, a placeholder message is returned.

        Args:
            response (requests.Response): The HTTP response object.

        Returns:
            Optional[JsonType]: The parsed JSON body as a string, the decoded
            response content as a string, or a placeholder message if both
            attempts fail.
        """
        try:
            return str(response.json())
        except requests.exceptions.JSONDecodeError:
            try:
                return response.content.decode("utf-8")
            except Exception:
                return "The Content of the Response couldn't be decoded."

    def _evict_key(self, prepared_request: requests.PreparedRequest) -> None:
        """
        Addresses high memory consumption when enabling concurrency in https://github.com/airbytehq/oncall/issues/6821.

        The `_request_attempt_count` attribute keeps growing as multiple requests are made using the same `http_client`.
        To mitigate this issue, we evict keys for completed requests once we confirm that no further retries are needed.
        This helps manage memory usage more efficiently while maintaining the necessary logic for retry attempts.
        """
        if prepared_request in self._request_attempt_count:
            del self._request_attempt_count[prepared_request]

    def _handle_error_resolution(
        self,
        response: Optional[requests.Response],
        exc: Optional[requests.RequestException],
        request: requests.PreparedRequest,
        error_resolution: ErrorResolution,
        exit_on_rate_limit: Optional[bool] = False,
    ) -> None:
        if error_resolution.response_action not in self._ACTIONS_TO_RETRY_ON:
            self._evict_key(request)

        if error_resolution.response_action == ResponseAction.RESET_PAGINATION:
            raise PaginationResetRequiredException()

        # Emit stream status RUNNING with the reason RATE_LIMITED to log that the rate limit has been reached
        if error_resolution.response_action == ResponseAction.RATE_LIMITED:
            # TODO: Update to handle with message repository when concurrent message repository is ready
            reasons = [AirbyteStreamStatusReason(type=AirbyteStreamStatusReasonType.RATE_LIMITED)]
            message = orjson.dumps(
                AirbyteMessageSerializer.dump(
                    stream_status_as_airbyte_message(
                        StreamDescriptor(name=self._name), AirbyteStreamStatus.RUNNING, reasons
                    )
                )
            ).decode()

            # Simply printing the stream status is a temporary solution and can cause future issues. Currently, the _send method is
            # wrapped with backoff decorators, and we can only emit messages by iterating record_iterator in the abstract source at the
            # end of the retry decorator behavior. This approach does not allow us to emit messages in the queue before exiting the
            # backoff retry loop. Appending `\n` to the message and passing `end=""` ensures that messages are not interleaved on the same line.
            print(f"{message}\n", end="", flush=True)

        # Handle REFRESH_TOKEN_THEN_RETRY: Force refresh the OAuth token before retry
        # This is useful when the API returns 401 but the stored token expiry hasn't been reached yet
        # Only OAuth authenticators have refresh_and_set_access_token method
        # Non-OAuth auth types (e.g., BearerAuthenticator) will fall through to normal retry
        if error_resolution.response_action == ResponseAction.REFRESH_TOKEN_THEN_RETRY:
            if (
                hasattr(self._session, "auth")
                and self._session.auth is not None
                and hasattr(self._session.auth, "refresh_and_set_access_token")
            ):
                try:
                    self._session.auth.refresh_and_set_access_token()  # type: ignore[union-attr]
                    self._logger.info(
                        "Refreshed OAuth token due to REFRESH_TOKEN_THEN_RETRY response action"
                    )
                except Exception as refresh_error:
                    self._logger.warning(
                        f"Failed to refresh OAuth token: {refresh_error}. Proceeding with retry using existing token."
                    )
            else:
                self._logger.warning(
                    "REFRESH_TOKEN_THEN_RETRY action received but authenticator does not support token refresh. "
                    "Proceeding with normal retry."
                )

        if error_resolution.response_action == ResponseAction.FAIL:
            if response is not None:
                filtered_response_message = filter_secrets(
                    f"Request (body): '{str(request.body)}'. Response (body): '{self._get_response_body(response)}'. Response (headers): '{response.headers}'."
                )
                error_message = f"'{request.method}' request to '{request.url}' failed with status code '{response.status_code}' and error message: '{self._error_message_parser.parse_response_error_message(response)}'. {filtered_response_message}"
            else:
                error_message = (
                    f"'{request.method}' request to '{request.url}' failed with exception: '{exc}'"
                )

            # ensure the exception message is emitted before raised
            self._logger.error(error_message)

            raise MessageRepresentationAirbyteTracedErrors(
                internal_message=error_message,
                message=error_resolution.error_message or error_message,
                failure_type=error_resolution.failure_type,
            )

        elif error_resolution.response_action == ResponseAction.IGNORE:
            if response is not None:
                log_message = f"Ignoring response for '{request.method}' request to '{request.url}' with response code '{response.status_code}'"
            else:
                log_message = f"Ignoring response for '{request.method}' request to '{request.url}' with error '{exc}'"

            self._logger.info(error_resolution.error_message or log_message)

        # TODO: Consider dynamic retry count depending on subsequent error codes
        elif error_resolution.response_action in (
            ResponseAction.RETRY,
            ResponseAction.RATE_LIMITED,
            ResponseAction.REFRESH_TOKEN_THEN_RETRY,
        ):
            user_defined_backoff_time = None
            for backoff_strategy in self._backoff_strategies:
                backoff_time = backoff_strategy.backoff_time(
                    response_or_exception=response if response is not None else exc,
                    attempt_count=self._request_attempt_count[request],
                )
                if backoff_time:
                    user_defined_backoff_time = backoff_time
                    break
            error_message = (
                error_resolution.error_message
                or f"Request to {request.url} failed with failure type {error_resolution.failure_type}, response action {error_resolution.response_action}."
            )

            retry_endlessly = (
                error_resolution.response_action == ResponseAction.RATE_LIMITED
                and not exit_on_rate_limit
            )

            if user_defined_backoff_time:
                raise UserDefinedBackoffException(
                    backoff=user_defined_backoff_time,
                    request=request,
                    response=(response if response is not None else exc),
                    error_message=error_message,
                    failure_type=error_resolution.failure_type,
                )

            elif retry_endlessly:
                raise RateLimitBackoffException(
                    request=request,
                    response=(response if response is not None else exc),
                    error_message=error_message,
                    failure_type=error_resolution.failure_type,
                )

            raise DefaultBackoffException(
                request=request,
                response=(response if response is not None else exc),
                error_message=error_message,
                failure_type=error_resolution.failure_type,
            )

        elif response:
            try:
                response.raise_for_status()
            except requests.HTTPError as e:
                self._logger.error(response.text)
                raise e

    @property
    def name(self) -> str:
        return self._name

    def send_request(
        self,
        http_method: str,
        url: str,
        request_kwargs: Mapping[str, Any],
        headers: Optional[Mapping[str, str]] = None,
        params: Optional[Mapping[str, str]] = None,
        json: Optional[Mapping[str, Any]] = None,
        data: Optional[Union[str, Mapping[str, Any]]] = None,
        dedupe_query_params: bool = False,
        log_formatter: Optional[Callable[[requests.Response], Any]] = None,
        exit_on_rate_limit: Optional[bool] = False,
    ) -> Tuple[requests.PreparedRequest, requests.Response]:
        """
        Prepares and sends the request, and returns the request and response objects.
        """

        request: requests.PreparedRequest = self._create_prepared_request(
            http_method=http_method,
            url=url,
            dedupe_query_params=dedupe_query_params,
            headers=headers,
            params=params,
            json=json,
            data=data,
        )

        env_settings = self._session.merge_environment_settings(
            url=request.url,
            proxies=request_kwargs.get("proxies", {}),
            stream=request_kwargs.get("stream"),
            verify=request_kwargs.get("verify"),
            cert=request_kwargs.get("cert"),
        )
        request_kwargs = {**request_kwargs, **env_settings}

        response: requests.Response = self._send_with_retry(
            request=request,
            request_kwargs=request_kwargs,
            log_formatter=log_formatter,
            exit_on_rate_limit=exit_on_rate_limit,
        )

        return request, response
def __init__(
    self,
    name: str,
    logger: logging.Logger,
    error_handler: Optional[ErrorHandler] = None,
    api_budget: Optional[APIBudget] = None,
    session: Optional[Union[requests.Session, requests_cache.CachedSession]] = None,
    authenticator: Optional[AuthBase] = None,
    use_cache: bool = False,
    backoff_strategy: Optional[Union[BackoffStrategy, List[BackoffStrategy]]] = None,
    error_message_parser: Optional[ErrorMessageParser] = None,
    disable_retries: bool = False,
    message_repository: Optional[MessageRepository] = None,
):
    self._name = name
    self._api_budget: APIBudget = api_budget or APIBudget(policies=[])
    if session:
        self._session = session
    else:
        self._use_cache = use_cache
        self._session = self._request_session()
        self._session.mount(
            "https://",
            requests.adapters.HTTPAdapter(
                pool_connections=MAX_CONNECTION_POOL_SIZE, pool_maxsize=MAX_CONNECTION_POOL_SIZE
            ),
        )
    if isinstance(authenticator, AuthBase):
        self._session.auth = authenticator
    self._logger = logger
    self._error_handler = error_handler or HttpStatusErrorHandler(self._logger)
    if backoff_strategy is not None:
        if isinstance(backoff_strategy, list):
            self._backoff_strategies = backoff_strategy
        else:
            self._backoff_strategies = [backoff_strategy]
    else:
        self._backoff_strategies = [DefaultBackoffStrategy()]
    self._error_message_parser = error_message_parser or JsonErrorMessageParser()
    self._request_attempt_count: Dict[requests.PreparedRequest, int] = {}
    self._disable_retries = disable_retries
    self._message_repository = message_repository
@property
def cache_filename(self) -> str:
    """
    Override if needed. Returns the name of the cache file.
    Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache will be in-memory only.
    """
    return f"{self._name}.sqlite"
Override if needed. Returns the name of the cache file. Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache will be in-memory only.
def clear_cache(self) -> None:
    """
    Clear cached requests for the current session; can be called at any time.
    """
    if isinstance(self._session, requests_cache.CachedSession):
        self._session.cache.clear()  # type: ignore # cache.clear is not typed
Clear cached requests for the current session; can be called at any time.
def send_request(
    self,
    http_method: str,
    url: str,
    request_kwargs: Mapping[str, Any],
    headers: Optional[Mapping[str, str]] = None,
    params: Optional[Mapping[str, str]] = None,
    json: Optional[Mapping[str, Any]] = None,
    data: Optional[Union[str, Mapping[str, Any]]] = None,
    dedupe_query_params: bool = False,
    log_formatter: Optional[Callable[[requests.Response], Any]] = None,
    exit_on_rate_limit: Optional[bool] = False,
) -> Tuple[requests.PreparedRequest, requests.Response]:
    """
    Prepares and sends the request, and returns the request and response objects.
    """

    request: requests.PreparedRequest = self._create_prepared_request(
        http_method=http_method,
        url=url,
        dedupe_query_params=dedupe_query_params,
        headers=headers,
        params=params,
        json=json,
        data=data,
    )

    env_settings = self._session.merge_environment_settings(
        url=request.url,
        proxies=request_kwargs.get("proxies", {}),
        stream=request_kwargs.get("stream"),
        verify=request_kwargs.get("verify"),
        cert=request_kwargs.get("cert"),
    )
    request_kwargs = {**request_kwargs, **env_settings}

    response: requests.Response = self._send_with_retry(
        request=request,
        request_kwargs=request_kwargs,
        log_formatter=log_formatter,
        exit_on_rate_limit=exit_on_rate_limit,
    )

    return request, response
Prepares and sends the request, and returns the request and response objects.
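For orientation, here is a minimal sketch of driving HttpClient directly (connectors normally go through HttpStream instead). The stream name, endpoint URL, and query parameters below are hypothetical placeholders:

import logging

from airbyte_cdk.sources.streams.http import HttpClient

# Hypothetical stream name and endpoint, for illustration only.
client = HttpClient(name="customers", logger=logging.getLogger("airbyte"))

request, response = client.send_request(
    http_method="GET",
    url="https://api.example.com/v1/customers",
    request_kwargs={"timeout": 30},  # forwarded to requests.Session.send
    params={"page_size": "100"},
)
print(response.status_code)

Retries, rate limiting, and error handling all happen inside send_request, so the caller only ever sees a successful response or a raised exception.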
class HttpStream(Stream, CheckpointMixin, ABC):
    """
    Base abstract class for an Airbyte Stream using the HTTP protocol. Basic building block for users building an Airbyte source for an HTTP API.
    """

    source_defined_cursor = True  # Most HTTP streams use a source defined cursor (i.e: the user can't configure it like on a SQL table)
    page_size: Optional[int] = (
        None  # Use this variable to define page size for API http requests with pagination support
    )

    def __init__(
        self, authenticator: Optional[AuthBase] = None, api_budget: Optional[APIBudget] = None
    ):
        self._exit_on_rate_limit: bool = False
        self._http_client = HttpClient(
            name=self.name,
            logger=self.logger,
            error_handler=self.get_error_handler(),
            api_budget=api_budget or APIBudget(policies=[]),
            authenticator=authenticator,
            use_cache=self.use_cache,
            backoff_strategy=self.get_backoff_strategy(),
            message_repository=InMemoryMessageRepository(),
        )

        # There are three conditions that dictate if RFR should automatically be applied to a stream
        # 1. Streams that explicitly initialize their own cursor should defer to it and not automatically apply RFR
        # 2. Streams with at least one cursor_field are incremental and thus a superior sync to RFR.
        # 3. Streams overriding read_records() do not guarantee that they will call the parent implementation which can perform
        #    per-page checkpointing, so RFR is only supported if a stream uses the default `HttpStream.read_records()` method
        if (
            not self.cursor
            and len(self.cursor_field) == 0
            and type(self).read_records is HttpStream.read_records
        ):
            self.cursor = ResumableFullRefreshCursor()

    @property
    def exit_on_rate_limit(self) -> bool:
        """
        :return: False if the stream will retry endlessly when rate limited
        """
        return self._exit_on_rate_limit

    @exit_on_rate_limit.setter
    def exit_on_rate_limit(self, value: bool) -> None:
        self._exit_on_rate_limit = value

    @property
    def cache_filename(self) -> str:
        """
        Override if needed. Returns the name of the cache file.
        Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache will be in-memory only.
        """
        return f"{self.name}.sqlite"

    @property
    def use_cache(self) -> bool:
        """
        Override if needed. If True, all records will be cached.
        Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache will be in-memory only.
        """
        return False

    @property
    @abstractmethod
    def url_base(self) -> str:
        """
        :return: URL base for the API endpoint, e.g.: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/"
        """

    @property
    def http_method(self) -> str:
        """
        Override if needed. See get_request_data/get_request_json if using POST/PUT/PATCH.
        """
        return "GET"

    @property
    @deprecated(
        "Deprecated as of CDK version 3.0.0. "
        "You should set error_handler explicitly in HttpStream.get_error_handler() instead."
    )
    def raise_on_http_errors(self) -> bool:
        """
        Override if needed. If set to False, allows opting out of raising exceptions on HTTP status codes.
        """
        return True

    @property
    @deprecated(
        "Deprecated as of CDK version 3.0.0. "
        "You should set backoff_strategies explicitly in HttpStream.get_backoff_strategy() instead."
    )
    def max_retries(self) -> Union[int, None]:
        """
        Override if needed. Specifies the maximum number of retries for the backoff policy. Return None for no limit.
        """
        return 5

    @property
    @deprecated(
        "Deprecated as of CDK version 3.0.0. "
        "You should set backoff_strategies explicitly in HttpStream.get_backoff_strategy() instead."
    )
    def max_time(self) -> Union[int, None]:
        """
        Override if needed. Specifies the maximum total waiting time (in seconds) for the backoff policy. Return None for no limit.
        """
        return 60 * 10

    @property
    @deprecated(
        "Deprecated as of CDK version 3.0.0. "
        "You should set backoff_strategies explicitly in HttpStream.get_backoff_strategy() instead."
    )
    def retry_factor(self) -> float:
        """
        Override if needed. Specifies the factor for the backoff policy.
        """
        return 5

    @abstractmethod
    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """
        Override this method to define a pagination strategy.

        The value returned from this method is passed to most other methods in this class. Use it to form a request, e.g.: set headers or query params.

        :return: The token for the next page from the input response object. Returning None means there are no more pages to read in this response.
        """

    @abstractmethod
    def path(
        self,
        *,
        stream_state: Optional[Mapping[str, Any]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> str:
        """
        Returns the URL path for the API endpoint, e.g.: if you wanted to hit https://myapi.com/v1/some_entity then this should return "some_entity"
        """

    def request_params(
        self,
        stream_state: Optional[Mapping[str, Any]],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> MutableMapping[str, Any]:
        """
        Override this method to define the query parameters that should be set on an outgoing HTTP request given the inputs.

        E.g.: you might want to define query parameters for paging if next_page_token is not None.
        """
        return {}

    def request_headers(
        self,
        stream_state: Optional[Mapping[str, Any]],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """
        Override to return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
        """
        return {}

    def request_body_data(
        self,
        stream_state: Optional[Mapping[str, Any]],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Optional[Union[Mapping[str, Any], str]]:
        """
        Override when creating POST/PUT/PATCH requests to populate the body of the request with a non-JSON payload.

        If it returns a string, the string is sent as-is.
        If it returns a dict, the dict is converted to a URL-encoded form,
        e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2".

        Only one of 'request_body_data' and 'request_body_json' may be overridden at the same time.
        """
        return None

    def request_body_json(
        self,
        stream_state: Optional[Mapping[str, Any]],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Optional[Mapping[str, Any]]:
        """
        Override when creating POST/PUT/PATCH requests to populate the body of the request with a JSON payload.

        Only one of 'request_body_data' and 'request_body_json' may be overridden at the same time.
        """
        return None

    def request_kwargs(
        self,
        stream_state: Optional[Mapping[str, Any]],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """
        Override to return a mapping of keyword arguments to be used when creating the HTTP request.
        Any option listed in https://docs.python-requests.org/en/latest/api/#requests.adapters.BaseAdapter.send can be returned from
        this method. Note that these options do not conflict with request-level options such as headers, request params, etc.
        """
        return {}

    @abstractmethod
    def parse_response(
        self,
        response: requests.Response,
        *,
        stream_state: Mapping[str, Any],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Mapping[str, Any]]:
        """
        Parses the raw response object into a list of records.
        By default, this returns an iterable containing the input. Override to parse differently.
        :param response: the response object returned from the API
        :param stream_state: the current stream state
        :param stream_slice: the current stream slice, if any
        :param next_page_token: the token for the next page, if any
        :return: An iterable containing the parsed response
        """

    def get_backoff_strategy(self) -> Optional[Union[BackoffStrategy, List[BackoffStrategy]]]:
        """
        Used to initialize an adapter to avoid breaking changes.
        If the stream has a `backoff_time` method implementation, we know this stream uses old (pre-HTTPClient) backoff handlers and thus an adapter is needed.

        Override to provide a custom BackoffStrategy.
        :return Optional[BackoffStrategy]: the strategy (or strategies) to use, or None for the default behavior
        """
        if hasattr(self, "backoff_time"):
            return HttpStreamAdapterBackoffStrategy(self)
        else:
            return None

    def get_error_handler(self) -> Optional[ErrorHandler]:
        """
        Used to initialize an adapter to avoid breaking changes.
        If the stream has a `should_retry` method implementation, we know this stream uses old (pre-HTTPClient) error handlers and thus an adapter is needed.

        Override to provide a custom ErrorHandler.
        :return Optional[ErrorHandler]: the error handler to use, or None for the default behavior
        """
        if hasattr(self, "should_retry"):
            error_handler = HttpStreamAdapterHttpStatusErrorHandler(
                stream=self,
                logger=logging.getLogger(),
                max_retries=self.max_retries,
                max_time=timedelta(seconds=self.max_time or 0),
            )
            return error_handler
        else:
            return None

    @classmethod
    def _join_url(cls, url_base: str, path: str) -> str:
        return urljoin(url_base, path)

    @classmethod
    def parse_response_error_message(cls, response: requests.Response) -> Optional[str]:
        """
        Parses the raw response object from a failed request into a user-friendly error message.
        By default, this method tries to grab the error message from JSON responses by following common API patterns. Override to parse differently.

        :param response: the failed response object
        :return: A user-friendly message that indicates the cause of the error
        """

        # default logic to grab error from common fields
        def _try_get_error(value: Optional[JsonType]) -> Optional[str]:
            if isinstance(value, str):
                return value
            elif isinstance(value, list):
                errors_in_value = [_try_get_error(v) for v in value]
                return ", ".join(v for v in errors_in_value if v is not None)
            elif isinstance(value, dict):
                new_value = (
                    value.get("message")
                    or value.get("messages")
                    or value.get("error")
                    or value.get("errors")
                    or value.get("failures")
                    or value.get("failure")
                    or value.get("detail")
                )
                return _try_get_error(new_value)
            return None

        try:
            body = response.json()
            return _try_get_error(body)
        except requests.exceptions.JSONDecodeError:
            return None

    def get_error_display_message(self, exception: BaseException) -> Optional[str]:
        """
        Retrieves the user-friendly display message that corresponds to an exception.
        This will be called when encountering an exception while reading records from the stream, and is used to build the AirbyteTraceMessage.

        The default implementation of this method only handles HTTPErrors by passing the response to self.parse_response_error_message().
        The method should be overridden as needed to handle any additional exception types.

        :param exception: The exception that was raised
        :return: A user-friendly message that indicates the cause of the error
        """
        if isinstance(exception, requests.HTTPError) and exception.response is not None:
            return self.parse_response_error_message(exception.response)
        return None

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[StreamData]:
        # A cursor_field indicates this is an incremental stream which offers better checkpointing than RFR enabled via the cursor
        if self.cursor_field or not isinstance(self.get_cursor(), ResumableFullRefreshCursor):
            yield from self._read_pages(
                lambda req, res, state, _slice: self.parse_response(
                    res, stream_slice=_slice, stream_state=state
                ),
                stream_slice,
                stream_state,
            )
        else:
            yield from self._read_single_page(
                lambda req, res, state, _slice: self.parse_response(
                    res, stream_slice=_slice, stream_state=state
                ),
                stream_slice,
                stream_state,
            )

    @property
    def state(self) -> MutableMapping[str, Any]:
        cursor = self.get_cursor()
        if cursor:
            return cursor.get_stream_state()  # type: ignore
        return self._state

    @state.setter
    def state(self, value: MutableMapping[str, Any]) -> None:
        cursor = self.get_cursor()
        if cursor:
            cursor.set_initial_state(value)
        self._state = value

    def get_cursor(self) -> Optional[Cursor]:
        # I don't love that this is semi-stateful but not sure what else to do. We don't know exactly what type of cursor to
        # instantiate when creating the class. We can make a few assumptions like if there is a cursor_field which implies
        # incremental, but we don't know until runtime if this is a substream. Ideally, a stream should explicitly define
        # its cursor, but because we're trying to automatically apply RFR we're stuck with this logic where we replace the
        # cursor at runtime once we detect this is a substream based on self.has_multiple_slices being reassigned
        if self.has_multiple_slices and isinstance(self.cursor, ResumableFullRefreshCursor):
            self.cursor = SubstreamResumableFullRefreshCursor()
            return self.cursor
        else:
            return self.cursor

    def _read_pages(
        self,
        records_generator_fn: Callable[
            [
                requests.PreparedRequest,
                requests.Response,
                Mapping[str, Any],
                Optional[Mapping[str, Any]],
            ],
            Iterable[StreamData],
        ],
        stream_slice: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[StreamData]:
        stream_state = stream_state or {}
        pagination_complete = False
        next_page_token = None
        while not pagination_complete:
            request, response = self._fetch_next_page(stream_slice, stream_state, next_page_token)
            yield from records_generator_fn(request, response, stream_state, stream_slice)

            next_page_token = self.next_page_token(response)
            if not next_page_token:
                pagination_complete = True

        cursor = self.get_cursor()
        if cursor and isinstance(cursor, SubstreamResumableFullRefreshCursor):
            partition, _, _ = self._extract_slice_fields(stream_slice=stream_slice)
            # Substreams checkpoint state by marking an entire parent partition as completed so that on the subsequent attempt
            # after a failure, completed parents are skipped and the sync can make progress
            cursor.close_slice(StreamSlice(cursor_slice={}, partition=partition))

        # Always return an empty generator just in case no records were ever yielded
        yield from []

    def _read_single_page(
        self,
        records_generator_fn: Callable[
            [
                requests.PreparedRequest,
                requests.Response,
                Mapping[str, Any],
                Optional[Mapping[str, Any]],
            ],
            Iterable[StreamData],
        ],
        stream_slice: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[StreamData]:
        partition, cursor_slice, remaining_slice = self._extract_slice_fields(
            stream_slice=stream_slice
        )
        stream_state = stream_state or {}
        next_page_token = cursor_slice or None

        request, response = self._fetch_next_page(remaining_slice, stream_state, next_page_token)
        yield from records_generator_fn(request, response, stream_state, remaining_slice)

        next_page_token = self.next_page_token(response) or {
            "__ab_full_refresh_sync_complete": True
        }

        cursor = self.get_cursor()
        if cursor:
            cursor.close_slice(StreamSlice(cursor_slice=next_page_token, partition=partition))

        # Always return an empty generator just in case no records were ever yielded
        yield from []

    @staticmethod
    def _extract_slice_fields(
        stream_slice: Optional[Mapping[str, Any]],
    ) -> tuple[Mapping[str, Any], Mapping[str, Any], Mapping[str, Any]]:
        if not stream_slice:
            return {}, {}, {}

        if isinstance(stream_slice, StreamSlice):
            partition = stream_slice.partition
            cursor_slice = stream_slice.cursor_slice
            remaining = {k: v for k, v in stream_slice.items()}
        else:
            # RFR streams that implement stream_slices() to generate stream slices in the legacy mapping format are converted into a
            # structured stream slice mapping by the LegacyCursorBasedCheckpointReader. The structured mapping object has separate
            # fields for the partition and cursor_slice value
            partition = stream_slice.get("partition", {})
            cursor_slice = stream_slice.get("cursor_slice", {})
            remaining = {
                key: val
                for key, val in stream_slice.items()
                if key != "partition" and key != "cursor_slice"
            }
        return partition, cursor_slice, remaining

    def _fetch_next_page(
        self,
        stream_slice: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Tuple[requests.PreparedRequest, requests.Response]:
        request, response = self._http_client.send_request(
            http_method=self.http_method,
            url=self._join_url(
                self.url_base,
                self.path(
                    stream_state=stream_state,
                    stream_slice=stream_slice,
                    next_page_token=next_page_token,
                ),
            ),
            request_kwargs=self.request_kwargs(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            headers=self.request_headers(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            params=self.request_params(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            json=self.request_body_json(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            data=self.request_body_data(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            dedupe_query_params=True,
            log_formatter=self.get_log_formatter(),
            exit_on_rate_limit=self.exit_on_rate_limit,
        )

        return request, response

    def get_log_formatter(self) -> Optional[Callable[[requests.Response], Any]]:
        """
        :return Optional[Callable[[requests.Response], Any]]: Function that will be used in logging inside HttpClient
        """
        return None
Base abstract class for an Airbyte Stream using the HTTP protocol. Basic building block for users building an Airbyte source for an HTTP API.
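To make the abstract surface concrete, here is a minimal sketch of a subclass for a hypothetical JSON API; the base URL, path, pagination field, and response shape are all assumptions for illustration:

from typing import Any, Iterable, Mapping, Optional

import requests

from airbyte_cdk.sources.streams.http import HttpStream


class Customers(HttpStream):
    # Hypothetical API details, not part of the CDK.
    url_base = "https://api.example.com/v1/"
    primary_key = "id"

    def path(self, **kwargs) -> str:
        return "customers"

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        # Assumes the API returns {"next_cursor": "..."} while more pages exist.
        next_cursor = response.json().get("next_cursor")
        return {"cursor": next_cursor} if next_cursor else None

    def request_params(self, stream_state, stream_slice=None, next_page_token=None):
        # Forward the pagination cursor (if any) as query parameters.
        return dict(next_page_token or {})

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping[str, Any]]:
        # Assumes records are wrapped in a top-level "data" array.
        yield from response.json().get("data", [])

Later sketches on this page build on this hypothetical Customers stream.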
@property
def exit_on_rate_limit(self) -> bool:
    """
    :return: False if the stream will retry endlessly when rate limited
    """
    return self._exit_on_rate_limit
Returns
False if the stream will retry endlessly when rate limited
@property
def cache_filename(self) -> str:
    """
    Override if needed. Returns the name of the cache file.
    Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache will be in-memory only.
    """
    return f"{self.name}.sqlite"
Override if needed. Returns the name of the cache file. Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache will be in-memory only.
@property
def use_cache(self) -> bool:
    """
    Override if needed. If True, all records will be cached.
    Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache will be in-memory only.
    """
    return False
Override if needed. If True, all records will be cached. Note that if the environment variable REQUEST_CACHE_PATH is not set, the cache will be in-memory only.
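A parent stream whose responses are re-read by substreams is the typical candidate for caching. A sketch, building on the hypothetical Customers stream above:

class CachedCustomers(Customers):  # Customers as sketched earlier
    @property
    def use_cache(self) -> bool:
        # Responses are cached in SQLite on disk (or in memory when
        # REQUEST_CACHE_PATH is unset) so substreams can replay them
        # without issuing extra API calls.
        return True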
@property
@abstractmethod
def url_base(self) -> str:
    """
    :return: URL base for the API endpoint, e.g.: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/"
    """
Returns
URL base for the API endpoint, e.g.: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/"
@property
def http_method(self) -> str:
    """
    Override if needed. See get_request_data/get_request_json if using POST/PUT/PATCH.
    """
    return "GET"
Override if needed. See get_request_data/get_request_json if using POST/PUT/PATCH.
@property
@deprecated(
    "Deprecated as of CDK version 3.0.0. "
    "You should set error_handler explicitly in HttpStream.get_error_handler() instead."
)
def raise_on_http_errors(self) -> bool:
    """
    Override if needed. If set to False, allows opting out of raising exceptions on HTTP status codes.
    """
    return True
Override if needed. If set to False, allows opting out of raising exceptions on HTTP status codes.
@property
@deprecated(
    "Deprecated as of CDK version 3.0.0. "
    "You should set backoff_strategies explicitly in HttpStream.get_backoff_strategy() instead."
)
def max_retries(self) -> Union[int, None]:
    """
    Override if needed. Specifies the maximum number of retries for the backoff policy. Return None for no limit.
    """
    return 5
Override if needed. Specifies the maximum number of retries for the backoff policy. Return None for no limit.
@property
@deprecated(
    "Deprecated as of CDK version 3.0.0. "
    "You should set backoff_strategies explicitly in HttpStream.get_backoff_strategy() instead."
)
def max_time(self) -> Union[int, None]:
    """
    Override if needed. Specifies the maximum total waiting time (in seconds) for the backoff policy. Return None for no limit.
    """
    return 60 * 10
Override if needed. Specifies the maximum total waiting time (in seconds) for the backoff policy. Return None for no limit.
@property
@deprecated(
    "Deprecated as of CDK version 3.0.0. "
    "You should set backoff_strategies explicitly in HttpStream.get_backoff_strategy() instead."
)
def retry_factor(self) -> float:
    """
    Override if needed. Specifies the factor for the backoff policy.
    """
    return 5
Override if needed. Specifies the factor for the backoff policy.
@abstractmethod
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
    """
    Override this method to define a pagination strategy.

    The value returned from this method is passed to most other methods in this class. Use it to form a request, e.g.: set headers or query params.

    :return: The token for the next page from the input response object. Returning None means there are no more pages to read in this response.
    """
Override this method to define a pagination strategy.
The value returned from this method is passed to most other methods in this class. Use it to form a request, e.g.: set headers or query params.
Returns
The token for the next page from the input response object. Returning None means there are no more pages to read in this response.
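As an alternative to the cursor-based sketch above, a page-number strategy might look like this, assuming the API reports its position in the body as {"page": 3, "total_pages": 10}:

from typing import Any, Mapping, Optional

import requests


class PagedCustomers(Customers):  # Customers as sketched earlier
    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        body = response.json()
        page = body.get("page")
        total_pages = body.get("total_pages")
        # Returning None on the last page stops pagination.
        if page is not None and total_pages is not None and page < total_pages:
            return {"page": page + 1}
        return None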
@abstractmethod
def path(
    self,
    *,
    stream_state: Optional[Mapping[str, Any]] = None,
    stream_slice: Optional[Mapping[str, Any]] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> str:
    """
    Returns the URL path for the API endpoint, e.g.: if you wanted to hit https://myapi.com/v1/some_entity then this should return "some_entity"
    """
Returns the URL path for the API endpoint, e.g.: if you wanted to hit https://myapi.com/v1/some_entity then this should return "some_entity"
def request_params(
    self,
    stream_state: Optional[Mapping[str, Any]],
    stream_slice: Optional[Mapping[str, Any]] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
    """
    Override this method to define the query parameters that should be set on an outgoing HTTP request given the inputs.

    E.g.: you might want to define query parameters for paging if next_page_token is not None.
    """
    return {}
Override this method to define the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g.: you might want to define query parameters for paging if next_page_token is not None.
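For example, merging a fixed page size with whatever pagination token the previous response produced (field names are hypothetical):

from typing import Any, Mapping, MutableMapping, Optional


class ParamCustomers(Customers):  # Customers as sketched earlier
    def request_params(
        self,
        stream_state: Optional[Mapping[str, Any]],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> MutableMapping[str, Any]:
        params: MutableMapping[str, Any] = {"page_size": 100}
        if next_page_token:
            # Pagination tokens win over the defaults on key collisions.
            params.update(next_page_token)
        return params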
def request_headers(
    self,
    stream_state: Optional[Mapping[str, Any]],
    stream_slice: Optional[Mapping[str, Any]] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """
    Override to return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
    """
    return {}
Override to return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
def request_body_data(
    self,
    stream_state: Optional[Mapping[str, Any]],
    stream_slice: Optional[Mapping[str, Any]] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Optional[Union[Mapping[str, Any], str]]:
    """
    Override when creating POST/PUT/PATCH requests to populate the body of the request with a non-JSON payload.

    If it returns a string, the string is sent as-is.
    If it returns a dict, the dict is converted to a URL-encoded form,
    e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2".

    Only one of 'request_body_data' and 'request_body_json' may be overridden at the same time.
    """
    return None
Override when creating POST/PUT/PATCH requests to populate the body of the request with a non-JSON payload.
If it returns a string, the string is sent as-is. If it returns a dict, the dict is converted to a URL-encoded form, e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2".
Only one of 'request_body_data' and 'request_body_json' may be overridden at the same time.
def request_body_json(
    self,
    stream_state: Optional[Mapping[str, Any]],
    stream_slice: Optional[Mapping[str, Any]] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Optional[Mapping[str, Any]]:
    """
    Override when creating POST/PUT/PATCH requests to populate the body of the request with a JSON payload.

    Only one of 'request_body_data' and 'request_body_json' may be overridden at the same time.
    """
    return None
Override when creating POST/PUT/PATCH requests to populate the body of the request with a JSON payload.
Only one of 'request_body_data' and 'request_body_json' may be overridden at the same time.
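A sketch of a POST-based search stream that sends a JSON body; the endpoint contract and payload fields are hypothetical:

from typing import Any, Dict, Mapping, Optional


class CustomerSearch(Customers):  # Customers as sketched earlier
    http_method = "POST"

    def path(self, **kwargs) -> str:
        return "customers/search"

    def request_body_json(
        self,
        stream_state: Optional[Mapping[str, Any]],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Optional[Mapping[str, Any]]:
        # Hypothetical search payload; the cursor rides in the body
        # rather than in the query string.
        body: Dict[str, Any] = {"filter": {"status": "active"}}
        if next_page_token:
            body["cursor"] = next_page_token["cursor"]
        return body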
def request_kwargs(
    self,
    stream_state: Optional[Mapping[str, Any]],
    stream_slice: Optional[Mapping[str, Any]] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """
    Override to return a mapping of keyword arguments to be used when creating the HTTP request.
    Any option listed in https://docs.python-requests.org/en/latest/api/#requests.adapters.BaseAdapter.send can be returned from
    this method. Note that these options do not conflict with request-level options such as headers, request params, etc.
    """
    return {}
Override to return a mapping of keyword arguments to be used when creating the HTTP request. Any option listed in https://docs.python-requests.org/en/latest/api/#requests.adapters.BaseAdapter.send can be returned from this method. Note that these options do not conflict with request-level options such as headers, request params, etc.
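For instance, to set a socket timeout and stream large response bodies instead of buffering them (both are standard requests send options):

from typing import Any, Mapping, Optional


class StreamingCustomers(Customers):  # Customers as sketched earlier
    def request_kwargs(
        self,
        stream_state: Optional[Mapping[str, Any]],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        # Passed through to requests.Session.send for every request.
        return {"timeout": 60, "stream": True}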
@abstractmethod
def parse_response(
    self,
    response: requests.Response,
    *,
    stream_state: Mapping[str, Any],
    stream_slice: Optional[Mapping[str, Any]] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Iterable[Mapping[str, Any]]:
    """
    Parses the raw response object into a list of records.
    By default, this returns an iterable containing the input. Override to parse differently.
    :param response: the response object returned from the API
    :param stream_state: the current stream state
    :param stream_slice: the current stream slice, if any
    :param next_page_token: the token for the next page, if any
    :return: An iterable containing the parsed response
    """
Parses the raw response object into a list of records. By default, this returns an iterable containing the input. Override to parse differently.
Parameters
- response: the response object returned from the API
- stream_state: the current stream state
- stream_slice: the current stream slice, if any
- next_page_token: the token for the next page, if any
Returns
An iterable containing the parsed response
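A sketch that unwraps an envelope and tags each record with its parent slice; the "data" key and parent_id field are assumptions:

from typing import Any, Iterable, Mapping, Optional

import requests


class EnrichedCustomers(Customers):  # Customers as sketched earlier
    def parse_response(
        self,
        response: requests.Response,
        *,
        stream_state: Mapping[str, Any],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Mapping[str, Any]]:
        for record in response.json().get("data", []):
            # Stamp each record with the slice it came from.
            yield {**record, "parent_id": (stream_slice or {}).get("parent_id")}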
def get_backoff_strategy(self) -> Optional[Union[BackoffStrategy, List[BackoffStrategy]]]:
    """
    Used to initialize an adapter to avoid breaking changes.
    If the stream has a `backoff_time` method implementation, we know this stream uses old (pre-HTTPClient) backoff handlers and thus an adapter is needed.

    Override to provide a custom BackoffStrategy.
    :return Optional[BackoffStrategy]: the strategy (or strategies) to use, or None for the default behavior
    """
    if hasattr(self, "backoff_time"):
        return HttpStreamAdapterBackoffStrategy(self)
    else:
        return None
Used to initialize an adapter to avoid breaking changes.
If the stream has a backoff_time method implementation, we know this stream uses old (pre-HTTPClient) backoff handlers and thus an adapter is needed.
Override to provide a custom BackoffStrategy.
Returns
An optional BackoffStrategy (or list of strategies) to use; None falls back to the default behavior.
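A custom strategy only needs a backoff_time method matching the interface HttpClient calls above (response_or_exception plus attempt_count). A sketch that honors a Retry-After header; the import path mirrors the error_handlers package referenced in this module, but treat it as an assumption:

from typing import Optional, Union

import requests

from airbyte_cdk.sources.streams.http.error_handlers import BackoffStrategy


class RetryAfterBackoffStrategy(BackoffStrategy):
    def backoff_time(
        self,
        response_or_exception: Optional[Union[requests.Response, requests.RequestException]],
        attempt_count: int,
    ) -> Optional[float]:
        if isinstance(response_or_exception, requests.Response):
            retry_after = response_or_exception.headers.get("Retry-After")
            if retry_after:
                return float(retry_after)  # wait exactly as long as the server asks
        return None  # None defers to the default backoff behavior

A stream would then return RetryAfterBackoffStrategy() from its get_backoff_strategy() override.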
def get_error_handler(self) -> Optional[ErrorHandler]:
    """
    Used to initialize an adapter to avoid breaking changes.
    If the stream has a `should_retry` method implementation, we know this stream uses old (pre-HTTPClient) error handlers and thus an adapter is needed.

    Override to provide a custom ErrorHandler.
    :return Optional[ErrorHandler]: the error handler to use, or None for the default behavior
    """
    if hasattr(self, "should_retry"):
        error_handler = HttpStreamAdapterHttpStatusErrorHandler(
            stream=self,
            logger=logging.getLogger(),
            max_retries=self.max_retries,
            max_time=timedelta(seconds=self.max_time or 0),
        )
        return error_handler
    else:
        return None
Used to initialize an adapter to avoid breaking changes.
If the stream has a should_retry method implementation, we know this stream uses old (pre-HTTPClient) error handlers and thus an adapter is needed.
Override to provide a custom ErrorHandler.
Returns
An optional ErrorHandler to use; None falls back to the default behavior.
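If the default status-code mapping fits but the retry budget does not, a stream can hand back a tuned HttpStatusErrorHandler; the max_retries and max_time constructor arguments below are assumptions inferred from the adapter usage shown above:

import logging
from datetime import timedelta

from airbyte_cdk.sources.streams.http.error_handlers import HttpStatusErrorHandler


class ImpatientCustomers(Customers):  # Customers as sketched earlier
    def get_error_handler(self):
        return HttpStatusErrorHandler(
            logger=logging.getLogger("airbyte"),
            max_retries=3,                  # assumed parameter, mirrors the adapter above
            max_time=timedelta(minutes=5),  # assumed parameter, mirrors the adapter above
        )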
@classmethod
def parse_response_error_message(cls, response: requests.Response) -> Optional[str]:
    """
    Parses the raw response object from a failed request into a user-friendly error message.
    By default, this method tries to grab the error message from JSON responses by following common API patterns. Override to parse differently.

    :param response: the failed response object
    :return: A user-friendly message that indicates the cause of the error
    """

    # default logic to grab error from common fields
    def _try_get_error(value: Optional[JsonType]) -> Optional[str]:
        if isinstance(value, str):
            return value
        elif isinstance(value, list):
            errors_in_value = [_try_get_error(v) for v in value]
            return ", ".join(v for v in errors_in_value if v is not None)
        elif isinstance(value, dict):
            new_value = (
                value.get("message")
                or value.get("messages")
                or value.get("error")
                or value.get("errors")
                or value.get("failures")
                or value.get("failure")
                or value.get("detail")
            )
            return _try_get_error(new_value)
        return None

    try:
        body = response.json()
        return _try_get_error(body)
    except requests.exceptions.JSONDecodeError:
        return None
Parses the raw response object from a failed request into a user-friendly error message. By default, this method tries to grab the error message from JSON responses by following common API patterns. Override to parse differently.
Parameters
- response: the failed response object
Returns
A user-friendly message that indicates the cause of the error
def get_error_display_message(self, exception: BaseException) -> Optional[str]:
    """
    Retrieves the user-friendly display message that corresponds to an exception.
    This will be called when encountering an exception while reading records from the stream, and is used to build the AirbyteTraceMessage.

    The default implementation of this method only handles HTTPErrors by passing the response to self.parse_response_error_message().
    The method should be overridden as needed to handle any additional exception types.

    :param exception: The exception that was raised
    :return: A user-friendly message that indicates the cause of the error
    """
    if isinstance(exception, requests.HTTPError) and exception.response is not None:
        return self.parse_response_error_message(exception.response)
    return None
Retrieves the user-friendly display message that corresponds to an exception. This will be called when encountering an exception while reading records from the stream, and used to build the AirbyteTraceMessage.
The default implementation of this method only handles HTTPErrors by passing the response to self.parse_response_error_message(). The method should be overridden as needed to handle any additional exception types.
Parameters
- exception: The exception that was raised
Returns
A user-friendly message that indicates the cause of the error
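A minimal sketch of handling an additional exception type while deferring to the default HTTPError handling; ItemsStream and the message text are illustrative.

from typing import Optional

import requests

from airbyte_cdk.sources.streams.http import HttpStream


class ItemsStream(HttpStream):
    # Required members elided for brevity.

    def get_error_display_message(self, exception: BaseException) -> Optional[str]:
        # Handle one extra exception type, then fall back to the default
        # HTTPError handling in the parent class.
        if isinstance(exception, requests.exceptions.ConnectTimeout):
            return "Could not connect to the API; please check your network."
        return super().get_error_display_message(exception)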
def read_records(
    self,
    sync_mode: SyncMode,
    cursor_field: Optional[List[str]] = None,
    stream_slice: Optional[Mapping[str, Any]] = None,
    stream_state: Optional[Mapping[str, Any]] = None,
) -> Iterable[StreamData]:
    # A cursor_field indicates this is an incremental stream which offers better checkpointing than RFR enabled via the cursor
    if self.cursor_field or not isinstance(self.get_cursor(), ResumableFullRefreshCursor):
        yield from self._read_pages(
            lambda req, res, state, _slice: self.parse_response(
                res, stream_slice=_slice, stream_state=state
            ),
            stream_slice,
            stream_state,
        )
    else:
        yield from self._read_single_page(
            lambda req, res, state, _slice: self.parse_response(
                res, stream_slice=_slice, stream_state=state
            ),
            stream_slice,
            stream_state,
        )
This method should be overridden by subclasses to read records based on the inputs.
@property
def state(self) -> MutableMapping[str, Any]:
    cursor = self.get_cursor()
    if cursor:
        return cursor.get_stream_state()  # type: ignore
    return self._state
State getter; should return the state in a form that can be serialized to a string and sent to the output as a STATE AirbyteMessage.
A good example of a state is a cursor_value: { self.cursor_field: "cursor_value" }
State should be as small as possible while remaining descriptive enough to restore the syncing process from the point where it stopped.
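For a stream that tracks its cursor value itself, the getter/setter pair might look like the following sketch; the "updated_at" cursor field and _cursor_value attribute are illustrative.

from typing import Any, MutableMapping

from airbyte_cdk.sources.streams.http import HttpStream


class ItemsStream(HttpStream):
    # Required members elided for brevity; "updated_at" is illustrative.
    cursor_field = "updated_at"

    def __init__(self, **kwargs: Any):
        super().__init__(**kwargs)
        self._cursor_value: Any = None

    @property
    def state(self) -> MutableMapping[str, Any]:
        # The smallest state that still lets the sync resume: one cursor value.
        return {self.cursor_field: self._cursor_value}

    @state.setter
    def state(self, value: MutableMapping[str, Any]) -> None:
        self._cursor_value = value.get(self.cursor_field)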
def get_cursor(self) -> Optional[Cursor]:
    # I don't love that this is semi-stateful but not sure what else to do. We don't know exactly what type of cursor to
    # instantiate when creating the class. We can make a few assumptions like if there is a cursor_field which implies
    # incremental, but we don't know until runtime if this is a substream. Ideally, a stream should explicitly define
    # its cursor, but because we're trying to automatically apply RFR we're stuck with this logic where we replace the
    # cursor at runtime once we detect this is a substream based on self.has_multiple_slices being reassigned
    if self.has_multiple_slices and isinstance(self.cursor, ResumableFullRefreshCursor):
        self.cursor = SubstreamResumableFullRefreshCursor()
        return self.cursor
    else:
        return self.cursor
A Cursor is an interface that a stream can implement to manage how its internal state is read and updated while reading records. Historically, Python connectors had no concept of a cursor to manage state. Python streams that want to manage state through a Cursor must define a cursor implementation and override this method to return it.
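A stream that defines its cursor explicitly could return it from this method, as in the sketch below; it assumes the Cursor types are importable from airbyte_cdk.sources.streams.checkpoint, which may differ by CDK version, and ItemsStream is hypothetical.

from typing import Any, Optional

from airbyte_cdk.sources.streams.checkpoint import Cursor, ResumableFullRefreshCursor
from airbyte_cdk.sources.streams.http import HttpStream


class ItemsStream(HttpStream):
    # Required members elided for brevity.

    def __init__(self, **kwargs: Any):
        super().__init__(**kwargs)
        # Defining the cursor explicitly avoids the runtime substream
        # detection described in the docstring above.
        self._checkpoint_cursor = ResumableFullRefreshCursor()

    def get_cursor(self) -> Optional[Cursor]:
        return self._checkpoint_cursor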
def get_log_formatter(self) -> Optional[Callable[[requests.Response], Any]]:
    """
    :return Optional[Callable[[requests.Response], Any]]: Function that will be used in logging inside HttpClient
    """
    return None
Returns
Function that will be used in logging inside HttpClient
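A minimal sketch of providing a formatter; logging only the status code and URL is an arbitrary illustrative choice, and ItemsStream is hypothetical.

from typing import Any, Callable, Optional

import requests

from airbyte_cdk.sources.streams.http import HttpStream


class ItemsStream(HttpStream):
    # Required members elided for brevity.

    def get_log_formatter(self) -> Optional[Callable[[requests.Response], Any]]:
        def _format(response: requests.Response) -> Any:
            # Keep the log payload small: just the status code and URL.
            return {"status": response.status_code, "url": response.url}

        return _format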
Inherited Members
- airbyte_cdk.sources.streams.core.Stream
- logger
- transformer
- cursor
- has_multiple_slices
- name
- read
- read_only_records
- get_json_schema
- as_airbyte_stream
- supports_incremental
- is_resumable
- cursor_field
- namespace
- primary_key
- stream_slices
- state_checkpoint_interval
- get_updated_state
- log_stream_sync_configuration
- configured_json_schema
class HttpSubStream(HttpStream, ABC):
    def __init__(self, parent: HttpStream, **kwargs: Any):
        """
        :param parent: should be an instance of the HttpStream class
        """
        super().__init__(**kwargs)
        self.parent = parent
        self.has_multiple_slices = (
            True  # Substreams are based on parent records which implies there are multiple slices
        )

        # There are three conditions that dictate if RFR should automatically be applied to a stream
        # 1. Streams that explicitly initialize their own cursor should defer to it and not automatically apply RFR
        # 2. Streams with at least one cursor_field are incremental and thus a superior sync to RFR.
        # 3. Streams overriding read_records() do not guarantee that they will call the parent implementation which can perform
        #    per-page checkpointing so RFR is only supported if a stream uses the default `HttpStream.read_records()` method
        if (
            not self.cursor
            and len(self.cursor_field) == 0
            and type(self).read_records is HttpStream.read_records
        ):
            self.cursor = SubstreamResumableFullRefreshCursor()

    def stream_slices(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Optional[Mapping[str, Any]]]:
        # read_stateless() assumes the parent is not concurrent. This is currently okay since the concurrent CDK does
        # not support either substreams or RFR, but something that needs to be considered once we do
        for parent_record in self.parent.read_only_records(stream_state):
            # Skip non-records (eg AirbyteLogMessage)
            if isinstance(parent_record, AirbyteMessage):
                if parent_record.type == MessageType.RECORD:
                    parent_record = parent_record.record.data  # type: ignore [assignment, union-attr]  # Incorrect type for assignment
                else:
                    continue
            elif isinstance(parent_record, Record):
                parent_record = parent_record.data
            yield {"parent": parent_record}
Base abstract class for an Airbyte Stream using the HTTP protocol. Basic building block for users building an Airbyte source for an HTTP API.
def __init__(self, parent: HttpStream, **kwargs: Any):
    """
    :param parent: should be an instance of the HttpStream class
    """
    super().__init__(**kwargs)
    self.parent = parent
    self.has_multiple_slices = (
        True  # Substreams are based on parent records which implies there are multiple slices
    )

    # There are three conditions that dictate if RFR should automatically be applied to a stream
    # 1. Streams that explicitly initialize their own cursor should defer to it and not automatically apply RFR
    # 2. Streams with at least one cursor_field are incremental and thus a superior sync to RFR.
    # 3. Streams overriding read_records() do not guarantee that they will call the parent implementation which can perform
    #    per-page checkpointing so RFR is only supported if a stream uses the default `HttpStream.read_records()` method
    if (
        not self.cursor
        and len(self.cursor_field) == 0
        and type(self).read_records is HttpStream.read_records
    ):
        self.cursor = SubstreamResumableFullRefreshCursor()
Parameters
- parent: should be an instance of the HttpStream class
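A hedged sketch of wiring a parent stream into a substream; Orders, OrderItems, and the endpoint are hypothetical, and the remaining required members of OrderItems are elided.

from typing import Any, Iterable, Mapping, Optional

import requests

from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream


class Orders(HttpStream):
    url_base = "https://api.example.com/"
    primary_key = "id"

    def path(self, **kwargs: Any) -> str:
        return "orders"

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        return None  # single-page API for the sake of the sketch

    def parse_response(self, response: requests.Response, **kwargs: Any) -> Iterable[Mapping[str, Any]]:
        yield from response.json().get("orders", [])


class OrderItems(HttpSubStream):
    # Remaining required members (path, parse_response, ...) elided.

    def __init__(self, **kwargs: Any):
        # The substream's slices are derived from the parent's records.
        super().__init__(parent=Orders(**kwargs), **kwargs)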
def stream_slices(
    self,
    sync_mode: SyncMode,
    cursor_field: Optional[List[str]] = None,
    stream_state: Optional[Mapping[str, Any]] = None,
) -> Iterable[Optional[Mapping[str, Any]]]:
    # read_stateless() assumes the parent is not concurrent. This is currently okay since the concurrent CDK does
    # not support either substreams or RFR, but something that needs to be considered once we do
    for parent_record in self.parent.read_only_records(stream_state):
        # Skip non-records (eg AirbyteLogMessage)
        if isinstance(parent_record, AirbyteMessage):
            if parent_record.type == MessageType.RECORD:
                parent_record = parent_record.record.data  # type: ignore [assignment, union-attr]  # Incorrect type for assignment
            else:
                continue
        elif isinstance(parent_record, Record):
            parent_record = parent_record.data
        yield {"parent": parent_record}
Override to define the slices for this stream. See the stream slicing section of the docs for more information.
Parameters
- sync_mode: the sync mode being used (full_refresh or incremental)
- cursor_field: the cursor field(s) used for incremental syncs, if any
- stream_state: the state of the stream from a previous sync, if any
Returns
An iterable of stream slices; this implementation yields one {"parent": <parent record data>} mapping per parent record.
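Because each slice wraps one parent record, request building typically reads from stream_slice["parent"]; a minimal sketch follows, with the endpoint, field names, and parent wiring hypothetical or elided.

from typing import Any, Mapping, Optional

from airbyte_cdk.sources.streams.http import HttpSubStream


class OrderItems(HttpSubStream):
    # Remaining required members and parent wiring elided.
    url_base = "https://api.example.com/"
    primary_key = "id"

    def path(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs: Any) -> str:
        # Each slice produced by stream_slices() is {"parent": <parent record>}.
        return f"orders/{stream_slice['parent']['id']}/items"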
Inherited Members
- HttpStream
- source_defined_cursor
- page_size
- exit_on_rate_limit
- cache_filename
- use_cache
- url_base
- http_method
- raise_on_http_errors
- max_retries
- max_time
- retry_factor
- next_page_token
- path
- request_params
- request_headers
- request_body_data
- request_body_json
- request_kwargs
- parse_response
- get_backoff_strategy
- get_error_handler
- parse_response_error_message
- get_error_display_message
- read_records
- state
- get_cursor
- get_log_formatter
class UserDefinedBackoffException(BaseBackoffException):
    """
    An exception that exposes how long it attempted to back off
    """

    def __init__(
        self,
        backoff: Union[int, float],
        request: requests.PreparedRequest,
        response: Optional[Union[requests.Response, Exception]],
        error_message: str = "",
        failure_type: Optional[FailureType] = None,
    ):
        """
        :param backoff: how long to back off in seconds
        :param request: the request that triggered this backoff exception
        :param response: the response that triggered the backoff exception
        """
        self.backoff = backoff
        super().__init__(
            request=request,
            response=response,
            error_message=error_message,
            failure_type=failure_type,
        )
An exception that exposes how long it attempted to back off
def __init__(
    self,
    backoff: Union[int, float],
    request: requests.PreparedRequest,
    response: Optional[Union[requests.Response, Exception]],
    error_message: str = "",
    failure_type: Optional[FailureType] = None,
):
    """
    :param backoff: how long to back off in seconds
    :param request: the request that triggered this backoff exception
    :param response: the response that triggered the backoff exception
    """
    self.backoff = backoff
    super().__init__(
        request=request,
        response=response,
        error_message=error_message,
        failure_type=failure_type,
    )
Parameters
- backoff: how long to backoff in seconds
- request: the request that triggered this backoff exception
- response: the response that triggered the backoff exception
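A hedged sketch of raising this exception to honor a Retry-After header; the helper function and the 60-second default are illustrative, not part of the CDK.

import requests

from airbyte_cdk.sources.streams.http import UserDefinedBackoffException


def raise_rate_limit_backoff(
    request: requests.PreparedRequest, response: requests.Response
) -> None:
    # Honor the server-provided Retry-After header, defaulting to 60 seconds.
    retry_after = float(response.headers.get("Retry-After", 60))
    raise UserDefinedBackoffException(
        backoff=retry_after,
        request=request,
        response=response,
        error_message=f"Rate limited; backing off for {retry_after} seconds",
    )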