airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
"""
Backoff strategies for the declarative requester's error handlers.

This package re-exports the four strategy classes listed in ``__all__`` so callers can
import them from ``airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies``
directly instead of from each implementation module.
"""

from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies.constant_backoff_strategy import (
    ConstantBackoffStrategy,
)
from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies.exponential_backoff_strategy import (
    ExponentialBackoffStrategy,
)
from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies.wait_time_from_header_backoff_strategy import (
    WaitTimeFromHeaderBackoffStrategy,
)
from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies.wait_until_time_from_header_backoff_strategy import (
    WaitUntilTimeFromHeaderBackoffStrategy,
)

# Public API of the package: the names exposed by `from ... import *`.
__all__ = [
    "ConstantBackoffStrategy",
    "ExponentialBackoffStrategy",
    "WaitTimeFromHeaderBackoffStrategy",
    "WaitUntilTimeFromHeaderBackoffStrategy",
]
@dataclass
class ConstantBackoffStrategy(airbyte_cdk.sources.streams.http.error_handlers.backoff_strategy.BackoffStrategy):
@dataclass
class ConstantBackoffStrategy(BackoffStrategy):
    """
    Backoff strategy with a constant backoff interval.

    Attributes:
        backoff_time_in_seconds (Union[float, InterpolatedString, str]): time to back off before
            retrying a retryable request. Any value that is not already an InterpolatedString is
            stringified and wrapped in one by ``__post_init__``; it is evaluated against
            ``config`` each time ``backoff_time`` is called.
        config (Config): configuration passed to the interpolation on evaluation.
    """

    backoff_time_in_seconds: Union[float, InterpolatedString, str]
    parameters: InitVar[Mapping[str, Any]]
    config: Config

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Stringify plain values (float/int/str) so InterpolatedString.create always receives
        # either a str or an existing InterpolatedString. Because the value is a str at this
        # point, a separate float branch is never needed — one create() call covers every input.
        if not isinstance(self.backoff_time_in_seconds, InterpolatedString):
            self.backoff_time_in_seconds = str(self.backoff_time_in_seconds)
        self.backoff_time_in_seconds = InterpolatedString.create(
            self.backoff_time_in_seconds, parameters=parameters
        )

    def backoff_time(
        self,
        response_or_exception: Optional[Union[requests.Response, requests.RequestException]],
        attempt_count: int,
    ) -> Optional[float]:
        """
        Return the configured constant backoff, in seconds.

        Neither ``response_or_exception`` nor ``attempt_count`` influences the result; the
        interpolated ``backoff_time_in_seconds`` is re-evaluated against ``config`` on each call.
        """
        interval = self.backoff_time_in_seconds
        return interval.eval(self.config)  # type: ignore # backoff_time_in_seconds is always cast to an interpolated string

Backoff strategy with a constant backoff interval.

Attributes:
  • backoff_time_in_seconds (float, InterpolatedString or str): time to back off before retrying a retryable request.
ConstantBackoffStrategy( backoff_time_in_seconds: Union[float, airbyte_cdk.InterpolatedString, str], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], config: Mapping[str, Any])
backoff_time_in_seconds: Union[float, airbyte_cdk.InterpolatedString, str]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
config: Mapping[str, Any]
def backoff_time( self, response_or_exception: Union[requests.models.Response, requests.exceptions.RequestException, NoneType], attempt_count: int) -> Optional[float]:
41    def backoff_time(
42        self,
43        response_or_exception: Optional[Union[requests.Response, requests.RequestException]],
44        attempt_count: int,
45    ) -> Optional[float]:
46        return self.backoff_time_in_seconds.eval(self.config)  # type: ignore # backoff_time_in_seconds is always cast to an interpolated string

Override this method to determine the backoff time dynamically, e.g. by reading the Retry-After header.

This method is called only if should_backoff() returns True for the input request.

Parameters
  • response_or_exception: The response or exception that caused the backoff.
  • attempt_count: The number of attempts already performed for this request.

