airbyte_cdk.sources.streams.http.error_handlers

 1#
 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 3#
 4
 5from .backoff_strategy import BackoffStrategy
 6from .default_backoff_strategy import DefaultBackoffStrategy
 7from .error_handler import ErrorHandler
 8from .error_message_parser import ErrorMessageParser
 9from .http_status_error_handler import HttpStatusErrorHandler
10from .json_error_message_parser import JsonErrorMessageParser
11from .response_models import ErrorResolution, ResponseAction
12
13__all__ = [
14    "BackoffStrategy",
15    "DefaultBackoffStrategy",
16    "ErrorHandler",
17    "ErrorMessageParser",
18    "HttpStatusErrorHandler",
19    "JsonErrorMessageParser",
20    "ResponseAction",
21    "ErrorResolution",
22]
class BackoffStrategy(abc.ABC):
class BackoffStrategy(ABC):
    """
    Interface for deciding how long to wait before retrying a failed request.
    """

    @abstractmethod
    def backoff_time(
        self,
        response_or_exception: Optional[Union[requests.Response, requests.RequestException]],
        attempt_count: int,
    ) -> Optional[float]:
        """
        Override this method to dynamically determine backoff time, e.g. by reading the X-Retry-After header.

        This method is called only if should_backoff() returns True for the input request.

        :param response_or_exception: The response or exception that caused the backoff.
        :param attempt_count: The number of attempts already performed for this request.
        :return: How long to back off, in seconds. The return value may be a floating point number for subsecond
            precision. Returning None defers backoff to the default backoff behavior (e.g. using an exponential algorithm).
        """
        pass

Abstract interface that decides how long to wait before retrying a failed request.

@abstractmethod
def backoff_time( self, response_or_exception: Union[requests.models.Response, requests.exceptions.RequestException, NoneType], attempt_count: int) -> Optional[float]:
13    @abstractmethod
14    def backoff_time(
15        self,
16        response_or_exception: Optional[Union[requests.Response, requests.RequestException]],
17        attempt_count: int,
18    ) -> Optional[float]:
19        """
20        Override this method to dynamically determine backoff time e.g: by reading the X-Retry-After header.
21
22        This method is called only if should_backoff() returns True for the input request.
23
24        :param response_or_exception: The response or exception that caused the backoff.
25        :param attempt_count: The number of attempts already performed for this request.
26        :return how long to backoff in seconds. The return value may be a floating point number for subsecond precision. Returning None defers backoff
27        to the default backoff behavior (e.g using an exponential algorithm).
28        """
29        pass

Override this method to dynamically determine backoff time e.g: by reading the X-Retry-After header.

This method is called only if should_backoff() returns True for the input request.

Parameters
  • response_or_exception: The response or exception that caused the backoff.
  • attempt_count: The number of attempts already performed for this request.
Returns

How long to back off, in seconds. The return value may be a floating point number for subsecond precision. Returning None defers backoff to the default backoff behavior (e.g. using an exponential algorithm).

class DefaultBackoffStrategy(airbyte_cdk.sources.streams.http.error_handlers.BackoffStrategy):
class DefaultBackoffStrategy(BackoffStrategy):
    """
    Backoff strategy that never supplies a wait time of its own.
    """

    def backoff_time(
        self,
        response_or_exception: Optional[Union[requests.Response, requests.RequestException]],
        attempt_count: int,
    ) -> Optional[float]:
        """
        Return None for every call; neither argument is inspected.

        :param response_or_exception: The response or exception that caused the backoff (unused).
        :param attempt_count: The number of attempts already performed for this request (unused).
        :return: Always None.
        """
        return None

Backoff strategy whose backoff_time always returns None.

def backoff_time( self, response_or_exception: Union[requests.models.Response, requests.exceptions.RequestException, NoneType], attempt_count: int) -> Optional[float]:
13    def backoff_time(
14        self,
15        response_or_exception: Optional[Union[requests.Response, requests.RequestException]],
16        attempt_count: int,
17    ) -> Optional[float]:
18        return None

