airbyte_cdk.sources.declarative.requesters.error_handlers

 1#
 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 3#
 4
 5from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategy import (
 6    BackoffStrategy,
 7)
 8from airbyte_cdk.sources.declarative.requesters.error_handlers.composite_error_handler import (
 9    CompositeErrorHandler,
10)
11from airbyte_cdk.sources.declarative.requesters.error_handlers.default_error_handler import (
12    DefaultErrorHandler,
13)
14from airbyte_cdk.sources.declarative.requesters.error_handlers.error_handler import ErrorHandler
15from airbyte_cdk.sources.declarative.requesters.error_handlers.http_response_filter import (
16    HttpResponseFilter,
17)
18
19__all__ = [
20    "BackoffStrategy",
21    "CompositeErrorHandler",
22    "DefaultErrorHandler",
23    "ErrorHandler",
24    "HttpResponseFilter",
25]
class BackoffStrategy(abc.ABC):
12class BackoffStrategy(ABC):
13    @abstractmethod
14    def backoff_time(
15        self,
16        response_or_exception: Optional[Union[requests.Response, requests.RequestException]],
17        attempt_count: int,
18    ) -> Optional[float]:
19        """
20        Override this method to dynamically determine backoff time e.g: by reading the X-Retry-After header.
21
22        This method is called only if should_backoff() returns True for the input request.
23
24        :param response_or_exception: The response or exception that caused the backoff.
25        :param attempt_count: The number of attempts already performed for this request.
26        :return how long to backoff in seconds. The return value may be a floating point number for subsecond precision. Returning None defers backoff
27        to the default backoff behavior (e.g using an exponential algorithm).
28        """
29        pass

Helper class that provides a standard way to create an ABC using inheritance.

@abstractmethod
def backoff_time( self, response_or_exception: Union[requests.models.Response, requests.exceptions.RequestException, NoneType], attempt_count: int) -> Optional[float]:
13    @abstractmethod
14    def backoff_time(
15        self,
16        response_or_exception: Optional[Union[requests.Response, requests.RequestException]],
17        attempt_count: int,
18    ) -> Optional[float]:
19        """
20        Override this method to dynamically determine backoff time e.g: by reading the X-Retry-After header.
21
22        This method is called only if should_backoff() returns True for the input request.
23
24        :param response_or_exception: The response or exception that caused the backoff.
25        :param attempt_count: The number of attempts already performed for this request.
26        :return how long to backoff in seconds. The return value may be a floating point number for subsecond precision. Returning None defers backoff
27        to the default backoff behavior (e.g using an exponential algorithm).
28        """
29        pass

Override this method to dynamically determine backoff time e.g: by reading the X-Retry-After header.

This method is called only if should_backoff() returns True for the input request.

Parameters
  • response_or_exception: The response or exception that caused the backoff.
  • attempt_count: The number of attempts already performed for this request. :return how long to backoff in seconds. The return value may be a floating point number for subsecond precision. Returning None defers backoff to the default backoff behavior (e.g using an exponential algorithm).
