airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies
1# 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 3# 4 5from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies.constant_backoff_strategy import ( 6 ConstantBackoffStrategy, 7) 8from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies.exponential_backoff_strategy import ( 9 ExponentialBackoffStrategy, 10) 11from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies.wait_time_from_header_backoff_strategy import ( 12 WaitTimeFromHeaderBackoffStrategy, 13) 14from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies.wait_until_time_from_header_backoff_strategy import ( 15 WaitUntilTimeFromHeaderBackoffStrategy, 16) 17 18__all__ = [ 19 "ConstantBackoffStrategy", 20 "ExponentialBackoffStrategy", 21 "WaitTimeFromHeaderBackoffStrategy", 22 "WaitUntilTimeFromHeaderBackoffStrategy", 23]
@dataclass
class ConstantBackoffStrategy(BackoffStrategy):
    """
    Backoff strategy with a constant backoff interval

    Attributes:
        backoff_time_in_seconds (float): time to back off before retrying a retryable request.
            A float, a string, or an InterpolatedString is accepted; __post_init__ normalizes
            it to an InterpolatedString that is evaluated against ``config`` on every call.
    """

    backoff_time_in_seconds: Union[float, InterpolatedString, str]
    parameters: InitVar[Mapping[str, Any]]
    config: Config

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Plain values (e.g. a float from the manifest) are stringified first so a single
        # InterpolatedString.create call handles every accepted input type. After this
        # conversion the value can never be a float, so no separate float branch is needed.
        value = self.backoff_time_in_seconds
        if not isinstance(value, InterpolatedString):
            value = str(value)
        self.backoff_time_in_seconds = InterpolatedString.create(value, parameters=parameters)

    def backoff_time(
        self,
        response_or_exception: Optional[Union[requests.Response, requests.RequestException]],
        attempt_count: int,
    ) -> Optional[float]:
        """
        Return the configured constant backoff interval.

        :param response_or_exception: ignored by this strategy
        :param attempt_count: ignored by this strategy
        :return: the interpolated backoff_time_in_seconds evaluated against ``config``
        """
        return self.backoff_time_in_seconds.eval(self.config)  # type: ignore # backoff_time_in_seconds is always cast to an interpolated string
Backoff strategy with a constant backoff interval
Attributes:
- backoff_time_in_seconds (float): time to backoff before retrying a retryable request.
def backoff_time(
    self,
    response_or_exception: Optional[Union[requests.Response, requests.RequestException]],
    attempt_count: int,
) -> Optional[float]:
    """
    Evaluate the constant backoff interval against the connector config.

    Neither the response/exception nor the attempt count influences the result.
    """
    constant_interval = self.backoff_time_in_seconds
    return constant_interval.eval(self.config)  # type: ignore # backoff_time_in_seconds is always cast to an interpolated string
Override this method to dynamically determine the backoff time, e.g., by reading the X-Retry-After header.
This method is called only if should_backoff() returns True for the input request.
Parameters
- response_or_exception: The response or exception that caused the backoff.
- attempt_count: The number of attempts already performed for this request.
Returns: how long to back off, in seconds. The return value may be a floating-point number for sub-second precision. Returning None defers backoff to the default backoff behavior (e.g., using an exponential algorithm).
@dataclass
class ExponentialBackoffStrategy(BackoffStrategy):
    """
    Backoff strategy with an exponential backoff interval

    The wait time is ``factor * 2 ** attempt_count`` seconds.

    Attributes:
        factor (float): multiplicative factor (a float, string, or InterpolatedString;
            evaluated against ``config`` on every call)
    """

    parameters: InitVar[Mapping[str, Any]]
    config: Config
    factor: Union[float, InterpolatedString, str] = 5

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Stringify plain values so the public ``factor`` attribute and the private
        # interpolated copy are built the same way for every input type. After this step
        # ``factor`` is either a str or an InterpolatedString, never a float.
        if not isinstance(self.factor, InterpolatedString):
            self.factor = str(self.factor)
        self._factor = InterpolatedString.create(self.factor, parameters=parameters)

    @property
    def _retry_factor(self) -> float:
        """The multiplicative factor evaluated against the current config."""
        return self._factor.eval(self.config)  # type: ignore # factor is always cast to an interpolated string

    def backoff_time(
        self,
        response_or_exception: Optional[Union[requests.Response, requests.RequestException]],
        attempt_count: int,
    ) -> Optional[float]:
        """
        Return ``factor * 2 ** attempt_count`` seconds.

        :param response_or_exception: ignored by this strategy
        :param attempt_count: number of attempts already performed; drives the exponent
        """
        return self._retry_factor * 2**attempt_count  # type: ignore # factor is always cast to an interpolated string
Backoff strategy with an exponential backoff interval
Attributes:
- factor (float): multiplicative factor
def backoff_time(
    self,
    response_or_exception: Optional[Union[requests.Response, requests.RequestException]],
    attempt_count: int,
) -> Optional[float]:
    """
    Grow the wait time exponentially with the number of attempts.

    The result is the evaluated retry factor multiplied by ``2 ** attempt_count``;
    the response/exception is not consulted.
    """
    factor = self._retry_factor
    growth = 2**attempt_count
    return factor * growth  # type: ignore # factor is always cast to an interpolated string