Override this method to dynamically determine backoff time e.g: by reading the X-Retry-After header.

This method is called only if should_backoff() returns True for the input request.

Parameters
  • response_or_exception: The response or exception that caused the backoff.
  • attempt_count: The number of attempts already performed for this request.
Returns

How long to back off, in seconds. The return value may be a floating point number for subsecond precision. Returning None defers backoff to the default backoff behavior (e.g. using an exponential algorithm).

class ErrorHandler(abc.ABC):
class ErrorHandler(ABC):
    """
    Abstract base class to determine how to handle a failed HTTP request.
    """

    @property
    @abstractmethod
    def max_retries(self) -> Optional[int]:
        """
        The maximum number of retries to attempt before giving up.
        """
        pass

    @property
    @abstractmethod
    def max_time(self) -> Optional[int]:
        """
        The maximum amount of time in seconds to retry before giving up.
        """
        pass

    @abstractmethod
    def interpret_response(
        self, response: Optional[Union[requests.Response, Exception]]
    ) -> ErrorResolution:
        """
        Interpret the response or exception and return the corresponding response action, failure type, and error message.

        :param response: The HTTP response object or exception raised during the request.
        :return: An ErrorResolution carrying the response action, failure type, and error message.
        """
        pass

Abstract base class to determine how to handle a failed HTTP request.

max_retries: Optional[int]
17    @property
18    @abstractmethod
19    def max_retries(self) -> Optional[int]:
20        """
21        The maximum number of retries to attempt before giving up.
22        """
23        pass

The maximum number of retries to attempt before giving up.

max_time: Optional[int]
25    @property
26    @abstractmethod
27    def max_time(self) -> Optional[int]:
28        """
29        The maximum amount of time in seconds to retry before giving up.
30        """
31        pass

The maximum amount of time in seconds to retry before giving up.

@abstractmethod
def interpret_response( self, response: Union[requests.models.Response, Exception, NoneType]) -> ErrorResolution:
33    @abstractmethod
34    def interpret_response(
35        self, response: Optional[Union[requests.Response, Exception]]
36    ) -> ErrorResolution:
37        """
38        Interpret the response or exception and return the corresponding response action, failure type, and error message.
39
40        :param response: The HTTP response object or exception raised during the request.
41        :return: A tuple containing the response action, failure type, and error message.
42        """
43        pass

Interpret the response or exception and return the corresponding response action, failure type, and error message.

Parameters
  • response: The HTTP response object or exception raised during the request.
Returns

An ErrorResolution containing the response action, failure type, and error message.

class ErrorMessageParser(abc.ABC):
class ErrorMessageParser(ABC):
    """
    Interface for extracting an error message from an HTTP response.
    """

    @abstractmethod
    def parse_response_error_message(self, response: requests.Response) -> Optional[str]:
        """
        Parse error message from response.

        :param response: response received for the request
        :return: error message
        """
        ...

Abstract interface for extracting an error message from an HTTP response.

@abstractmethod
def parse_response_error_message(self, response: requests.models.Response) -> Optional[str]:
13    @abstractmethod
14    def parse_response_error_message(self, response: requests.Response) -> Optional[str]:
15        """
16        Parse error message from response.
17        :param response: response received for the request
18        :return: error message
19        """
20        pass

Parse error message from response.

Parameters
  • response: response received for the request
Returns

error message