@dataclass
class CompositeErrorHandler(airbyte_cdk.sources.declarative.requesters.error_handlers.ErrorHandler):
 20@dataclass
 21class CompositeErrorHandler(ErrorHandler):
 22    """
 23    Error handler that sequentially iterates over a list of `ErrorHandler`s
 24
 25    Sample config chaining 2 different retriers:
 26        error_handler:
 27          type: "CompositeErrorHandler"
 28          error_handlers:
 29            - response_filters:
 30                - predicate: "{{ 'codase' in response }}"
 31                  action: RETRY
 32              backoff_strategies:
 33                - type: "ConstantBackoff"
 34                  backoff_time_in_seconds: 5
 35            - response_filters:
 36                - http_codes: [ 403 ]
 37                  action: RETRY
 38              backoff_strategies:
 39                - type: "ConstantBackoff"
 40                  backoff_time_in_seconds: 10
 41    Attributes:
 42        error_handlers (List[ErrorHandler]): list of error handlers
 43    """
 44
 45    error_handlers: List[ErrorHandler]
 46    parameters: InitVar[Mapping[str, Any]]
 47
 48    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 49        if not self.error_handlers:
 50            raise ValueError("CompositeErrorHandler expects at least 1 underlying error handler")
 51
 52    @property
 53    def max_retries(self) -> Optional[int]:
 54        return self.error_handlers[0].max_retries
 55
 56    @property
 57    def max_time(self) -> Optional[int]:
 58        return max([error_handler.max_time or 0 for error_handler in self.error_handlers])
 59
 60    def interpret_response(
 61        self, response_or_exception: Optional[Union[requests.Response, Exception]]
 62    ) -> ErrorResolution:
 63        matched_error_resolution = None
 64        for error_handler in self.error_handlers:
 65            matched_error_resolution = error_handler.interpret_response(response_or_exception)
 66
 67            if not isinstance(matched_error_resolution, ErrorResolution):
 68                continue
 69
 70            if matched_error_resolution.response_action == ResponseAction.SUCCESS:
 71                return matched_error_resolution
 72
 73            if (
 74                matched_error_resolution.response_action == ResponseAction.RETRY
 75                or matched_error_resolution.response_action == ResponseAction.IGNORE
 76            ):
 77                return matched_error_resolution
 78        if matched_error_resolution:
 79            return matched_error_resolution
 80
 81        return create_fallback_error_resolution(response_or_exception)
 82
 83    @property
 84    def backoff_strategies(self) -> Optional[List[BackoffStrategy]]:
 85        """
 86        Combines backoff strategies from all child error handlers into a single flattened list.
 87
 88        When used with HttpRequester, note the following behavior:
 89        - In HttpRequester.__post_init__, the entire list of backoff strategies is assigned to the error handler
 90        - However, the error handler's backoff_time() method only ever uses the first non-None strategy in the list
 91        - This means that if any backoff strategies are present, the first non-None strategy becomes the default
 92        - This applies to both user-defined response filters and errors from DEFAULT_ERROR_MAPPING
 93        - The list structure is not used to map different strategies to different error conditions
 94        - Therefore, subsequent strategies in the list will not be used
 95
 96        Returns None if no handlers have strategies defined, which will result in HttpRequester using its default backoff strategy.
 97        """
 98        all_strategies = []
 99        for handler in self.error_handlers:
100            if hasattr(handler, "backoff_strategies") and handler.backoff_strategies:
101                all_strategies.extend(handler.backoff_strategies)
102        return all_strategies if all_strategies else None

Error handler that sequentially iterates over a list of ErrorHandlers

Sample config chaining 2 different retriers: error_handler: type: "CompositeErrorHandler" error_handlers: - response_filters: - predicate: "{{ 'codase' in response }}" action: RETRY backoff_strategies: - type: "ConstantBackoff" backoff_time_in_seconds: 5 - response_filters: - http_codes: [ 403 ] action: RETRY backoff_strategies: - type: "ConstantBackoff" backoff_time_in_seconds: 10

Attributes:
  • error_handlers (List[ErrorHandler]): list of error handlers
CompositeErrorHandler( error_handlers: List[ErrorHandler], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]])
error_handlers: List[ErrorHandler]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
max_retries: Optional[int]
52    @property
53    def max_retries(self) -> Optional[int]:
54        return self.error_handlers[0].max_retries

The maximum number of retries to attempt before giving up.

max_time: Optional[int]
56    @property
57    def max_time(self) -> Optional[int]:
58        return max([error_handler.max_time or 0 for error_handler in self.error_handlers])

The maximum amount of time in seconds to retry before giving up.