Override this method to dynamically determine the backoff time, e.g., by reading the X-Retry-After header.
This method is called only if should_backoff() returns True for the input request.
Parameters
- response_or_exception: The response or exception that caused the backoff.
- attempt_count: The number of attempts already performed for this request.
Returns: how long to back off, in seconds. The return value may be a floating-point number for sub-second precision. Returning None defers backoff to the default backoff behavior (e.g., using an exponential algorithm).
@dataclass
class WaitTimeFromHeaderBackoffStrategy(BackoffStrategy):
    """
    Read the number of seconds to wait before retrying from an HTTP response header.

    Attributes:
        header (str): name of the header holding the wait time (may be interpolated)
        regex (Optional[str]): optional regex applied to the header value to extract the number
        max_waiting_time_in_seconds (Optional[float]): if the extracted wait time is greater than
            or equal to this value, the stream is stopped instead of waiting
    """

    header: Union[InterpolatedString, str]
    parameters: InitVar[Mapping[str, Any]]
    config: Config
    regex: Optional[Union[InterpolatedString, str]] = None
    max_waiting_time_in_seconds: Optional[float] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # An empty/missing regex is normalized to None; anything else becomes interpolatable.
        if self.regex:
            self.regex = InterpolatedString.create(self.regex, parameters=parameters)
        else:
            self.regex = None
        self.header = InterpolatedString.create(self.header, parameters=parameters)

    def backoff_time(
        self,
        response_or_exception: Optional[Union[requests.Response, requests.RequestException]],
        attempt_count: int,
    ) -> Optional[float]:
        """
        Return the wait time read from the configured header.

        :param response_or_exception: only a requests.Response is inspected; otherwise None is returned
        :param attempt_count: unused
        :return: the numeric header value, or None when it cannot be determined
        :raises AirbyteTracedException: when the wait time reaches max_waiting_time_in_seconds
        """
        header_name = self.header.eval(config=self.config)  # type: ignore # header is always cast to an interpolated string
        compiled_regex = (
            re.compile(self.regex.eval(self.config))  # type: ignore # regex is always cast to an interpolated string
            if self.regex
            else None
        )
        if not isinstance(response_or_exception, requests.Response):
            return None

        header_value = get_numeric_value_from_header(
            response_or_exception, header_name, compiled_regex
        )
        limit = self.max_waiting_time_in_seconds
        # Waiting this long is considered unacceptable, so the sync is stopped instead.
        if limit and header_value and header_value >= limit:
            raise AirbyteTracedException(
                internal_message=f"Rate limit wait time {header_value} is greater than max waiting time of {self.max_waiting_time_in_seconds} seconds. Stopping the stream...",
                message="The rate limit is greater than max waiting time has been reached.",
                failure_type=FailureType.transient_error,
            )
        return header_value
Extract wait time from http header
Attributes:
- header (str): header to read wait time from
- regex (Optional[str]): optional regex to apply on the header to extract its value
- max_waiting_time_in_seconds (Optional[float]): if the value extracted from the header is greater than or equal to this value, stop the stream
def backoff_time(
    self,
    response_or_exception: Optional[Union[requests.Response, requests.RequestException]],
    attempt_count: int,
) -> Optional[float]:
    """
    Return the wait time read from the configured header.

    Only a requests.Response is inspected; any other input yields None.
    Raises AirbyteTracedException when the wait time reaches max_waiting_time_in_seconds.
    """
    header_name = self.header.eval(config=self.config)  # type: ignore # header is always cast to an interpolated string
    compiled_regex = (
        re.compile(self.regex.eval(self.config))  # type: ignore # regex is always cast to an interpolated string
        if self.regex
        else None
    )
    if not isinstance(response_or_exception, requests.Response):
        return None

    header_value = get_numeric_value_from_header(response_or_exception, header_name, compiled_regex)
    limit = self.max_waiting_time_in_seconds
    if limit and header_value and header_value >= limit:
        raise AirbyteTracedException(
            internal_message=f"Rate limit wait time {header_value} is greater than max waiting time of {self.max_waiting_time_in_seconds} seconds. Stopping the stream...",
            message="The rate limit is greater than max waiting time has been reached.",
            failure_type=FailureType.transient_error,
        )
    return header_value