class HttpStatusErrorHandler(airbyte_cdk.sources.streams.http.error_handlers.ErrorHandler):
 23class HttpStatusErrorHandler(ErrorHandler):
 24    def __init__(
 25        self,
 26        logger: logging.Logger,
 27        error_mapping: Optional[Mapping[Union[int, str, type[Exception]], ErrorResolution]] = None,
 28        max_retries: int = 5,
 29        max_time: timedelta = timedelta(seconds=600),
 30    ) -> None:
 31        """
 32        Initialize the HttpStatusErrorHandler.
 33
 34        :param error_mapping: Custom error mappings to extend or override the default mappings.
 35        """
 36        self._logger = logger
 37        self._error_mapping = error_mapping or DEFAULT_ERROR_MAPPING
 38        self._max_retries = max_retries
 39        self._max_time = int(max_time.total_seconds())
 40
 41    @property
 42    def max_retries(self) -> Optional[int]:
 43        return self._max_retries
 44
 45    @property
 46    def max_time(self) -> Optional[int]:
 47        return self._max_time
 48
 49    def interpret_response(
 50        self, response_or_exception: Optional[Union[requests.Response, Exception]] = None
 51    ) -> ErrorResolution:
 52        """
 53        Interpret the response and return the corresponding response action, failure type, and error message.
 54
 55        :param response: The HTTP response object.
 56        :return: A tuple containing the response action, failure type, and error message.
 57        """
 58
 59        if isinstance(response_or_exception, Exception):
 60            mapped_error: Optional[ErrorResolution] = self._error_mapping.get(
 61                response_or_exception.__class__
 62            )
 63
 64            if mapped_error is not None:
 65                return mapped_error
 66            else:
 67                self._logger.error(
 68                    f"Unexpected exception in error handler: {response_or_exception}"
 69                )
 70                return ErrorResolution(
 71                    response_action=ResponseAction.RETRY,
 72                    failure_type=FailureType.system_error,
 73                    error_message=f"Unexpected exception in error handler: {response_or_exception}",
 74                )
 75
 76        elif isinstance(response_or_exception, requests.Response):
 77            if response_or_exception.status_code is None:
 78                self._logger.error("Response does not include an HTTP status code.")
 79                return ErrorResolution(
 80                    response_action=ResponseAction.RETRY,
 81                    failure_type=FailureType.transient_error,
 82                    error_message="Response does not include an HTTP status code.",
 83                )
 84
 85            if response_or_exception.ok:
 86                return ErrorResolution(
 87                    response_action=ResponseAction.SUCCESS,
 88                    failure_type=None,
 89                    error_message=None,
 90                )
 91
 92            error_key = response_or_exception.status_code
 93
 94            mapped_error = self._error_mapping.get(error_key)
 95
 96            if mapped_error is not None:
 97                return mapped_error
 98            else:
 99                self._logger.warning(f"Unexpected HTTP Status Code in error handler: '{error_key}'")
100                return ErrorResolution(
101                    response_action=ResponseAction.RETRY,
102                    failure_type=FailureType.system_error,
103                    error_message=f"Unexpected HTTP Status Code in error handler: {error_key}",
104                )
105        else:
106            self._logger.error(f"Received unexpected response type: {type(response_or_exception)}")
107            return ErrorResolution(
108                response_action=ResponseAction.FAIL,
109                failure_type=FailureType.system_error,
110                error_message=f"Received unexpected response type: {type(response_or_exception)}",
111            )

Error handler that looks up the resolution for a response's HTTP status code, or for an exception's class, in an error mapping.

HttpStatusErrorHandler( logger: logging.Logger, error_mapping: Optional[Mapping[Union[int, str, type[Exception]], ErrorResolution]] = None, max_retries: int = 5, max_time: datetime.timedelta = datetime.timedelta(seconds=600))
24    def __init__(
25        self,
26        logger: logging.Logger,
27        error_mapping: Optional[Mapping[Union[int, str, type[Exception]], ErrorResolution]] = None,
28        max_retries: int = 5,
29        max_time: timedelta = timedelta(seconds=600),
30    ) -> None:
31        """
32        Initialize the HttpStatusErrorHandler.
33
34        :param error_mapping: Custom error mappings to extend or override the default mappings.
35        """
36        self._logger = logger
37        self._error_mapping = error_mapping or DEFAULT_ERROR_MAPPING
38        self._max_retries = max_retries
39        self._max_time = int(max_time.total_seconds())