def interpret_response( self, response_or_exception: Union[requests.models.Response, Exception, NoneType]) -> airbyte_cdk.sources.streams.http.error_handlers.ErrorResolution:
60    def interpret_response(
61        self, response_or_exception: Optional[Union[requests.Response, Exception]]
62    ) -> ErrorResolution:
63        matched_error_resolution = None
64        for error_handler in self.error_handlers:
65            matched_error_resolution = error_handler.interpret_response(response_or_exception)
66
67            if not isinstance(matched_error_resolution, ErrorResolution):
68                continue
69
70            if matched_error_resolution.response_action == ResponseAction.SUCCESS:
71                return matched_error_resolution
72
73            if (
74                matched_error_resolution.response_action == ResponseAction.RETRY
75                or matched_error_resolution.response_action == ResponseAction.IGNORE
76            ):
77                return matched_error_resolution
78        if matched_error_resolution:
79            return matched_error_resolution
80
81        return create_fallback_error_resolution(response_or_exception)

Interpret the response or exception and return the corresponding response action, failure type, and error message.

Parameters
  • response: The HTTP response object or exception raised during the request.
Returns

A tuple containing the response action, failure type, and error message.

backoff_strategies: Optional[List[BackoffStrategy]]
 83    @property
 84    def backoff_strategies(self) -> Optional[List[BackoffStrategy]]:
 85        """
 86        Combines backoff strategies from all child error handlers into a single flattened list.
 87
 88        When used with HttpRequester, note the following behavior:
 89        - In HttpRequester.__post_init__, the entire list of backoff strategies is assigned to the error handler
 90        - However, the error handler's backoff_time() method only ever uses the first non-None strategy in the list
 91        - This means that if any backoff strategies are present, the first non-None strategy becomes the default
 92        - This applies to both user-defined response filters and errors from DEFAULT_ERROR_MAPPING
 93        - The list structure is not used to map different strategies to different error conditions
 94        - Therefore, subsequent strategies in the list will not be used
 95
 96        Returns None if no handlers have strategies defined, which will result in HttpRequester using its default backoff strategy.
 97        """
 98        all_strategies = []
 99        for handler in self.error_handlers:
100            if hasattr(handler, "backoff_strategies") and handler.backoff_strategies:
101                all_strategies.extend(handler.backoff_strategies)
102        return all_strategies if all_strategies else None

Combines backoff strategies from all child error handlers into a single flattened list.

When used with HttpRequester, note the following behavior:

  • In HttpRequester.__post_init__, the entire list of backoff strategies is assigned to the error handler
  • However, the error handler's backoff_time() method only ever uses the first non-None strategy in the list
  • This means that if any backoff strategies are present, the first non-None strategy becomes the default
  • This applies to both user-defined response filters and errors from DEFAULT_ERROR_MAPPING
  • The list structure is not used to map different strategies to different error conditions
  • Therefore, subsequent strategies in the list will not be used

Returns None if no handlers have strategies defined, which will result in HttpRequester using its default backoff strategy.