Returns: how long to back off, in seconds. The return value may be a floating-point number for sub-second precision. Returning None defers to the default backoff behavior (e.g. an exponential algorithm).
@dataclass
class ExponentialBackoffStrategy(airbyte_cdk.sources.streams.http.error_handlers.backoff_strategy.BackoffStrategy):
@dataclass
class ExponentialBackoffStrategy(BackoffStrategy):
    """
    Backoff strategy with an exponential backoff interval.

    The backoff for a given attempt is ``factor * 2 ** attempt_count`` seconds.

    Attributes:
        factor (Union[float, InterpolatedString, str]): multiplicative factor (default 5).
            Any value that is not already an InterpolatedString is stringified and wrapped in one
            by ``__post_init__``; it is evaluated against ``config`` on every call.
        config (Config): configuration passed to the interpolation on evaluation.
    """

    parameters: InitVar[Mapping[str, Any]]
    config: Config
    factor: Union[float, InterpolatedString, str] = 5

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Stringify plain values (int/float/str) so InterpolatedString.create always receives
        # either a str or an existing InterpolatedString. After str() a float check can never
        # match, so a single create() call covers every accepted input type.
        if not isinstance(self.factor, InterpolatedString):
            self.factor = str(self.factor)
        self._factor = InterpolatedString.create(self.factor, parameters=parameters)

    @property
    def _retry_factor(self) -> float:
        """The factor, evaluated against ``config``."""
        return self._factor.eval(self.config)  # type: ignore # factor is always cast to an interpolated string

    def backoff_time(
        self,
        response_or_exception: Optional[Union[requests.Response, requests.RequestException]],
        attempt_count: int,
    ) -> Optional[float]:
        """
        Return ``factor * 2 ** attempt_count`` seconds.

        ``response_or_exception`` is not inspected; only the attempt count drives the result.
        """
        multiplier = 2**attempt_count
        return self._retry_factor * multiplier  # type: ignore # factor is always cast to an interpolated string

Backoff strategy with an exponential backoff interval.

Attributes:
  • factor (float): multiplicative factor; the backoff time is factor * 2 ** attempt_count seconds.
ExponentialBackoffStrategy( parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], config: Mapping[str, Any], factor: Union[float, airbyte_cdk.InterpolatedString, str] = 5)
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
config: Mapping[str, Any]
factor: Union[float, airbyte_cdk.InterpolatedString, str] = 5
def backoff_time( self, response_or_exception: Union[requests.models.Response, requests.exceptions.RequestException, NoneType], attempt_count: int) -> Optional[float]:
41    def backoff_time(
42        self,
43        response_or_exception: Optional[Union[requests.Response, requests.RequestException]],
44        attempt_count: int,
45    ) -> Optional[float]:
46        return self._retry_factor * 2**attempt_count  # type: ignore # factor is always cast to an interpolated string

Override this method to determine the backoff time dynamically, e.g. by reading the Retry-After header.

This method is called only if should_backoff() returns True for the input request.

Parameters
  • response_or_exception: The response or exception that caused the backoff.
  • attempt_count: The number of attempts already performed for this request.

Returns: how long to back off, in seconds. The return value may be a floating-point number for sub-second precision. Returning None defers to the default backoff behavior (e.g. an exponential algorithm).
@dataclass
class WaitTimeFromHeaderBackoffStrategy(airbyte_cdk.sources.streams.http.error_handlers.backoff_strategy.BackoffStrategy):
@dataclass
class WaitTimeFromHeaderBackoffStrategy(BackoffStrategy):
    """
    Read the number of seconds to wait from an HTTP response header.

    Attributes:
        header (str): name of the header holding the wait time (interpolated against ``config``)
        regex (Optional[str]): optional regex applied to the header value to extract the number
        max_waiting_time_in_seconds (Optional[float]): when set, a header value greater than or
            equal to it raises an AirbyteTracedException that stops the stream
    """

    header: Union[InterpolatedString, str]
    parameters: InitVar[Mapping[str, Any]]
    config: Config
    regex: Optional[Union[InterpolatedString, str]] = None
    max_waiting_time_in_seconds: Optional[float] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        if self.regex:
            self.regex = InterpolatedString.create(self.regex, parameters=parameters)
        else:
            # Normalise any falsy regex (e.g. an empty string) to None.
            self.regex = None
        self.header = InterpolatedString.create(self.header, parameters=parameters)

    def backoff_time(
        self,
        response_or_exception: Optional[Union[requests.Response, requests.RequestException]],
        attempt_count: int,
    ) -> Optional[float]:
        """
        Return the wait time read from ``header``, or None if it is not available.

        Raises:
            AirbyteTracedException: if ``max_waiting_time_in_seconds`` is set and the header
                value is greater than or equal to it.
        """
        header_name = self.header.eval(config=self.config)  # type: ignore # header is always cast to an interpolated string
        pattern = None
        if self.regex:
            pattern = re.compile(self.regex.eval(self.config))  # type: ignore # regex is always cast to an interpolated string
        if not isinstance(response_or_exception, requests.Response):
            # Only a real HTTP response carries headers; returning None defers to the default backoff.
            return None
        wait_seconds = get_numeric_value_from_header(response_or_exception, header_name, pattern)
        limit = self.max_waiting_time_in_seconds
        if limit and wait_seconds and wait_seconds >= limit:
            raise AirbyteTracedException(
                internal_message=f"Rate limit wait time {wait_seconds} is greater than max waiting time of {limit} seconds. Stopping the stream...",
                message="The rate limit is greater than max waiting time has been reached.",
                failure_type=FailureType.transient_error,
            )
        return wait_seconds