Initialize the HttpStatusErrorHandler.

Parameters
  • error_mapping: Custom error mapping used in place of the default mapping (the two are not merged). When it is omitted or empty, the default mapping is used.
max_retries: Optional[int]
41    @property
42    def max_retries(self) -> Optional[int]:
43        return self._max_retries

The maximum number of retries to attempt before giving up.

max_time: Optional[int]
45    @property
46    def max_time(self) -> Optional[int]:
47        return self._max_time

The maximum amount of time in seconds to retry before giving up.

def interpret_response( self, response_or_exception: Union[requests.models.Response, Exception, NoneType] = None) -> ErrorResolution:
 49    def interpret_response(
 50        self, response_or_exception: Optional[Union[requests.Response, Exception]] = None
 51    ) -> ErrorResolution:
 52        """
 53        Interpret the response and return the corresponding response action, failure type, and error message.
 54
 55        :param response: The HTTP response object.
 56        :return: A tuple containing the response action, failure type, and error message.
 57        """
 58
 59        if isinstance(response_or_exception, Exception):
 60            mapped_error: Optional[ErrorResolution] = self._error_mapping.get(
 61                response_or_exception.__class__
 62            )
 63
 64            if mapped_error is not None:
 65                return mapped_error
 66            else:
 67                self._logger.error(
 68                    f"Unexpected exception in error handler: {response_or_exception}"
 69                )
 70                return ErrorResolution(
 71                    response_action=ResponseAction.RETRY,
 72                    failure_type=FailureType.system_error,
 73                    error_message=f"Unexpected exception in error handler: {response_or_exception}",
 74                )
 75
 76        elif isinstance(response_or_exception, requests.Response):
 77            if response_or_exception.status_code is None:
 78                self._logger.error("Response does not include an HTTP status code.")
 79                return ErrorResolution(
 80                    response_action=ResponseAction.RETRY,
 81                    failure_type=FailureType.transient_error,
 82                    error_message="Response does not include an HTTP status code.",
 83                )
 84
 85            if response_or_exception.ok:
 86                return ErrorResolution(
 87                    response_action=ResponseAction.SUCCESS,
 88                    failure_type=None,
 89                    error_message=None,
 90                )
 91
 92            error_key = response_or_exception.status_code
 93
 94            mapped_error = self._error_mapping.get(error_key)
 95
 96            if mapped_error is not None:
 97                return mapped_error
 98            else:
 99                self._logger.warning(f"Unexpected HTTP Status Code in error handler: '{error_key}'")
100                return ErrorResolution(
101                    response_action=ResponseAction.RETRY,
102                    failure_type=FailureType.system_error,
103                    error_message=f"Unexpected HTTP Status Code in error handler: {error_key}",
104                )
105        else:
106            self._logger.error(f"Received unexpected response type: {type(response_or_exception)}")
107            return ErrorResolution(
108                response_action=ResponseAction.FAIL,
109                failure_type=FailureType.system_error,
110                error_message=f"Received unexpected response type: {type(response_or_exception)}",
111            )

Interpret the response and return the corresponding response action, failure type, and error message.

Parameters
  • response_or_exception: The HTTP response object, or the exception raised during the request.
Returns

An ErrorResolution containing the response action, failure type, and error message.