@dataclass
class DefaultErrorHandler(airbyte_cdk.sources.declarative.requesters.error_handlers.ErrorHandler):
 26@dataclass
 27class DefaultErrorHandler(ErrorHandler):
 28    """
 29    Default error handler.
 30
 31    By default, the handler will only use the `DEFAULT_ERROR_MAPPING` that is part of the Python CDK's `HttpStatusErrorHandler`.
 32
 33    If the response is successful, then a SUCCESS_RESOLUTION is returned.
 34    Otherwise, iterate over the response_filters.
 35    If any of the filter match the response, then return the appropriate status.
 36    When `DefaultErrorHandler.backoff_time()` is invoked, iterate sequentially over the backoff_strategies and return the first non-None backoff time, else return None.
 37
 38    Sample configs:
 39
 40    1. retry 10 times
 41    `
 42        error_handler:
 43          max_retries: 10
 44    `
 45    2. backoff for 5 seconds
 46    `
 47        error_handler:
 48          backoff_strategies:
 49            - type: "ConstantBackoff"
 50              backoff_time_in_seconds: 5
 51    `
 52    3. retry on HTTP 404
 53    `
 54        error_handler:
 55          response_filters:
 56            - http_codes: [ 404 ]
 57              action: RETRY
 58    `
 59    4. ignore HTTP 404
 60    `
 61      error_handler:
 62        response_filters:
 63          - http_codes: [ 404 ]
 64            action: IGNORE
 65    `
 66    5. retry if error message contains `retrythisrequest!` substring
 67    `
 68        error_handler:
 69          response_filters:
 70            - error_message_contain: "retrythisrequest!"
 71              action: IGNORE
 72    `
 73    6. retry if 'code' is a field present in the response body
 74    `
 75        error_handler:
 76          response_filters:
 77            - predicate: "{{ 'code' in response }}"
 78              action: IGNORE
 79    `
 80
 81    7. ignore 429 and retry on 404
 82    `
 83        error_handler:
 84        - http_codes: [ 429 ]
 85          action: IGNORE
 86        - http_codes: [ 404 ]
 87          action: RETRY
 88    `
 89
 90    Attributes:
 91        response_filters (Optional[List[HttpResponseFilter]]): response filters to iterate on
 92        max_retries (Optional[int]): maximum retry attempts
 93        backoff_strategies (Optional[List[BackoffStrategy]]): list of backoff strategies to use to determine how long
 94        to wait before retrying
 95    """
 96
 97    parameters: InitVar[Mapping[str, Any]]
 98    config: Config
 99    response_filters: Optional[List[HttpResponseFilter]] = None
100    max_retries: Optional[int] = 5
101    max_time: int = 60 * 10
102    _max_retries: int = field(init=False, repr=False, default=5)
103    _max_time: int = field(init=False, repr=False, default=60 * 10)
104    backoff_strategies: Optional[List[BackoffStrategy]] = None
105
106    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
107        if not self.response_filters:
108            self.response_filters = [HttpResponseFilter(config=self.config, parameters={})]
109
110        self._last_request_to_attempt_count: MutableMapping[requests.PreparedRequest, int] = {}
111
112    def interpret_response(
113        self, response_or_exception: Optional[Union[requests.Response, Exception]]
114    ) -> ErrorResolution:
115        if self.response_filters:
116            for response_filter in self.response_filters:
117                matched_error_resolution = response_filter.matches(
118                    response_or_exception=response_or_exception
119                )
120                if matched_error_resolution:
121                    return matched_error_resolution
122        if isinstance(response_or_exception, requests.Response):
123            if response_or_exception.ok:
124                return SUCCESS_RESOLUTION
125
126        default_reponse_filter = DefaultHttpResponseFilter(parameters={}, config=self.config)
127        default_response_filter_resolution = default_reponse_filter.matches(response_or_exception)
128
129        return (
130            default_response_filter_resolution
131            if default_response_filter_resolution
132            else create_fallback_error_resolution(response_or_exception)
133        )
134
135    def backoff_time(
136        self,
137        response_or_exception: Optional[Union[requests.Response, requests.RequestException]],
138        attempt_count: int = 0,
139    ) -> Optional[float]:
140        backoff = None
141        if self.backoff_strategies:
142            for backoff_strategy in self.backoff_strategies:
143                backoff = backoff_strategy.backoff_time(
144                    response_or_exception=response_or_exception, attempt_count=attempt_count
145                )
146                if backoff:
147                    return backoff
148        return backoff

Default error handler.

By default, the handler will only use the DEFAULT_ERROR_MAPPING that is part of the Python CDK's HttpStatusErrorHandler.

If the response is successful, then a SUCCESS_RESOLUTION is returned. Otherwise, iterate over the response_filters. If any of the filter match the response, then return the appropriate status. When DefaultErrorHandler.backoff_time() is invoked, iterate sequentially over the backoff_strategies and return the first non-None backoff time, else return None.

