airbyte_cdk.sources.streams.http.error_handlers
1# 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 3# 4 5from .backoff_strategy import BackoffStrategy 6from .default_backoff_strategy import DefaultBackoffStrategy 7from .error_handler import ErrorHandler 8from .error_message_parser import ErrorMessageParser 9from .http_status_error_handler import HttpStatusErrorHandler 10from .json_error_message_parser import JsonErrorMessageParser 11from .response_models import ErrorResolution, ResponseAction 12 13__all__ = [ 14 "BackoffStrategy", 15 "DefaultBackoffStrategy", 16 "ErrorHandler", 17 "ErrorMessageParser", 18 "HttpStatusErrorHandler", 19 "JsonErrorMessageParser", 20 "ResponseAction", 21 "ErrorResolution", 22]
12class BackoffStrategy(ABC): 13 @abstractmethod 14 def backoff_time( 15 self, 16 response_or_exception: Optional[Union[requests.Response, requests.RequestException]], 17 attempt_count: int, 18 ) -> Optional[float]: 19 """ 20 Override this method to dynamically determine backoff time e.g: by reading the X-Retry-After header. 21 22 This method is called only if should_backoff() returns True for the input request. 23 24 :param response_or_exception: The response or exception that caused the backoff. 25 :param attempt_count: The number of attempts already performed for this request. 26 :return how long to backoff in seconds. The return value may be a floating point number for subsecond precision. Returning None defers backoff 27 to the default backoff behavior (e.g using an exponential algorithm). 28 """ 29 pass
Helper class that provides a standard way to create an ABC using inheritance.
13 @abstractmethod 14 def backoff_time( 15 self, 16 response_or_exception: Optional[Union[requests.Response, requests.RequestException]], 17 attempt_count: int, 18 ) -> Optional[float]: 19 """ 20 Override this method to dynamically determine backoff time e.g: by reading the X-Retry-After header. 21 22 This method is called only if should_backoff() returns True for the input request. 23 24 :param response_or_exception: The response or exception that caused the backoff. 25 :param attempt_count: The number of attempts already performed for this request. 26 :return how long to backoff in seconds. The return value may be a floating point number for subsecond precision. Returning None defers backoff 27 to the default backoff behavior (e.g using an exponential algorithm). 28 """ 29 pass
Override this method to dynamically determine backoff time e.g: by reading the X-Retry-After header.
This method is called only if should_backoff() returns True for the input request.
Parameters
- response_or_exception: The response or exception that caused the backoff.
- attempt_count: The number of attempts already performed for this request. :return how long to backoff in seconds. The return value may be a floating point number for subsecond precision. Returning None defers backoff to the default backoff behavior (e.g using an exponential algorithm).
12class DefaultBackoffStrategy(BackoffStrategy): 13 def backoff_time( 14 self, 15 response_or_exception: Optional[Union[requests.Response, requests.RequestException]], 16 attempt_count: int, 17 ) -> Optional[float]: 18 return None
Helper class that provides a standard way to create an ABC using inheritance.
13 def backoff_time( 14 self, 15 response_or_exception: Optional[Union[requests.Response, requests.RequestException]], 16 attempt_count: int, 17 ) -> Optional[float]: 18 return None
Override this method to dynamically determine backoff time e.g: by reading the X-Retry-After header.
This method is called only if should_backoff() returns True for the input request.
Parameters
- response_or_exception: The response or exception that caused the backoff.
- attempt_count: The number of attempts already performed for this request. :return how long to backoff in seconds. The return value may be a floating point number for subsecond precision. Returning None defers backoff to the default backoff behavior (e.g using an exponential algorithm).
12class ErrorHandler(ABC): 13 """ 14 Abstract base class to determine how to handle a failed HTTP request. 15 """ 16 17 @property 18 @abstractmethod 19 def max_retries(self) -> Optional[int]: 20 """ 21 The maximum number of retries to attempt before giving up. 22 """ 23 pass 24 25 @property 26 @abstractmethod 27 def max_time(self) -> Optional[int]: 28 """ 29 The maximum amount of time in seconds to retry before giving up. 30 """ 31 pass 32 33 @abstractmethod 34 def interpret_response( 35 self, response: Optional[Union[requests.Response, Exception]] 36 ) -> ErrorResolution: 37 """ 38 Interpret the response or exception and return the corresponding response action, failure type, and error message. 39 40 :param response: The HTTP response object or exception raised during the request. 41 :return: A tuple containing the response action, failure type, and error message. 42 """ 43 pass
Abstract base class to determine how to handle a failed HTTP request.
17 @property 18 @abstractmethod 19 def max_retries(self) -> Optional[int]: 20 """ 21 The maximum number of retries to attempt before giving up. 22 """ 23 pass
The maximum number of retries to attempt before giving up.
25 @property 26 @abstractmethod 27 def max_time(self) -> Optional[int]: 28 """ 29 The maximum amount of time in seconds to retry before giving up. 30 """ 31 pass
The maximum amount of time in seconds to retry before giving up.
33 @abstractmethod 34 def interpret_response( 35 self, response: Optional[Union[requests.Response, Exception]] 36 ) -> ErrorResolution: 37 """ 38 Interpret the response or exception and return the corresponding response action, failure type, and error message. 39 40 :param response: The HTTP response object or exception raised during the request. 41 :return: A tuple containing the response action, failure type, and error message. 42 """ 43 pass
Interpret the response or exception and return the corresponding response action, failure type, and error message.
Parameters
- response: The HTTP response object or exception raised during the request.
Returns
A tuple containing the response action, failure type, and error message.
12class ErrorMessageParser(ABC): 13 @abstractmethod 14 def parse_response_error_message(self, response: requests.Response) -> Optional[str]: 15 """ 16 Parse error message from response. 17 :param response: response received for the request 18 :return: error message 19 """ 20 pass
Helper class that provides a standard way to create an ABC using inheritance.
13 @abstractmethod 14 def parse_response_error_message(self, response: requests.Response) -> Optional[str]: 15 """ 16 Parse error message from response. 17 :param response: response received for the request 18 :return: error message 19 """ 20 pass
Parse error message from response.
Parameters
- response: response received for the request
Returns
error message
23class HttpStatusErrorHandler(ErrorHandler): 24 def __init__( 25 self, 26 logger: logging.Logger, 27 error_mapping: Optional[Mapping[Union[int, str, type[Exception]], ErrorResolution]] = None, 28 max_retries: int = 5, 29 max_time: timedelta = timedelta(seconds=600), 30 ) -> None: 31 """ 32 Initialize the HttpStatusErrorHandler. 33 34 :param error_mapping: Custom error mappings to extend or override the default mappings. 35 """ 36 self._logger = logger 37 self._error_mapping = error_mapping or DEFAULT_ERROR_MAPPING 38 self._max_retries = max_retries 39 self._max_time = int(max_time.total_seconds()) 40 41 @property 42 def max_retries(self) -> Optional[int]: 43 return self._max_retries 44 45 @property 46 def max_time(self) -> Optional[int]: 47 return self._max_time 48 49 def interpret_response( 50 self, response_or_exception: Optional[Union[requests.Response, Exception]] = None 51 ) -> ErrorResolution: 52 """ 53 Interpret the response and return the corresponding response action, failure type, and error message. 54 55 :param response: The HTTP response object. 56 :return: A tuple containing the response action, failure type, and error message. 57 """ 58 59 if isinstance(response_or_exception, Exception): 60 mapped_error: Optional[ErrorResolution] = self._error_mapping.get( 61 response_or_exception.__class__ 62 ) 63 64 if mapped_error is not None: 65 return mapped_error 66 else: 67 self._logger.error( 68 f"Unexpected exception in error handler: {response_or_exception}" 69 ) 70 return ErrorResolution( 71 response_action=ResponseAction.RETRY, 72 failure_type=FailureType.system_error, 73 error_message=f"Unexpected exception in error handler: {response_or_exception}", 74 ) 75 76 elif isinstance(response_or_exception, requests.Response): 77 if response_or_exception.status_code is None: 78 self._logger.error("Response does not include an HTTP status code.") 79 return ErrorResolution( 80 response_action=ResponseAction.RETRY, 81 failure_type=FailureType.transient_error, 82 error_message="Response does not include an HTTP status code.", 83 ) 84 85 if response_or_exception.ok: 86 return ErrorResolution( 87 response_action=ResponseAction.SUCCESS, 88 failure_type=None, 89 error_message=None, 90 ) 91 92 error_key = response_or_exception.status_code 93 94 mapped_error = self._error_mapping.get(error_key) 95 96 if mapped_error is not None: 97 return mapped_error 98 else: 99 self._logger.warning(f"Unexpected HTTP Status Code in error handler: '{error_key}'") 100 return ErrorResolution( 101 response_action=ResponseAction.RETRY, 102 failure_type=FailureType.system_error, 103 error_message=f"Unexpected HTTP Status Code in error handler: {error_key}", 104 ) 105 else: 106 self._logger.error(f"Received unexpected response type: {type(response_or_exception)}") 107 return ErrorResolution( 108 response_action=ResponseAction.FAIL, 109 failure_type=FailureType.system_error, 110 error_message=f"Received unexpected response type: {type(response_or_exception)}", 111 )
Abstract base class to determine how to handle a failed HTTP request.
24 def __init__( 25 self, 26 logger: logging.Logger, 27 error_mapping: Optional[Mapping[Union[int, str, type[Exception]], ErrorResolution]] = None, 28 max_retries: int = 5, 29 max_time: timedelta = timedelta(seconds=600), 30 ) -> None: 31 """ 32 Initialize the HttpStatusErrorHandler. 33 34 :param error_mapping: Custom error mappings to extend or override the default mappings. 35 """ 36 self._logger = logger 37 self._error_mapping = error_mapping or DEFAULT_ERROR_MAPPING 38 self._max_retries = max_retries 39 self._max_time = int(max_time.total_seconds())
Initialize the HttpStatusErrorHandler.
Parameters
- error_mapping: Custom error mappings to extend or override the default mappings.
49 def interpret_response( 50 self, response_or_exception: Optional[Union[requests.Response, Exception]] = None 51 ) -> ErrorResolution: 52 """ 53 Interpret the response and return the corresponding response action, failure type, and error message. 54 55 :param response: The HTTP response object. 56 :return: A tuple containing the response action, failure type, and error message. 57 """ 58 59 if isinstance(response_or_exception, Exception): 60 mapped_error: Optional[ErrorResolution] = self._error_mapping.get( 61 response_or_exception.__class__ 62 ) 63 64 if mapped_error is not None: 65 return mapped_error 66 else: 67 self._logger.error( 68 f"Unexpected exception in error handler: {response_or_exception}" 69 ) 70 return ErrorResolution( 71 response_action=ResponseAction.RETRY, 72 failure_type=FailureType.system_error, 73 error_message=f"Unexpected exception in error handler: {response_or_exception}", 74 ) 75 76 elif isinstance(response_or_exception, requests.Response): 77 if response_or_exception.status_code is None: 78 self._logger.error("Response does not include an HTTP status code.") 79 return ErrorResolution( 80 response_action=ResponseAction.RETRY, 81 failure_type=FailureType.transient_error, 82 error_message="Response does not include an HTTP status code.", 83 ) 84 85 if response_or_exception.ok: 86 return ErrorResolution( 87 response_action=ResponseAction.SUCCESS, 88 failure_type=None, 89 error_message=None, 90 ) 91 92 error_key = response_or_exception.status_code 93 94 mapped_error = self._error_mapping.get(error_key) 95 96 if mapped_error is not None: 97 return mapped_error 98 else: 99 self._logger.warning(f"Unexpected HTTP Status Code in error handler: '{error_key}'") 100 return ErrorResolution( 101 response_action=ResponseAction.RETRY, 102 failure_type=FailureType.system_error, 103 error_message=f"Unexpected HTTP Status Code in error handler: {error_key}", 104 ) 105 else: 106 self._logger.error(f"Received unexpected response type: {type(response_or_exception)}") 107 return ErrorResolution( 108 response_action=ResponseAction.FAIL, 109 failure_type=FailureType.system_error, 110 error_message=f"Received unexpected response type: {type(response_or_exception)}", 111 )
Interpret the response and return the corresponding response action, failure type, and error message.
Parameters
- response: The HTTP response object.
Returns
A tuple containing the response action, failure type, and error message.
14class JsonErrorMessageParser(ErrorMessageParser): 15 def _try_get_error(self, value: Optional[JsonType]) -> Optional[str]: 16 if isinstance(value, str): 17 return value 18 elif isinstance(value, list): 19 errors_in_value = [self._try_get_error(v) for v in value] 20 return ", ".join(v for v in errors_in_value if v is not None) 21 elif isinstance(value, dict): 22 new_value = ( 23 value.get("message") 24 or value.get("messages") 25 or value.get("error") 26 or value.get("errors") 27 or value.get("failures") 28 or value.get("failure") 29 or value.get("detail") 30 or value.get("err") 31 or value.get("error_message") 32 or value.get("msg") 33 or value.get("reason") 34 or value.get("status_message") 35 ) 36 return self._try_get_error(new_value) 37 return None 38 39 def parse_response_error_message(self, response: requests.Response) -> Optional[str]: 40 """ 41 Parses the raw response object from a failed request into a user-friendly error message. 42 43 :param response: 44 :return: A user-friendly message that indicates the cause of the error 45 """ 46 try: 47 body = response.json() 48 return self._try_get_error(body) 49 except requests.exceptions.JSONDecodeError: 50 try: 51 return response.content.decode("utf-8") 52 except Exception: 53 return None
Helper class that provides a standard way to create an ABC using inheritance.
39 def parse_response_error_message(self, response: requests.Response) -> Optional[str]: 40 """ 41 Parses the raw response object from a failed request into a user-friendly error message. 42 43 :param response: 44 :return: A user-friendly message that indicates the cause of the error 45 """ 46 try: 47 body = response.json() 48 return self._try_get_error(body) 49 except requests.exceptions.JSONDecodeError: 50 try: 51 return response.content.decode("utf-8") 52 except Exception: 53 return None
Parses the raw response object from a failed request into a user-friendly error message.
Parameters
- response:
Returns
A user-friendly message that indicates the cause of the error
15class ResponseAction(Enum): 16 SUCCESS = "SUCCESS" 17 RETRY = "RETRY" 18 FAIL = "FAIL" 19 IGNORE = "IGNORE" 20 RATE_LIMITED = "RATE_LIMITED"
An enumeration.
23@dataclass 24class ErrorResolution: 25 response_action: Optional[ResponseAction] = None 26 failure_type: Optional[FailureType] = None 27 error_message: Optional[str] = None