class JsonErrorMessageParser(airbyte_cdk.sources.streams.http.error_handlers.ErrorMessageParser):
class JsonErrorMessageParser(ErrorMessageParser):
    """
    Error message parser that looks for the message inside a JSON response body.
    """

    def _try_get_error(self, value: Optional[JsonType]) -> Optional[str]:
        """
        Recursively search a decoded JSON value for an error message.

        A string is returned unchanged. For a list, every element is searched and the messages found
        are joined with ", ". For a dict, a fixed list of keys is probed in order and the search
        continues on the first truthy value. Any other value yields None.

        :param value: The decoded JSON value to search.
        :return: The message found, or None.
        """
        if isinstance(value, str):
            return value

        if isinstance(value, list):
            found = (self._try_get_error(item) for item in value)
            return ", ".join(message for message in found if message is not None)

        if isinstance(value, dict):
            candidate = None
            # Keys are probed in priority order. If none holds a truthy value, the value of the
            # last key probed is what gets searched, exactly as a chained `or` would yield.
            for key in (
                "message",
                "messages",
                "error",
                "errors",
                "failures",
                "failure",
                "detail",
                "err",
                "error_message",
                "msg",
                "reason",
                "status_message",
            ):
                candidate = value.get(key)
                if candidate:
                    break
            return self._try_get_error(candidate)

        return None

    def parse_response_error_message(self, response: requests.Response) -> Optional[str]:
        """
        Parses the raw response object from a failed request into a user-friendly error message.

        :param response: The response received for the failed request.
        :return: A user-friendly message that indicates the cause of the error. When the body is not
            valid JSON, the body decoded as UTF-8 is returned instead, or None if decoding fails.
        """
        try:
            body = response.json()
        except requests.exceptions.JSONDecodeError:
            # Not JSON: fall back to the raw body as text, on a best-effort basis.
            try:
                return response.content.decode("utf-8")
            except Exception:
                return None
        return self._try_get_error(body)

Error message parser that extracts the message from a JSON response body, falling back to the body decoded as UTF-8 text when it is not valid JSON.

def parse_response_error_message(self, response: requests.models.Response) -> Optional[str]:
39    def parse_response_error_message(self, response: requests.Response) -> Optional[str]:
40        """
41        Parses the raw response object from a failed request into a user-friendly error message.
42
43        :param response:
44        :return: A user-friendly message that indicates the cause of the error
45        """
46        try:
47            body = response.json()
48            return self._try_get_error(body)
49        except requests.exceptions.JSONDecodeError:
50            try:
51                return response.content.decode("utf-8")
52            except Exception:
53                return None

Parses the raw response object from a failed request into a user-friendly error message.

Parameters
  • response: The response received for the failed request.
Returns

A user-friendly message that indicates the cause of the error

class ResponseAction(enum.Enum):
class ResponseAction(Enum):
    """
    Actions an error handler can assign to a response or exception.

    The value of each member is its own name as a string.
    """

    SUCCESS = "SUCCESS"
    RETRY = "RETRY"
    FAIL = "FAIL"
    IGNORE = "IGNORE"
    RATE_LIMITED = "RATE_LIMITED"

An enumeration.

SUCCESS = <ResponseAction.SUCCESS: 'SUCCESS'>
RETRY = <ResponseAction.RETRY: 'RETRY'>
FAIL = <ResponseAction.FAIL: 'FAIL'>
IGNORE = <ResponseAction.IGNORE: 'IGNORE'>
RATE_LIMITED = <ResponseAction.RATE_LIMITED: 'RATE_LIMITED'>
@dataclass
class ErrorResolution:
@dataclass
class ErrorResolution:
    """
    Outcome of interpreting a response or exception.

    Every field is optional and defaults to None.
    """

    # Action to take for the response or exception.
    response_action: Optional[ResponseAction] = None
    # Failure category to report.
    failure_type: Optional[FailureType] = None
    # Message describing the error.
    error_message: Optional[str] = None
ErrorResolution( response_action: Optional[ResponseAction] = None, failure_type: Optional[airbyte_protocol_dataclasses.models.airbyte_protocol.FailureType] = None, error_message: Optional[str] = None)
response_action: Optional[ResponseAction] = None
failure_type: Optional[airbyte_protocol_dataclasses.models.airbyte_protocol.FailureType] = None
error_message: Optional[str] = None