Sample configs:

  1. retry 10 times error_handler: max_retries: 10
  2. backoff for 5 seconds error_handler: backoff_strategies: - type: "ConstantBackoff" backoff_time_in_seconds: 5
  3. retry on HTTP 404 error_handler: response_filters: - http_codes: [ 404 ] action: RETRY
  4. ignore HTTP 404 error_handler: response_filters: - http_codes: [ 404 ] action: IGNORE
  5. retry if error message contains retrythisrequest! substring error_handler: response_filters: - error_message_contain: "retrythisrequest!" action: IGNORE
  6. retry if 'code' is a field present in the response body error_handler: response_filters: - predicate: "{{ 'code' in response }}" action: IGNORE

  7. ignore 429 and retry on 404 ` error_handler:

    • http_codes: [ 429 ] action: IGNORE
    • http_codes: [ 404 ] action: RETRY `
Attributes:
  • response_filters (Optional[List[HttpResponseFilter]]): response filters to iterate on
  • max_retries (Optional[int]): maximum retry attempts
  • backoff_strategies (Optional[List[BackoffStrategy]]): list of backoff strategies to use to determine how long
  • to wait before retrying
DefaultErrorHandler( parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], config: Mapping[str, Any], response_filters: Optional[List[HttpResponseFilter]] = None, max_retries: Optional[int] = 5, max_time: int = 600, backoff_strategies: Optional[List[BackoffStrategy]] = None)
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
config: Mapping[str, Any]
response_filters: Optional[List[HttpResponseFilter]] = None
max_retries: Optional[int] = 5

The maximum number of retries to attempt before giving up.

max_time: int = 600

The maximum amount of time in seconds to retry before giving up.

backoff_strategies: Optional[List[BackoffStrategy]] = None
def interpret_response( self, response_or_exception: Union[requests.models.Response, Exception, NoneType]) -> airbyte_cdk.sources.streams.http.error_handlers.ErrorResolution:
112    def interpret_response(
113        self, response_or_exception: Optional[Union[requests.Response, Exception]]
114    ) -> ErrorResolution:
115        if self.response_filters:
116            for response_filter in self.response_filters:
117                matched_error_resolution = response_filter.matches(
118                    response_or_exception=response_or_exception
119                )
120                if matched_error_resolution:
121                    return matched_error_resolution
122        if isinstance(response_or_exception, requests.Response):
123            if response_or_exception.ok:
124                return SUCCESS_RESOLUTION
125
126        default_reponse_filter = DefaultHttpResponseFilter(parameters={}, config=self.config)
127        default_response_filter_resolution = default_reponse_filter.matches(response_or_exception)
128
129        return (
130            default_response_filter_resolution
131            if default_response_filter_resolution
132            else create_fallback_error_resolution(response_or_exception)
133        )

Interpret the response or exception and return the corresponding response action, failure type, and error message.

Parameters
  • response: The HTTP response object or exception raised during the request.
Returns

A tuple containing the response action, failure type, and error message.

def backoff_time( self, response_or_exception: Union[requests.models.Response, requests.exceptions.RequestException, NoneType], attempt_count: int = 0) -> Optional[float]:
135    def backoff_time(
136        self,
137        response_or_exception: Optional[Union[requests.Response, requests.RequestException]],
138        attempt_count: int = 0,
139    ) -> Optional[float]:
140        backoff = None
141        if self.backoff_strategies:
142            for backoff_strategy in self.backoff_strategies:
143                backoff = backoff_strategy.backoff_time(
144                    response_or_exception=response_or_exception, attempt_count=attempt_count
145                )
146                if backoff:
147                    return backoff
148        return backoff
class ErrorHandler(abc.ABC):
12class ErrorHandler(ABC):
13    """
14    Abstract base class to determine how to handle a failed HTTP request.
15    """
16
17    @property
18    @abstractmethod
19    def max_retries(self) -> Optional[int]:
20        """
21        The maximum number of retries to attempt before giving up.
22        """
23        pass
24
25    @property
26    @abstractmethod
27    def max_time(self) -> Optional[int]:
28        """
29        The maximum amount of time in seconds to retry before giving up.
30        """
31        pass
32
33    @abstractmethod
34    def interpret_response(
35        self, response: Optional[Union[requests.Response, Exception]]
36    ) -> ErrorResolution:
37        """
38        Interpret the response or exception and return the corresponding response action, failure type, and error message.
39
40        :param response: The HTTP response object or exception raised during the request.
41        :return: A tuple containing the response action, failure type, and error message.
42        """
43        pass