Override this method to dynamically determine the backoff time, e.g., by reading the X-Retry-After header.
This method is called only if should_backoff() returns True for the input request.
Parameters
- response_or_exception: The response or exception that caused the backoff.
- attempt_count: The number of attempts already performed for this request.
Returns: how long to back off, in seconds. The return value may be a floating-point number for sub-second precision. Returning None defers backoff to the default backoff behavior (e.g., using an exponential algorithm).
@dataclass
class WaitUntilTimeFromHeaderBackoffStrategy(BackoffStrategy):
    """
    Extract the time at which the request may be retried from a response header,
    and wait for the difference between now and that time.

    The header value is compared against ``time.time()``, i.e. it is treated as a
    timestamp in seconds since the epoch.

    Attributes:
        header (str): header to read the retry timestamp from
        min_wait (Optional[float]): minimum time to wait for safety
        regex (Optional[str]): optional regex applied to the header value to extract the number
    """

    header: Union[InterpolatedString, str]
    parameters: InitVar[Mapping[str, Any]]
    config: Config
    min_wait: Optional[Union[float, InterpolatedString, str]] = None
    regex: Optional[Union[InterpolatedString, str]] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self.header = InterpolatedString.create(self.header, parameters=parameters)
        self.regex = (
            InterpolatedString.create(self.regex, parameters=parameters) if self.regex else None
        )
        # Every non-interpolated value, including the default None, is stringified so that
        # min_wait is always an InterpolatedString and can be evaluated uniformly later.
        if not isinstance(self.min_wait, InterpolatedString):
            self.min_wait = InterpolatedString.create(str(self.min_wait), parameters=parameters)

    def backoff_time(
        self,
        response_or_exception: Optional[Union[requests.Response, requests.RequestException]],
        attempt_count: int,
    ) -> Optional[float]:
        """
        Compute how long to wait until the timestamp advertised in the response header.

        :param response_or_exception: only a requests.Response is inspected; other inputs fall back to min_wait
        :param attempt_count: unused
        :return: seconds to wait (at least min_wait when configured), or None to defer to the
            default backoff behavior
        """
        now = time.time()
        header = self.header.eval(self.config)  # type: ignore # header is always cast to an interpolated string
        if self.regex:
            evaled_regex = self.regex.eval(self.config)  # type: ignore # regex is always cast to an interpolated string
            regex = re.compile(evaled_regex)
        else:
            regex = None
        wait_until = None
        if isinstance(response_or_exception, requests.Response):
            wait_until = get_numeric_value_from_header(response_or_exception, header, regex)
        min_wait = self.min_wait.eval(self.config)  # type: ignore # min_wait is always cast to an interpolated string

        # No usable timestamp (missing or zero): only min_wait can apply.
        if not wait_until:
            return float(min_wait) if min_wait else None
        if (isinstance(wait_until, str) and wait_until.isnumeric()) or isinstance(
            wait_until, numbers.Number
        ):
            wait_time = float(wait_until) - now
        else:
            # Unparseable timestamp: fall back to min_wait, or defer when none is configured
            # (calling float(None) here would raise TypeError).
            return float(min_wait) if min_wait is not None else None
        if min_wait:
            # Cast before comparing so an evaluated min_wait that is not a float cannot break max().
            return max(wait_time, float(min_wait))
        if wait_time < 0:
            # The advertised retry time is already in the past.
            return None
        return wait_time
Extract time at which we can retry the request from response header and wait for the difference between now and that time
Attributes:
- header (str): header to read wait time from
- min_wait (Optional[float]): minimum time to wait for safety
- regex (Optional[str]): optional regex to apply on the header to extract its value
def backoff_time(
    self,
    response_or_exception: Optional[Union[requests.Response, requests.RequestException]],
    attempt_count: int,
) -> Optional[float]:
    """
    Compute how long to wait until the timestamp advertised in the response header.

    The header value is compared against ``time.time()``, i.e. treated as seconds since the epoch.

    :param response_or_exception: only a requests.Response is inspected; other inputs fall back to min_wait
    :param attempt_count: unused
    :return: seconds to wait (at least min_wait when configured), or None to defer to the
        default backoff behavior
    """
    now = time.time()
    header = self.header.eval(self.config)  # type: ignore # header is always cast to an interpolated string
    if self.regex:
        evaled_regex = self.regex.eval(self.config)  # type: ignore # regex is always cast to an interpolated string
        regex = re.compile(evaled_regex)
    else:
        regex = None
    wait_until = None
    if isinstance(response_or_exception, requests.Response):
        wait_until = get_numeric_value_from_header(response_or_exception, header, regex)
    min_wait = self.min_wait.eval(self.config)  # type: ignore # min_wait is always cast to an interpolated string

    # No usable timestamp (missing or zero): only min_wait can apply.
    if not wait_until:
        return float(min_wait) if min_wait else None
    if (isinstance(wait_until, str) and wait_until.isnumeric()) or isinstance(
        wait_until, numbers.Number
    ):
        wait_time = float(wait_until) - now
    else:
        # Unparseable timestamp: fall back to min_wait, or defer when none is configured
        # (calling float(None) here would raise TypeError).
        return float(min_wait) if min_wait is not None else None
    if min_wait:
        # Cast before comparing so an evaluated min_wait that is not a float cannot break max().
        return max(wait_time, float(min_wait))
    if wait_time < 0:
        # The advertised retry time is already in the past.
        return None
    return wait_time
Override this method to dynamically determine the backoff time, e.g., by reading the X-Retry-After header.
This method is called only if should_backoff() returns True for the input request.
Parameters
- response_or_exception: The response or exception that caused the backoff.
- attempt_count: The number of attempts already performed for this request.
Returns: how long to back off, in seconds. The return value may be a floating-point number for sub-second precision. Returning None defers backoff to the default backoff behavior (e.g., using an exponential algorithm).