Extract the wait time from an HTTP header.

Attributes:
  • header (str): header to read the wait time from
  • regex (Optional[str]): optional regex to apply to the header to extract its value
  • max_waiting_time_in_seconds (Optional[float]): if the value extracted from the header is greater than or equal to this value, the stream is stopped
WaitTimeFromHeaderBackoffStrategy( header: Union[airbyte_cdk.InterpolatedString, str], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], config: Mapping[str, Any], regex: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, max_waiting_time_in_seconds: Optional[float] = None)
header: Union[airbyte_cdk.InterpolatedString, str]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
config: Mapping[str, Any]
regex: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
max_waiting_time_in_seconds: Optional[float] = None
def backoff_time( self, response_or_exception: Union[requests.models.Response, requests.exceptions.RequestException, NoneType], attempt_count: int) -> Optional[float]:
47    def backoff_time(
48        self,
49        response_or_exception: Optional[Union[requests.Response, requests.RequestException]],
50        attempt_count: int,
51    ) -> Optional[float]:
52        header = self.header.eval(config=self.config)  # type: ignore  # header is always cast to an interpolated stream
53        if self.regex:
54            evaled_regex = self.regex.eval(self.config)  # type: ignore # header is always cast to an interpolated string
55            regex = re.compile(evaled_regex)
56        else:
57            regex = None
58        header_value = None
59        if isinstance(response_or_exception, requests.Response):
60            header_value = get_numeric_value_from_header(response_or_exception, header, regex)
61            if (
62                self.max_waiting_time_in_seconds
63                and header_value
64                and header_value >= self.max_waiting_time_in_seconds
65            ):
66                raise AirbyteTracedException(
67                    internal_message=f"Rate limit wait time {header_value} is greater than max waiting time of {self.max_waiting_time_in_seconds} seconds. Stopping the stream...",
68                    message="The rate limit is greater than max waiting time has been reached.",
69                    failure_type=FailureType.transient_error,
70                )
71        return header_value

Override this method to determine the backoff time dynamically, e.g. by reading the Retry-After header.

This method is called only if should_backoff() returns True for the input request.

Parameters
  • response_or_exception: The response or exception that caused the backoff.
  • attempt_count: The number of attempts already performed for this request.