Abstract base class to determine how to handle a failed HTTP request.

max_retries: Optional[int]
17    @property
18    @abstractmethod
19    def max_retries(self) -> Optional[int]:
20        """
21        The maximum number of retries to attempt before giving up.
22        """
23        pass

The maximum number of retries to attempt before giving up.

max_time: Optional[int]
25    @property
26    @abstractmethod
27    def max_time(self) -> Optional[int]:
28        """
29        The maximum amount of time in seconds to retry before giving up.
30        """
31        pass

The maximum amount of time in seconds to retry before giving up.

@abstractmethod
def interpret_response( self, response: Union[requests.models.Response, Exception, NoneType]) -> airbyte_cdk.sources.streams.http.error_handlers.ErrorResolution:
33    @abstractmethod
34    def interpret_response(
35        self, response: Optional[Union[requests.Response, Exception]]
36    ) -> ErrorResolution:
37        """
38        Interpret the response or exception and return the corresponding response action, failure type, and error message.
39
40        :param response: The HTTP response object or exception raised during the request.
41        :return: A tuple containing the response action, failure type, and error message.
42        """
43        pass

Interpret the response or exception and return the corresponding response action, failure type, and error message.

Parameters
  • response: The HTTP response object or exception raised during the request.
Returns

A tuple containing the response action, failure type, and error message.

@dataclass
class HttpResponseFilter:
 25@dataclass
 26class HttpResponseFilter:
 27    """
 28    Filter to select a response based on its HTTP status code, error message or a predicate.
 29    If a response matches the filter, the response action, failure_type, and error message are returned as an ErrorResolution object.
 30    For http_codes declared in the filter, the failure_type will default to `system_error`.
 31    To override default failure_type use configured failure_type with ResponseAction.FAIL.
 32
 33    Attributes:
 34        action (Union[ResponseAction, str]): action to execute if a request matches
 35        failure_type (Union[ResponseAction, str]): failure type of traced exception if a response matches the filter
 36        http_codes (Set[int]): http code of matching requests
 37        error_message_contains (str): error substring of matching requests
 38        predicate (str): predicate to apply to determine if a request is matching
 39        error_message (Union[InterpolatedString, str): error message to display if the response matches the filter
 40    """
 41
 42    config: Config
 43    parameters: InitVar[Mapping[str, Any]]
 44    action: Optional[Union[ResponseAction, str]] = None
 45    failure_type: Optional[Union[FailureType, str]] = None
 46    http_codes: Optional[Set[int]] = None
 47    error_message_contains: Optional[str] = None
 48    predicate: Union[InterpolatedBoolean, str] = ""
 49    error_message: Union[InterpolatedString, str] = ""
 50
 51    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 52        if self.action is not None:
 53            if (
 54                self.http_codes is None
 55                and self.predicate is None
 56                and self.error_message_contains is None
 57            ):
 58                raise ValueError(
 59                    "HttpResponseFilter requires a filter condition if an action is specified"
 60                )
 61            elif isinstance(self.action, str):
 62                self.action = ResponseAction[self.action]
 63        self.http_codes = self.http_codes or set()
 64        if isinstance(self.predicate, str):
 65            self.predicate = InterpolatedBoolean(condition=self.predicate, parameters=parameters)
 66        self.error_message = InterpolatedString.create(
 67            string_or_interpolated=self.error_message, parameters=parameters
 68        )
 69        self._error_message_parser = JsonErrorMessageParser()
 70        if self.failure_type and isinstance(self.failure_type, str):
 71            self.failure_type = FailureType[self.failure_type]
 72
 73    def matches(
 74        self, response_or_exception: Optional[Union[requests.Response, Exception]]
 75    ) -> Optional[ErrorResolution]:
 76        filter_action = self._matches_filter(response_or_exception)
 77        mapped_key = (
 78            response_or_exception.status_code
 79            if isinstance(response_or_exception, requests.Response)
 80            else response_or_exception.__class__
 81        )
 82
 83        if isinstance(mapped_key, (int, Exception)):
 84            default_mapped_error_resolution = self._match_default_error_mapping(mapped_key)
 85        else:
 86            default_mapped_error_resolution = None
 87
 88        if filter_action is not None:
 89            default_error_message = (
 90                default_mapped_error_resolution.error_message
 91                if default_mapped_error_resolution
 92                else ""
 93            )
 94            error_message = None
 95            if isinstance(response_or_exception, requests.Response):
 96                error_message = self._create_error_message(response_or_exception)
 97            error_message = error_message or default_error_message
 98
 99            if self.failure_type and filter_action == ResponseAction.FAIL:
100                failure_type = self.failure_type
101            elif default_mapped_error_resolution:
102                failure_type = default_mapped_error_resolution.failure_type
103            else:
104                failure_type = FailureType.system_error
105
106            return ErrorResolution(
107                response_action=filter_action,
108                failure_type=failure_type,
109                error_message=error_message,
110            )
111
112        if (
113            (isinstance(self.http_codes, list) and len(self.http_codes)) is None
114            and self.predicate is None
115            and self.error_message_contains is None
116        ) and default_mapped_error_resolution:
117            return default_mapped_error_resolution
118
119        return None
120
121    def _match_default_error_mapping(
122        self, mapped_key: Union[int, type[Exception]]
123    ) -> Optional[ErrorResolution]:
124        return DEFAULT_ERROR_MAPPING.get(mapped_key)
125
126    def _matches_filter(
127        self, response_or_exception: Optional[Union[requests.Response, Exception]]
128    ) -> Optional[ResponseAction]:
129        """
130        Apply the HTTP filter on the response and return the action to execute if it matches
131        :param response: The HTTP response to evaluate
132        :return: The action to execute. None if the response does not match the filter
133        """
134        if isinstance(response_or_exception, requests.Response) and (
135            response_or_exception.status_code in self.http_codes  # type: ignore # http_codes set is always initialized to a value in __post_init__
136            or self._response_matches_predicate(response_or_exception)
137            or self._response_contains_error_message(response_or_exception)
138        ):
139            return self.action  # type: ignore # action is always cast to a ResponseAction not a str
140        return None
141
142    @staticmethod
143    def _safe_response_json(response: requests.Response) -> dict[str, Any]:
144        try:
145            return response.json()  # type: ignore # Response.json() returns a dictionary even if the signature does not
146        except requests.exceptions.JSONDecodeError:
147            return {}
148
149    def _create_error_message(self, response: requests.Response) -> Optional[str]:
150        """
151        Construct an error message based on the specified message template of the filter.
152        :param response: The HTTP response which can be used during interpolation
153        :return: The evaluated error message string to be emitted
154        """
155        return self.error_message.eval(  # type: ignore[no-any-return, union-attr]
156            self.config, response=self._safe_response_json(response), headers=response.headers
157        )
158
159    def _response_matches_predicate(self, response: requests.Response) -> bool:
160        return (
161            bool(
162                self.predicate.condition  # type:ignore[union-attr]
163                and self.predicate.eval(  # type:ignore[union-attr]
164                    None,  # type: ignore[arg-type]
165                    response=self._safe_response_json(response),
166                    headers=response.headers,
167                )
168            )
169            if self.predicate
170            else False
171        )
172
173    def _response_contains_error_message(self, response: requests.Response) -> bool:
174        if not self.error_message_contains:
175            return False
176        else:
177            error_message = self._error_message_parser.parse_response_error_message(
178                response=response
179            )
180            return bool(error_message and self.error_message_contains in error_message)