Returns: how long to back off, in seconds. The return value may be a floating-point number for sub-second precision. Returning None defers to the default backoff behavior (e.g. an exponential algorithm).
@dataclass
class WaitUntilTimeFromHeaderBackoffStrategy(airbyte_cdk.sources.streams.http.error_handlers.backoff_strategy.BackoffStrategy):
24@dataclass
25class WaitUntilTimeFromHeaderBackoffStrategy(BackoffStrategy):
26    """
27    Extract time at which we can retry the request from response header
28    and wait for the difference between now and that time
29
30    Attributes:
31        header (str): header to read wait time from
32        min_wait (Optional[float]): minimum time to wait for safety
33        regex (Optional[str]): optional regex to apply on the header to extract its value
34    """
35
36    header: Union[InterpolatedString, str]
37    parameters: InitVar[Mapping[str, Any]]
38    config: Config
39    min_wait: Optional[Union[float, InterpolatedString, str]] = None
40    regex: Optional[Union[InterpolatedString, str]] = None
41
42    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
43        self.header = InterpolatedString.create(self.header, parameters=parameters)
44        self.regex = (
45            InterpolatedString.create(self.regex, parameters=parameters) if self.regex else None
46        )
47        if not isinstance(self.min_wait, InterpolatedString):
48            self.min_wait = InterpolatedString.create(str(self.min_wait), parameters=parameters)
49
50    def backoff_time(
51        self,
52        response_or_exception: Optional[Union[requests.Response, requests.RequestException]],
53        attempt_count: int,
54    ) -> Optional[float]:
55        now = time.time()
56        header = self.header.eval(self.config)  # type: ignore # header is always cast to an interpolated string
57        if self.regex:
58            evaled_regex = self.regex.eval(self.config)  # type: ignore # header is always cast to an interpolated string
59            regex = re.compile(evaled_regex)
60        else:
61            regex = None
62        wait_until = None
63        if isinstance(response_or_exception, requests.Response):
64            wait_until = get_numeric_value_from_header(response_or_exception, header, regex)
65        min_wait = self.min_wait.eval(self.config)  # type: ignore # header is always cast to an interpolated string
66        if wait_until is None or not wait_until:
67            return float(min_wait) if min_wait else None
68        if (isinstance(wait_until, str) and wait_until.isnumeric()) or isinstance(
69            wait_until, numbers.Number
70        ):
71            wait_time = float(wait_until) - now
72        else:
73            return float(min_wait)
74        if min_wait:
75            return float(max(wait_time, min_wait))
76        elif wait_time < 0:
77            return None
78        return wait_time

Extract from a response header the time at which the request can be retried, and wait for the difference between now and that time.

Attributes:
  • header (str): header to read the retry time from
  • min_wait (Optional[float]): minimum time to wait, for safety
  • regex (Optional[str]): optional regex to apply to the header to extract its value
WaitUntilTimeFromHeaderBackoffStrategy( header: Union[airbyte_cdk.InterpolatedString, str], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], config: Mapping[str, Any], min_wait: Union[float, airbyte_cdk.InterpolatedString, str, NoneType] = None, regex: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None)
header: Union[airbyte_cdk.InterpolatedString, str]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
config: Mapping[str, Any]
min_wait: Union[float, airbyte_cdk.InterpolatedString, str, NoneType] = None
regex: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
def backoff_time( self, response_or_exception: Union[requests.models.Response, requests.exceptions.RequestException, NoneType], attempt_count: int) -> Optional[float]:
50    def backoff_time(
51        self,
52        response_or_exception: Optional[Union[requests.Response, requests.RequestException]],
53        attempt_count: int,
54    ) -> Optional[float]:
55        now = time.time()
56        header = self.header.eval(self.config)  # type: ignore # header is always cast to an interpolated string
57        if self.regex:
58            evaled_regex = self.regex.eval(self.config)  # type: ignore # header is always cast to an interpolated string
59            regex = re.compile(evaled_regex)
60        else:
61            regex = None
62        wait_until = None
63        if isinstance(response_or_exception, requests.Response):
64            wait_until = get_numeric_value_from_header(response_or_exception, header, regex)
65        min_wait = self.min_wait.eval(self.config)  # type: ignore # header is always cast to an interpolated string
66        if wait_until is None or not wait_until:
67            return float(min_wait) if min_wait else None
68        if (isinstance(wait_until, str) and wait_until.isnumeric()) or isinstance(
69            wait_until, numbers.Number
70        ):
71            wait_time = float(wait_until) - now
72        else:
73            return float(min_wait)
74        if min_wait:
75            return float(max(wait_time, min_wait))
76        elif wait_time < 0:
77            return None
78        return wait_time

Override this method to determine the backoff time dynamically, e.g. by reading the Retry-After header.

This method is called only if should_backoff() returns True for the input request.

Parameters
  • response_or_exception: The response or exception that caused the backoff.
  • attempt_count: The number of attempts already performed for this request.

Returns: how long to back off, in seconds. The return value may be a floating-point number for sub-second precision. Returning None defers to the default backoff behavior (e.g. an exponential algorithm).