Filter to select a response based on its HTTP status code, error message or a predicate. If a response matches the filter, the response action, failure_type, and error message are returned as an ErrorResolution object. For http_codes declared in the filter, the failure_type will default to system_error. To override default failure_type use configured failure_type with ResponseAction.FAIL.

Attributes:
  • action (Union[ResponseAction, str]): action to execute if a request matches
  • failure_type (Union[ResponseAction, str]): failure type of traced exception if a response matches the filter
  • http_codes (Set[int]): http code of matching requests
  • error_message_contains (str): error substring of matching requests
  • predicate (str): predicate to apply to determine if a request is matching
  • error_message (Union[InterpolatedString, str): error message to display if the response matches the filter
HttpResponseFilter( config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], action: Union[airbyte_cdk.sources.streams.http.error_handlers.ResponseAction, str, NoneType] = None, failure_type: Union[airbyte_protocol_dataclasses.models.airbyte_protocol.FailureType, str, NoneType] = None, http_codes: Optional[Set[int]] = None, error_message_contains: Optional[str] = None, predicate: Union[airbyte_cdk.InterpolatedBoolean, str] = '', error_message: Union[airbyte_cdk.InterpolatedString, str] = '')
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
failure_type: Union[airbyte_protocol_dataclasses.models.airbyte_protocol.FailureType, str, NoneType] = None
http_codes: Optional[Set[int]] = None
error_message_contains: Optional[str] = None
predicate: Union[airbyte_cdk.InterpolatedBoolean, str] = ''
error_message: Union[airbyte_cdk.InterpolatedString, str] = ''
def matches( self, response_or_exception: Union[requests.models.Response, Exception, NoneType]) -> Optional[airbyte_cdk.sources.streams.http.error_handlers.ErrorResolution]:
 73    def matches(
 74        self, response_or_exception: Optional[Union[requests.Response, Exception]]
 75    ) -> Optional[ErrorResolution]:
 76        filter_action = self._matches_filter(response_or_exception)
 77        mapped_key = (
 78            response_or_exception.status_code
 79            if isinstance(response_or_exception, requests.Response)
 80            else response_or_exception.__class__
 81        )
 82
 83        if isinstance(mapped_key, (int, Exception)):
 84            default_mapped_error_resolution = self._match_default_error_mapping(mapped_key)
 85        else:
 86            default_mapped_error_resolution = None
 87
 88        if filter_action is not None:
 89            default_error_message = (
 90                default_mapped_error_resolution.error_message
 91                if default_mapped_error_resolution
 92                else ""
 93            )
 94            error_message = None
 95            if isinstance(response_or_exception, requests.Response):
 96                error_message = self._create_error_message(response_or_exception)
 97            error_message = error_message or default_error_message
 98
 99            if self.failure_type and filter_action == ResponseAction.FAIL:
100                failure_type = self.failure_type
101            elif default_mapped_error_resolution:
102                failure_type = default_mapped_error_resolution.failure_type
103            else:
104                failure_type = FailureType.system_error
105
106            return ErrorResolution(
107                response_action=filter_action,
108                failure_type=failure_type,
109                error_message=error_message,
110            )
111
112        if (
113            (isinstance(self.http_codes, list) and len(self.http_codes)) is None
114            and self.predicate is None
115            and self.error_message_contains is None
116        ) and default_mapped_error_resolution:
117            return default_mapped_error_resolution
118
119        return None