airbyte_cdk.sources.streams.call_rate

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4
  5import abc
  6import dataclasses
  7import datetime
  8import logging
  9import re
 10import time
 11from datetime import timedelta
 12from threading import RLock
 13from typing import TYPE_CHECKING, Any, Mapping, Optional
 14from urllib import parse
 15
 16import requests
 17import requests_cache
 18from pyrate_limiter import InMemoryBucket, Limiter, RateItem, TimeClock
 19from pyrate_limiter import Rate as PyRateRate
 20from pyrate_limiter.exceptions import BucketFullException
 21
 22# prevents mypy from complaining about missing session attributes in LimiterMixin
 23if TYPE_CHECKING:
 24    MIXIN_BASE = requests.Session
 25else:
 26    MIXIN_BASE = object
 27
 28logger = logging.getLogger("airbyte")
 29logging.getLogger("pyrate_limiter").setLevel(logging.WARNING)
 30
 31
 32@dataclasses.dataclass
 33class Rate:
 34    """Call rate limit"""
 35
 36    limit: int
 37    interval: timedelta
 38
 39
 40class CallRateLimitHit(Exception):
 41    def __init__(self, error: str, item: Any, weight: int, rate: str, time_to_wait: timedelta):
 42        """Constructor
 43
 44        :param error: error message
 45        :param item: object passed into acquire_call
 46        :param weight: how many credits were requested
 47        :param rate: string representation of the rate violated
 48        :param time_to_wait: how long should wait util more call will be available
 49        """
 50        self.item = item
 51        self.weight = weight
 52        self.rate = rate
 53        self.time_to_wait = time_to_wait
 54        super().__init__(error)
 55
 56
 57class AbstractCallRatePolicy(abc.ABC):
 58    """Call rate policy interface.
 59    Should be configurable with different rules, like N per M for endpoint X. Endpoint X is matched with APIBudget.
 60    """
 61
 62    @abc.abstractmethod
 63    def matches(self, request: Any) -> bool:
 64        """Tells if this policy matches specific request and should apply to it
 65
 66        :param request:
 67        :return: True if policy should apply to this request, False - otherwise
 68        """
 69
 70    @abc.abstractmethod
 71    def try_acquire(self, request: Any, weight: int) -> None:
 72        """Try to acquire request
 73
 74        :param request: a request object representing a single call to API
 75        :param weight: number of requests to deduct from credit
 76        :return:
 77        """
 78
 79    @abc.abstractmethod
 80    def update(
 81        self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]
 82    ) -> None:
 83        """Update call rate counting with current values
 84
 85        :param available_calls:
 86        :param call_reset_ts:
 87        """
 88
 89
 90class RequestMatcher(abc.ABC):
 91    """Callable that help to match a request object with call rate policies."""
 92
 93    @abc.abstractmethod
 94    def __call__(self, request: Any) -> bool:
 95        """
 96
 97        :param request:
 98        :return: True if matches the provided request object, False - otherwise
 99        """
100
101
102class HttpRequestMatcher(RequestMatcher):
103    """Simple implementation of RequestMatcher for HTTP requests using HttpRequestRegexMatcher under the hood."""
104
105    def __init__(
106        self,
107        method: Optional[str] = None,
108        url: Optional[str] = None,
109        params: Optional[Mapping[str, Any]] = None,
110        headers: Optional[Mapping[str, Any]] = None,
111    ):
112        """Constructor
113
114        :param method: HTTP method (e.g., "GET", "POST").
115        :param url: Full URL to match.
116        :param params: Dictionary of query parameters to match.
117        :param headers: Dictionary of headers to match.
118        """
119        # Parse the URL to extract the base and path
120        if url:
121            parsed_url = parse.urlsplit(url)
122            url_base = f"{parsed_url.scheme}://{parsed_url.netloc}"
123            url_path = parsed_url.path if parsed_url.path != "/" else None
124        else:
125            url_base = None
126            url_path = None
127
128        # Use HttpRequestRegexMatcher under the hood
129        self._regex_matcher = HttpRequestRegexMatcher(
130            method=method,
131            url_base=url_base,
132            url_path_pattern=re.escape(url_path) if url_path else None,
133            params=params,
134            headers=headers,
135        )
136
137    def __call__(self, request: Any) -> bool:
138        """
139        :param request: A requests.Request or requests.PreparedRequest instance.
140        :return: True if the request matches all provided criteria; False otherwise.
141        """
142        return self._regex_matcher(request)
143
144    def __str__(self) -> str:
145        return (
146            f"HttpRequestMatcher(method={self._regex_matcher._method}, "
147            f"url={self._regex_matcher._url_base}{self._regex_matcher._url_path_pattern.pattern if self._regex_matcher._url_path_pattern else ''}, "
148            f"params={self._regex_matcher._params}, headers={self._regex_matcher._headers})"
149        )
150
151
152class HttpRequestRegexMatcher(RequestMatcher):
153    """
154    Extended RequestMatcher for HTTP requests that supports matching on:
155      - HTTP method (case-insensitive)
156      - URL base (scheme + netloc) optionally
157      - URL path pattern (a regex applied to the path portion of the URL)
158      - Query parameters (must be present)
159      - Headers (header names compared case-insensitively)
160    """
161
162    def __init__(
163        self,
164        method: Optional[str] = None,
165        url_base: Optional[str] = None,
166        url_path_pattern: Optional[str] = None,
167        params: Optional[Mapping[str, Any]] = None,
168        headers: Optional[Mapping[str, Any]] = None,
169        weight: Optional[int] = None,
170    ):
171        """
172        :param method: HTTP method (e.g. "GET", "POST"); compared case-insensitively.
173        :param url_base: Base URL (scheme://host) that must match.
174        :param url_path_pattern: A regex pattern that will be applied to the path portion of the URL.
175        :param params: Dictionary of query parameters that must be present in the request.
176        :param headers: Dictionary of headers that must be present (header keys are compared case-insensitively).
177        :param weight: The weight of a request matching this matcher. If set, this value is used
178            when acquiring a call from the rate limiter, enabling cost-based rate limiting
179            where different endpoints consume different amounts from a shared budget.
180            If not set, each request counts as 1.
181        """
182        if weight is not None and weight < 1:
183            raise ValueError(f"weight must be >= 1, got {weight}")
184        self._weight = weight
185        self._method = method.upper() if method else None
186
187        # Normalize the url_base if provided: remove trailing slash.
188        self._url_base = url_base.rstrip("/") if url_base else None
189
190        # Compile the URL path pattern if provided.
191        self._url_path_pattern = re.compile(url_path_pattern) if url_path_pattern else None
192
193        # Normalize query parameters to strings.
194        self._params = {str(k): str(v) for k, v in (params or {}).items()}
195
196        # Normalize header keys to lowercase.
197        self._headers = {str(k).lower(): str(v) for k, v in (headers or {}).items()}
198
199    @staticmethod
200    def _match_dict(obj: Mapping[str, Any], pattern: Mapping[str, Any]) -> bool:
201        """Check that every key/value in the pattern exists in the object."""
202        return pattern.items() <= obj.items()
203
204    def __call__(self, request: Any) -> bool:
205        """
206        :param request: A requests.Request or requests.PreparedRequest instance.
207        :return: True if the request matches all provided criteria; False otherwise.
208        """
209        # Prepare the request (if needed) and extract the URL details.
210        if isinstance(request, requests.Request):
211            prepared_request = request.prepare()
212        elif isinstance(request, requests.PreparedRequest):
213            prepared_request = request
214        else:
215            return False
216
217        # Check HTTP method.
218        if self._method is not None:
219            if prepared_request.method != self._method:
220                return False
221
222        # Parse the URL.
223        parsed_url = parse.urlsplit(prepared_request.url)
224        # Reconstruct the base: scheme://netloc
225        request_url_base = f"{str(parsed_url.scheme)}://{str(parsed_url.netloc)}"
226        # The path (without query parameters)
227        request_path = str(parsed_url.path).rstrip("/")
228
229        # If a base URL is provided, check that it matches.
230        if self._url_base is not None:
231            if request_url_base != self._url_base:
232                return False
233
234        # If a URL path pattern is provided, ensure the path matches the regex.
235        if self._url_path_pattern is not None:
236            if not self._url_path_pattern.search(request_path):
237                return False
238
239        # Check query parameters.
240        if self._params:
241            query_params = dict(parse.parse_qsl(str(parsed_url.query)))
242            if not self._match_dict(query_params, self._params):
243                return False
244
245        # Check headers (normalize keys to lower-case).
246        if self._headers:
247            req_headers = {k.lower(): v for k, v in prepared_request.headers.items()}
248            if not self._match_dict(req_headers, self._headers):
249                return False
250
251        return True
252
253    @property
254    def weight(self) -> Optional[int]:
255        """The weight of a request matching this matcher, or None if not set."""
256        return self._weight
257
258    def __str__(self) -> str:
259        regex = self._url_path_pattern.pattern if self._url_path_pattern else None
260        return (
261            f"HttpRequestRegexMatcher(method={self._method}, url_base={self._url_base}, "
262            f"url_path_pattern={regex}, params={self._params}, headers={self._headers}, weight={self._weight})"
263        )
264
265
266class BaseCallRatePolicy(AbstractCallRatePolicy, abc.ABC):
267    def __init__(self, matchers: list[RequestMatcher]):
268        self._matchers = matchers
269
270    def matches(self, request: Any) -> bool:
271        """Tell if this policy matches specific request and should apply to it
272
273        :param request:
274        :return: True if policy should apply to this request, False - otherwise
275        """
276
277        if not self._matchers:
278            return True
279        return any(matcher(request) for matcher in self._matchers)
280
281    def get_weight(self, request: Any) -> int:
282        """Get the weight for a request based on the first matching matcher.
283
284        If a matcher has a weight configured, that weight is used.
285        Otherwise, defaults to 1.
286
287        :param request: a request object
288        :return: the weight for this request
289        """
290        for matcher in self._matchers:
291            if matcher(request):
292                if isinstance(matcher, HttpRequestRegexMatcher) and matcher.weight is not None:
293                    return matcher.weight
294                return 1
295        return 1
296
297
298class UnlimitedCallRatePolicy(BaseCallRatePolicy):
299    """
300    This policy is for explicit unlimited call rates.
301    It can be used when we want to match a specific group of requests and don't apply any limits.
302
303    Example:
304
305    APICallBudget(
306        [
307            UnlimitedCallRatePolicy(
308                matchers=[HttpRequestMatcher(url="/some/method", headers={"sandbox": true})],
309            ),
310            FixedWindowCallRatePolicy(
311                matchers=[HttpRequestMatcher(url="/some/method")],
312                next_reset_ts=datetime.now(),
313                period=timedelta(hours=1)
314                call_limit=1000,
315            ),
316        ]
317    )
318
319    The code above will limit all calls to /some/method except calls that have header sandbox=True
320    """
321
322    def try_acquire(self, request: Any, weight: int) -> None:
323        """Do nothing"""
324
325    def update(
326        self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]
327    ) -> None:
328        """Do nothing"""
329
330
331class FixedWindowCallRatePolicy(BaseCallRatePolicy):
332    def __init__(
333        self,
334        next_reset_ts: datetime.datetime,
335        period: timedelta,
336        call_limit: int,
337        matchers: list[RequestMatcher],
338    ):
339        """A policy that allows {call_limit} calls within a {period} time interval
340
341        :param next_reset_ts: next call rate reset time point
342        :param period: call rate reset period
343        :param call_limit:
344        :param matchers:
345        """
346
347        self._next_reset_ts = next_reset_ts
348        self._offset = period
349        self._call_limit = call_limit
350        self._calls_num = 0
351        self._lock = RLock()
352        super().__init__(matchers=matchers)
353
354    def try_acquire(self, request: Any, weight: int) -> None:
355        if weight > self._call_limit:
356            raise ValueError("Weight can not exceed the call limit")
357        if not self.matches(request):
358            raise ValueError("Request does not match the policy")
359
360        with self._lock:
361            self._update_current_window()
362
363            if self._calls_num + weight > self._call_limit:
364                reset_in = self._next_reset_ts - datetime.datetime.now()
365                error_message = (
366                    f"reached maximum number of allowed calls {self._call_limit} "
367                    f"per {self._offset} interval, next reset in {reset_in}."
368                )
369                raise CallRateLimitHit(
370                    error=error_message,
371                    item=request,
372                    weight=weight,
373                    rate=f"{self._call_limit} per {self._offset}",
374                    time_to_wait=reset_in,
375                )
376
377            self._calls_num += weight
378
379    def __str__(self) -> str:
380        matcher_str = ", ".join(f"{matcher}" for matcher in self._matchers)
381        return (
382            f"FixedWindowCallRatePolicy(call_limit={self._call_limit}, period={self._offset}, "
383            f"calls_used={self._calls_num}, next_reset={self._next_reset_ts}, "
384            f"matchers=[{matcher_str}])"
385        )
386
387    def update(
388        self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]
389    ) -> None:
390        """Update call rate counters, by default, only reacts to decreasing updates of available_calls and changes to call_reset_ts.
391        We ignore updates with available_calls > current_available_calls to support call rate limits that are lower than API limits.
392
393        :param available_calls:
394        :param call_reset_ts:
395        """
396        with self._lock:
397            self._update_current_window()
398            current_available_calls = self._call_limit - self._calls_num
399
400            if available_calls is not None and current_available_calls > available_calls:
401                logger.debug(
402                    "got rate limit update from api, adjusting available calls from %s to %s",
403                    current_available_calls,
404                    available_calls,
405                )
406                self._calls_num = self._call_limit - available_calls
407
408            if call_reset_ts is not None and call_reset_ts != self._next_reset_ts:
409                logger.debug(
410                    "got rate limit update from api, adjusting reset time from %s to %s",
411                    self._next_reset_ts,
412                    call_reset_ts,
413                )
414                self._next_reset_ts = call_reset_ts
415
416    def _update_current_window(self) -> None:
417        now = datetime.datetime.now()
418        if now > self._next_reset_ts:
419            logger.debug("started new window, %s calls available now", self._call_limit)
420            self._next_reset_ts = self._next_reset_ts + self._offset
421            self._calls_num = 0
422
423
424class MovingWindowCallRatePolicy(BaseCallRatePolicy):
425    """
426    Policy to control requests rate implemented on top of PyRateLimiter lib.
427    The main difference between this policy and FixedWindowCallRatePolicy is that the rate-limiting window
428    is moving along requests that we made, and there is no moment when we reset an available number of calls.
429    This strategy requires saving of timestamps of all requests within a window.
430    """
431
432    def __init__(self, rates: list[Rate], matchers: list[RequestMatcher]):
433        """Constructor
434
435        :param rates: list of rates, the order is important and must be ascending
436        :param matchers:
437        """
438        if not rates:
439            raise ValueError("The list of rates can not be empty")
440        pyrate_rates = [
441            PyRateRate(limit=rate.limit, interval=int(rate.interval.total_seconds() * 1000))
442            for rate in rates
443        ]
444        self._bucket = InMemoryBucket(pyrate_rates)
445        # Limiter will create the background task that clears old requests in the bucket
446        self._limiter = Limiter(self._bucket)
447        super().__init__(matchers=matchers)
448
449    def try_acquire(self, request: Any, weight: int) -> None:
450        if not self.matches(request):
451            raise ValueError("Request does not match the policy")
452        lowest_limit = min(rate.limit for rate in self._bucket.rates)
453        if weight > lowest_limit:
454            raise ValueError(
455                f"Weight can not exceed the lowest configured rate limit ({lowest_limit})"
456            )
457
458        try:
459            self._limiter.try_acquire(request, weight=weight)
460        except BucketFullException as exc:
461            item = self._limiter.bucket_factory.wrap_item(request, weight)
462            assert isinstance(item, RateItem)
463
464            with self._limiter.lock:
465                time_to_wait = self._bucket.waiting(item)
466                assert isinstance(time_to_wait, int)
467
468                raise CallRateLimitHit(
469                    error=str(exc.meta_info["error"]),
470                    item=request,
471                    weight=int(exc.meta_info["weight"]),
472                    rate=str(exc.meta_info["rate"]),
473                    time_to_wait=timedelta(milliseconds=time_to_wait),
474                )
475
476    def update(
477        self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]
478    ) -> None:
479        """Adjust call bucket to reflect the state of the API server
480
481        :param available_calls:
482        :param call_reset_ts:
483        :return:
484        """
485        if (
486            available_calls is not None and call_reset_ts is None
487        ):  # we do our best to sync buckets with API
488            if available_calls == 0:
489                with self._limiter.lock:
490                    items_to_add = self._bucket.count() < self._bucket.rates[0].limit
491                    if items_to_add > 0:
492                        now: int = TimeClock().now()  # type: ignore[no-untyped-call]
493                        self._bucket.put(RateItem(name="dummy", timestamp=now, weight=items_to_add))
494        # TODO: add support if needed, it might be that it is not possible to make a good solution for this case
495        # if available_calls is not None and call_reset_ts is not None:
496        #     ts = call_reset_ts.timestamp()
497
498    def __str__(self) -> str:
499        """Return a human-friendly description of the moving window rate policy for logging purposes."""
500        rates_info = ", ".join(
501            f"{rate.limit} per {timedelta(milliseconds=rate.interval)}"
502            for rate in self._bucket.rates
503        )
504        current_bucket_count = self._bucket.count()
505        matcher_str = ", ".join(f"{matcher}" for matcher in self._matchers)
506        return (
507            f"MovingWindowCallRatePolicy(rates=[{rates_info}], current_bucket_count={current_bucket_count}, "
508            f"matchers=[{matcher_str}])"
509        )
510
511
512class AbstractAPIBudget(abc.ABC):
513    """Interface to some API where a client allowed to have N calls per T interval.
514
515    Important: APIBudget is not doing any API calls, the end user code is responsible to call this interface
516        to respect call rate limitation of the API.
517
518    It supports multiple policies applied to different group of requests. To distinct these groups we use RequestMatchers.
519    Individual policy represented by MovingWindowCallRatePolicy and currently supports only moving window strategy.
520    """
521
522    @abc.abstractmethod
523    def acquire_call(
524        self, request: Any, block: bool = True, timeout: Optional[float] = None
525    ) -> None:
526        """Try to get a call from budget, will block by default
527
528        :param request:
529        :param block: when true (default) will block the current thread until call credit is available
530        :param timeout: if set will limit maximum time in block, otherwise will wait until credit is available
531        :raises: CallRateLimitHit - when no credits left and if timeout was set the waiting time exceed the timeout
532        """
533
534    @abc.abstractmethod
535    def get_matching_policy(self, request: Any) -> Optional[AbstractCallRatePolicy]:
536        """Find matching call rate policy for specific request"""
537
538    @abc.abstractmethod
539    def update_from_response(self, request: Any, response: Any) -> None:
540        """Update budget information based on response from API
541
542        :param request: the initial request that triggered this response
543        :param response: response from the API
544        """
545
546
547class APIBudget(AbstractAPIBudget):
548    """Default APIBudget implementation"""
549
550    def __init__(
551        self, policies: list[AbstractCallRatePolicy], maximum_attempts_to_acquire: int = 100000
552    ) -> None:
553        """Constructor
554
555        :param policies: list of policies in this budget
556        :param maximum_attempts_to_acquire: number of attempts before throwing hit ratelimit exception, we put some big number here
557         to avoid situations when many threads compete with each other for a few lots over a significant amount of time
558        """
559
560        self._policies = policies
561        self._maximum_attempts_to_acquire = maximum_attempts_to_acquire
562
563    def _extract_endpoint(self, request: Any) -> str:
564        """Extract the endpoint URL from the request if available."""
565        endpoint = None
566        try:
567            # If the request is already a PreparedRequest, it should have a URL.
568            if isinstance(request, requests.PreparedRequest):
569                endpoint = request.url
570            # If it's a requests.Request, we call prepare() to extract the URL.
571            elif isinstance(request, requests.Request):
572                prepared = request.prepare()
573                endpoint = prepared.url
574        except Exception as e:
575            logger.debug(f"Error extracting endpoint: {e}")
576        if endpoint:
577            return endpoint
578        return "unknown endpoint"
579
580    def get_matching_policy(self, request: Any) -> Optional[AbstractCallRatePolicy]:
581        for policy in self._policies:
582            if policy.matches(request):
583                return policy
584        return None
585
586    def acquire_call(
587        self, request: Any, block: bool = True, timeout: Optional[float] = None
588    ) -> None:
589        """Try to get a call from budget, will block by default.
590        Matchers will be called sequentially in the same order they were added.
591        The first matcher that returns True will
592
593        :param request: the API request
594        :param block: when True (default) will block until a call credit is available
595        :param timeout: if provided, limits maximum waiting time; otherwise, waits indefinitely
596        :raises: CallRateLimitHit if the call credit cannot be acquired within the timeout
597        """
598
599        policy = self.get_matching_policy(request)
600        endpoint = self._extract_endpoint(request)
601        if policy:
602            logger.debug(f"Acquiring call for endpoint {endpoint} using policy: {policy}")
603            self._do_acquire(request=request, policy=policy, block=block, timeout=timeout)
604        elif self._policies:
605            logger.debug(
606                f"No policies matched for endpoint {endpoint} (request: {request}). Allowing call by default."
607            )
608
609    def update_from_response(self, request: Any, response: Any) -> None:
610        """Update budget information based on the API response.
611
612        :param request: the initial request that triggered this response
613        :param response: response from the API
614        """
615        pass
616
617    def _do_acquire(
618        self, request: Any, policy: AbstractCallRatePolicy, block: bool, timeout: Optional[float]
619    ) -> None:
620        """Internal method to try to acquire a call credit.
621
622        :param request: the API request
623        :param policy: the matching rate-limiting policy
624        :param block: indicates whether to block until a call credit is available
625        :param timeout: maximum time to wait if blocking
626        :raises: CallRateLimitHit if unable to acquire a call credit
627        """
628        last_exception = None
629        endpoint = self._extract_endpoint(request)
630        # sometimes we spend all budget before a second attempt, so we have a few more attempts
631        for attempt in range(1, self._maximum_attempts_to_acquire):
632            try:
633                weight = policy.get_weight(request) if isinstance(policy, BaseCallRatePolicy) else 1
634                policy.try_acquire(request, weight=weight)
635                return
636            except CallRateLimitHit as exc:
637                last_exception = exc
638                if block:
639                    if timeout is not None:
640                        time_to_wait = min(timedelta(seconds=timeout), exc.time_to_wait)
641                    else:
642                        time_to_wait = exc.time_to_wait
643                    # Ensure we never sleep for a negative duration.
644                    time_to_wait = max(timedelta(0), time_to_wait)
645                    logger.debug(
646                        f"Policy {policy} reached call limit for endpoint {endpoint} ({exc.rate}). "
647                        f"Sleeping for {time_to_wait} on attempt {attempt}."
648                    )
649                    time.sleep(time_to_wait.total_seconds())
650                else:
651                    logger.debug(
652                        f"Policy {policy} reached call limit for endpoint {endpoint} ({exc.rate}) "
653                        f"and blocking is disabled."
654                    )
655                    raise
656
657        if last_exception:
658            logger.debug(
659                f"Exhausted all {self._maximum_attempts_to_acquire} attempts to acquire a call for endpoint {endpoint} "
660                f"using policy: {policy}"
661            )
662            raise last_exception
663
664
665class HttpAPIBudget(APIBudget):
666    """Implementation of AbstractAPIBudget for HTTP"""
667
668    def __init__(
669        self,
670        ratelimit_reset_header: str = "ratelimit-reset",
671        ratelimit_remaining_header: str = "ratelimit-remaining",
672        status_codes_for_ratelimit_hit: list[int] = [429],
673        **kwargs: Any,
674    ):
675        """Constructor
676
677        :param ratelimit_reset_header: name of the header that has a timestamp of the next reset of call budget
678        :param ratelimit_remaining_header: name of the header that has the number of calls left
679        :param status_codes_for_ratelimit_hit: list of HTTP status codes that signal about rate limit being hit
680        """
681        self._ratelimit_reset_header = ratelimit_reset_header
682        self._ratelimit_remaining_header = ratelimit_remaining_header
683        self._status_codes_for_ratelimit_hit = status_codes_for_ratelimit_hit
684        super().__init__(**kwargs)
685
686    def update_from_response(self, request: Any, response: Any) -> None:
687        policy = self.get_matching_policy(request)
688        if not policy:
689            return
690
691        if isinstance(response, requests.Response):
692            available_calls = self.get_calls_left_from_response(response)
693            reset_ts = self.get_reset_ts_from_response(response)
694            policy.update(available_calls=available_calls, call_reset_ts=reset_ts)
695
696    def get_reset_ts_from_response(
697        self, response: requests.Response
698    ) -> Optional[datetime.datetime]:
699        if response.headers.get(self._ratelimit_reset_header):
700            return datetime.datetime.fromtimestamp(
701                int(response.headers[self._ratelimit_reset_header])
702            )
703        return None
704
705    def get_calls_left_from_response(self, response: requests.Response) -> Optional[int]:
706        if response.headers.get(self._ratelimit_remaining_header):
707            return int(response.headers[self._ratelimit_remaining_header])
708
709        if response.status_code in self._status_codes_for_ratelimit_hit:
710            return 0
711
712        return None
713
714
715class LimiterMixin(MIXIN_BASE):
716    """Mixin class that adds rate-limiting behavior to requests."""
717
718    def __init__(
719        self,
720        api_budget: AbstractAPIBudget,
721        **kwargs: Any,
722    ):
723        self._api_budget = api_budget
724        super().__init__(**kwargs)  # type: ignore # Base Session doesn't take any kwargs
725
726    def send(self, request: requests.PreparedRequest, **kwargs: Any) -> requests.Response:
727        """Send a request with rate-limiting."""
728        self._api_budget.acquire_call(request)
729        response = super().send(request, **kwargs)
730        self._api_budget.update_from_response(request, response)
731        return response
732
733
734class LimiterSession(LimiterMixin, requests.Session):
735    """Session that adds rate-limiting behavior to requests."""
736
737
738class CachedLimiterSession(requests_cache.CacheMixin, LimiterMixin, requests.Session):
739    """Session class with caching and rate-limiting behavior."""
logger = <Logger airbyte (INFO)>
@dataclasses.dataclass
class Rate:
33@dataclasses.dataclass
34class Rate:
35    """Call rate limit"""
36
37    limit: int
38    interval: timedelta

Call rate limit

Rate(limit: int, interval: datetime.timedelta)
limit: int
interval: datetime.timedelta
class CallRateLimitHit(builtins.Exception):
41class CallRateLimitHit(Exception):
42    def __init__(self, error: str, item: Any, weight: int, rate: str, time_to_wait: timedelta):
43        """Constructor
44
45        :param error: error message
46        :param item: object passed into acquire_call
47        :param weight: how many credits were requested
48        :param rate: string representation of the rate violated
49        :param time_to_wait: how long should wait util more call will be available
50        """
51        self.item = item
52        self.weight = weight
53        self.rate = rate
54        self.time_to_wait = time_to_wait
55        super().__init__(error)

Common base class for all non-exit exceptions.

CallRateLimitHit( error: str, item: Any, weight: int, rate: str, time_to_wait: datetime.timedelta)
42    def __init__(self, error: str, item: Any, weight: int, rate: str, time_to_wait: timedelta):
43        """Constructor
44
45        :param error: error message
46        :param item: object passed into acquire_call
47        :param weight: how many credits were requested
48        :param rate: string representation of the rate violated
49        :param time_to_wait: how long should wait util more call will be available
50        """
51        self.item = item
52        self.weight = weight
53        self.rate = rate
54        self.time_to_wait = time_to_wait
55        super().__init__(error)

Constructor

Parameters
  • error: error message
  • item: object passed into acquire_call
  • weight: how many credits were requested
  • rate: string representation of the rate violated
  • time_to_wait: how long should wait util more call will be available
item
weight
rate
time_to_wait
class AbstractCallRatePolicy(abc.ABC):
58class AbstractCallRatePolicy(abc.ABC):
59    """Call rate policy interface.
60    Should be configurable with different rules, like N per M for endpoint X. Endpoint X is matched with APIBudget.
61    """
62
63    @abc.abstractmethod
64    def matches(self, request: Any) -> bool:
65        """Tells if this policy matches specific request and should apply to it
66
67        :param request:
68        :return: True if policy should apply to this request, False - otherwise
69        """
70
71    @abc.abstractmethod
72    def try_acquire(self, request: Any, weight: int) -> None:
73        """Try to acquire request
74
75        :param request: a request object representing a single call to API
76        :param weight: number of requests to deduct from credit
77        :return:
78        """
79
80    @abc.abstractmethod
81    def update(
82        self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]
83    ) -> None:
84        """Update call rate counting with current values
85
86        :param available_calls:
87        :param call_reset_ts:
88        """

Call rate policy interface. Should be configurable with different rules, like N per M for endpoint X. Endpoint X is matched with APIBudget.

@abc.abstractmethod
def matches(self, request: Any) -> bool:
63    @abc.abstractmethod
64    def matches(self, request: Any) -> bool:
65        """Tells if this policy matches specific request and should apply to it
66
67        :param request:
68        :return: True if policy should apply to this request, False - otherwise
69        """

Tells if this policy matches specific request and should apply to it

Parameters
  • request:
Returns

True if policy should apply to this request, False - otherwise

@abc.abstractmethod
def try_acquire(self, request: Any, weight: int) -> None:
71    @abc.abstractmethod
72    def try_acquire(self, request: Any, weight: int) -> None:
73        """Try to acquire request
74
75        :param request: a request object representing a single call to API
76        :param weight: number of requests to deduct from credit
77        :return:
78        """

Try to acquire request

Parameters
  • request: a request object representing a single call to API
  • weight: number of requests to deduct from credit
Returns
@abc.abstractmethod
def update( self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]) -> None:
80    @abc.abstractmethod
81    def update(
82        self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]
83    ) -> None:
84        """Update call rate counting with current values
85
86        :param available_calls:
87        :param call_reset_ts:
88        """

Update call rate counting with current values

Parameters
  • available_calls:
  • call_reset_ts:
class RequestMatcher(abc.ABC):
 91class RequestMatcher(abc.ABC):
 92    """Callable that help to match a request object with call rate policies."""
 93
 94    @abc.abstractmethod
 95    def __call__(self, request: Any) -> bool:
 96        """
 97
 98        :param request:
 99        :return: True if matches the provided request object, False - otherwise
100        """

Callable that help to match a request object with call rate policies.

class HttpRequestMatcher(RequestMatcher):
103class HttpRequestMatcher(RequestMatcher):
104    """Simple implementation of RequestMatcher for HTTP requests using HttpRequestRegexMatcher under the hood."""
105
106    def __init__(
107        self,
108        method: Optional[str] = None,
109        url: Optional[str] = None,
110        params: Optional[Mapping[str, Any]] = None,
111        headers: Optional[Mapping[str, Any]] = None,
112    ):
113        """Constructor
114
115        :param method: HTTP method (e.g., "GET", "POST").
116        :param url: Full URL to match.
117        :param params: Dictionary of query parameters to match.
118        :param headers: Dictionary of headers to match.
119        """
120        # Parse the URL to extract the base and path
121        if url:
122            parsed_url = parse.urlsplit(url)
123            url_base = f"{parsed_url.scheme}://{parsed_url.netloc}"
124            url_path = parsed_url.path if parsed_url.path != "/" else None
125        else:
126            url_base = None
127            url_path = None
128
129        # Use HttpRequestRegexMatcher under the hood
130        self._regex_matcher = HttpRequestRegexMatcher(
131            method=method,
132            url_base=url_base,
133            url_path_pattern=re.escape(url_path) if url_path else None,
134            params=params,
135            headers=headers,
136        )
137
138    def __call__(self, request: Any) -> bool:
139        """
140        :param request: A requests.Request or requests.PreparedRequest instance.
141        :return: True if the request matches all provided criteria; False otherwise.
142        """
143        return self._regex_matcher(request)
144
145    def __str__(self) -> str:
146        return (
147            f"HttpRequestMatcher(method={self._regex_matcher._method}, "
148            f"url={self._regex_matcher._url_base}{self._regex_matcher._url_path_pattern.pattern if self._regex_matcher._url_path_pattern else ''}, "
149            f"params={self._regex_matcher._params}, headers={self._regex_matcher._headers})"
150        )

Simple implementation of RequestMatcher for HTTP requests using HttpRequestRegexMatcher under the hood.

HttpRequestMatcher( method: Optional[str] = None, url: Optional[str] = None, params: Optional[Mapping[str, Any]] = None, headers: Optional[Mapping[str, Any]] = None)
106    def __init__(
107        self,
108        method: Optional[str] = None,
109        url: Optional[str] = None,
110        params: Optional[Mapping[str, Any]] = None,
111        headers: Optional[Mapping[str, Any]] = None,
112    ):
113        """Constructor
114
115        :param method: HTTP method (e.g., "GET", "POST").
116        :param url: Full URL to match.
117        :param params: Dictionary of query parameters to match.
118        :param headers: Dictionary of headers to match.
119        """
120        # Parse the URL to extract the base and path
121        if url:
122            parsed_url = parse.urlsplit(url)
123            url_base = f"{parsed_url.scheme}://{parsed_url.netloc}"
124            url_path = parsed_url.path if parsed_url.path != "/" else None
125        else:
126            url_base = None
127            url_path = None
128
129        # Use HttpRequestRegexMatcher under the hood
130        self._regex_matcher = HttpRequestRegexMatcher(
131            method=method,
132            url_base=url_base,
133            url_path_pattern=re.escape(url_path) if url_path else None,
134            params=params,
135            headers=headers,
136        )

Constructor

Parameters
  • method: HTTP method (e.g., "GET", "POST").
  • url: Full URL to match.
  • params: Dictionary of query parameters to match.
  • headers: Dictionary of headers to match.
class HttpRequestRegexMatcher(RequestMatcher):
153class HttpRequestRegexMatcher(RequestMatcher):
154    """
155    Extended RequestMatcher for HTTP requests that supports matching on:
156      - HTTP method (case-insensitive)
157      - URL base (scheme + netloc) optionally
158      - URL path pattern (a regex applied to the path portion of the URL)
159      - Query parameters (must be present)
160      - Headers (header names compared case-insensitively)
161    """
162
163    def __init__(
164        self,
165        method: Optional[str] = None,
166        url_base: Optional[str] = None,
167        url_path_pattern: Optional[str] = None,
168        params: Optional[Mapping[str, Any]] = None,
169        headers: Optional[Mapping[str, Any]] = None,
170        weight: Optional[int] = None,
171    ):
172        """
173        :param method: HTTP method (e.g. "GET", "POST"); compared case-insensitively.
174        :param url_base: Base URL (scheme://host) that must match.
175        :param url_path_pattern: A regex pattern that will be applied to the path portion of the URL.
176        :param params: Dictionary of query parameters that must be present in the request.
177        :param headers: Dictionary of headers that must be present (header keys are compared case-insensitively).
178        :param weight: The weight of a request matching this matcher. If set, this value is used
179            when acquiring a call from the rate limiter, enabling cost-based rate limiting
180            where different endpoints consume different amounts from a shared budget.
181            If not set, each request counts as 1.
182        """
183        if weight is not None and weight < 1:
184            raise ValueError(f"weight must be >= 1, got {weight}")
185        self._weight = weight
186        self._method = method.upper() if method else None
187
188        # Normalize the url_base if provided: remove trailing slash.
189        self._url_base = url_base.rstrip("/") if url_base else None
190
191        # Compile the URL path pattern if provided.
192        self._url_path_pattern = re.compile(url_path_pattern) if url_path_pattern else None
193
194        # Normalize query parameters to strings.
195        self._params = {str(k): str(v) for k, v in (params or {}).items()}
196
197        # Normalize header keys to lowercase.
198        self._headers = {str(k).lower(): str(v) for k, v in (headers or {}).items()}
199
200    @staticmethod
201    def _match_dict(obj: Mapping[str, Any], pattern: Mapping[str, Any]) -> bool:
202        """Check that every key/value in the pattern exists in the object."""
203        return pattern.items() <= obj.items()
204
205    def __call__(self, request: Any) -> bool:
206        """
207        :param request: A requests.Request or requests.PreparedRequest instance.
208        :return: True if the request matches all provided criteria; False otherwise.
209        """
210        # Prepare the request (if needed) and extract the URL details.
211        if isinstance(request, requests.Request):
212            prepared_request = request.prepare()
213        elif isinstance(request, requests.PreparedRequest):
214            prepared_request = request
215        else:
216            return False
217
218        # Check HTTP method.
219        if self._method is not None:
220            if prepared_request.method != self._method:
221                return False
222
223        # Parse the URL.
224        parsed_url = parse.urlsplit(prepared_request.url)
225        # Reconstruct the base: scheme://netloc
226        request_url_base = f"{str(parsed_url.scheme)}://{str(parsed_url.netloc)}"
227        # The path (without query parameters)
228        request_path = str(parsed_url.path).rstrip("/")
229
230        # If a base URL is provided, check that it matches.
231        if self._url_base is not None:
232            if request_url_base != self._url_base:
233                return False
234
235        # If a URL path pattern is provided, ensure the path matches the regex.
236        if self._url_path_pattern is not None:
237            if not self._url_path_pattern.search(request_path):
238                return False
239
240        # Check query parameters.
241        if self._params:
242            query_params = dict(parse.parse_qsl(str(parsed_url.query)))
243            if not self._match_dict(query_params, self._params):
244                return False
245
246        # Check headers (normalize keys to lower-case).
247        if self._headers:
248            req_headers = {k.lower(): v for k, v in prepared_request.headers.items()}
249            if not self._match_dict(req_headers, self._headers):
250                return False
251
252        return True
253
254    @property
255    def weight(self) -> Optional[int]:
256        """The weight of a request matching this matcher, or None if not set."""
257        return self._weight
258
259    def __str__(self) -> str:
260        regex = self._url_path_pattern.pattern if self._url_path_pattern else None
261        return (
262            f"HttpRequestRegexMatcher(method={self._method}, url_base={self._url_base}, "
263            f"url_path_pattern={regex}, params={self._params}, headers={self._headers}, weight={self._weight})"
264        )
Extended RequestMatcher for HTTP requests that supports matching on:
  • HTTP method (case-insensitive)
  • URL base (scheme + netloc) optionally
  • URL path pattern (a regex applied to the path portion of the URL)
  • Query parameters (must be present)
  • Headers (header names compared case-insensitively)
HttpRequestRegexMatcher( method: Optional[str] = None, url_base: Optional[str] = None, url_path_pattern: Optional[str] = None, params: Optional[Mapping[str, Any]] = None, headers: Optional[Mapping[str, Any]] = None, weight: Optional[int] = None)
163    def __init__(
164        self,
165        method: Optional[str] = None,
166        url_base: Optional[str] = None,
167        url_path_pattern: Optional[str] = None,
168        params: Optional[Mapping[str, Any]] = None,
169        headers: Optional[Mapping[str, Any]] = None,
170        weight: Optional[int] = None,
171    ):
172        """
173        :param method: HTTP method (e.g. "GET", "POST"); compared case-insensitively.
174        :param url_base: Base URL (scheme://host) that must match.
175        :param url_path_pattern: A regex pattern that will be applied to the path portion of the URL.
176        :param params: Dictionary of query parameters that must be present in the request.
177        :param headers: Dictionary of headers that must be present (header keys are compared case-insensitively).
178        :param weight: The weight of a request matching this matcher. If set, this value is used
179            when acquiring a call from the rate limiter, enabling cost-based rate limiting
180            where different endpoints consume different amounts from a shared budget.
181            If not set, each request counts as 1.
182        """
183        if weight is not None and weight < 1:
184            raise ValueError(f"weight must be >= 1, got {weight}")
185        self._weight = weight
186        self._method = method.upper() if method else None
187
188        # Normalize the url_base if provided: remove trailing slash.
189        self._url_base = url_base.rstrip("/") if url_base else None
190
191        # Compile the URL path pattern if provided.
192        self._url_path_pattern = re.compile(url_path_pattern) if url_path_pattern else None
193
194        # Normalize query parameters to strings.
195        self._params = {str(k): str(v) for k, v in (params or {}).items()}
196
197        # Normalize header keys to lowercase.
198        self._headers = {str(k).lower(): str(v) for k, v in (headers or {}).items()}
Parameters
  • method: HTTP method (e.g. "GET", "POST"); compared case-insensitively.
  • url_base: Base URL (scheme: //host) that must match.
  • url_path_pattern: A regex pattern that will be applied to the path portion of the URL.
  • params: Dictionary of query parameters that must be present in the request.
  • headers: Dictionary of headers that must be present (header keys are compared case-insensitively).
  • weight: The weight of a request matching this matcher. If set, this value is used when acquiring a call from the rate limiter, enabling cost-based rate limiting where different endpoints consume different amounts from a shared budget. If not set, each request counts as 1.
weight: Optional[int]
254    @property
255    def weight(self) -> Optional[int]:
256        """The weight of a request matching this matcher, or None if not set."""
257        return self._weight

The weight of a request matching this matcher, or None if not set.

class BaseCallRatePolicy(AbstractCallRatePolicy, abc.ABC):
267class BaseCallRatePolicy(AbstractCallRatePolicy, abc.ABC):
268    def __init__(self, matchers: list[RequestMatcher]):
269        self._matchers = matchers
270
271    def matches(self, request: Any) -> bool:
272        """Tell if this policy matches specific request and should apply to it
273
274        :param request:
275        :return: True if policy should apply to this request, False - otherwise
276        """
277
278        if not self._matchers:
279            return True
280        return any(matcher(request) for matcher in self._matchers)
281
282    def get_weight(self, request: Any) -> int:
283        """Get the weight for a request based on the first matching matcher.
284
285        If a matcher has a weight configured, that weight is used.
286        Otherwise, defaults to 1.
287
288        :param request: a request object
289        :return: the weight for this request
290        """
291        for matcher in self._matchers:
292            if matcher(request):
293                if isinstance(matcher, HttpRequestRegexMatcher) and matcher.weight is not None:
294                    return matcher.weight
295                return 1
296        return 1

Call rate policy interface. Should be configurable with different rules, like N per M for endpoint X. Endpoint X is matched with APIBudget.

def matches(self, request: Any) -> bool:
271    def matches(self, request: Any) -> bool:
272        """Tell if this policy matches specific request and should apply to it
273
274        :param request:
275        :return: True if policy should apply to this request, False - otherwise
276        """
277
278        if not self._matchers:
279            return True
280        return any(matcher(request) for matcher in self._matchers)

Tell if this policy matches specific request and should apply to it

Parameters
  • request:
Returns

True if policy should apply to this request, False - otherwise

def get_weight(self, request: Any) -> int:
282    def get_weight(self, request: Any) -> int:
283        """Get the weight for a request based on the first matching matcher.
284
285        If a matcher has a weight configured, that weight is used.
286        Otherwise, defaults to 1.
287
288        :param request: a request object
289        :return: the weight for this request
290        """
291        for matcher in self._matchers:
292            if matcher(request):
293                if isinstance(matcher, HttpRequestRegexMatcher) and matcher.weight is not None:
294                    return matcher.weight
295                return 1
296        return 1

Get the weight for a request based on the first matching matcher.

If a matcher has a weight configured, that weight is used. Otherwise, defaults to 1.

Parameters
  • request: a request object
Returns

the weight for this request

class UnlimitedCallRatePolicy(BaseCallRatePolicy):
299class UnlimitedCallRatePolicy(BaseCallRatePolicy):
300    """
301    This policy is for explicit unlimited call rates.
302    It can be used when we want to match a specific group of requests and don't apply any limits.
303
304    Example:
305
306    APICallBudget(
307        [
308            UnlimitedCallRatePolicy(
309                matchers=[HttpRequestMatcher(url="/some/method", headers={"sandbox": true})],
310            ),
311            FixedWindowCallRatePolicy(
312                matchers=[HttpRequestMatcher(url="/some/method")],
313                next_reset_ts=datetime.now(),
314                period=timedelta(hours=1)
315                call_limit=1000,
316            ),
317        ]
318    )
319
320    The code above will limit all calls to /some/method except calls that have header sandbox=True
321    """
322
323    def try_acquire(self, request: Any, weight: int) -> None:
324        """Do nothing"""
325
326    def update(
327        self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]
328    ) -> None:
329        """Do nothing"""

This policy is for explicit unlimited call rates. It can be used when we want to match a specific group of requests and don't apply any limits.

Example:

APICallBudget( [ UnlimitedCallRatePolicy( matchers=[HttpRequestMatcher(url="/some/method", headers={"sandbox": true})], ), FixedWindowCallRatePolicy( matchers=[HttpRequestMatcher(url="/some/method")], next_reset_ts=datetime.now(), period=timedelta(hours=1) call_limit=1000, ), ] )

The code above will limit all calls to /some/method except calls that have header sandbox=True

def try_acquire(self, request: Any, weight: int) -> None:
323    def try_acquire(self, request: Any, weight: int) -> None:
324        """Do nothing"""

Do nothing

def update( self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]) -> None:
326    def update(
327        self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]
328    ) -> None:
329        """Do nothing"""

Do nothing

class FixedWindowCallRatePolicy(BaseCallRatePolicy):
332class FixedWindowCallRatePolicy(BaseCallRatePolicy):
333    def __init__(
334        self,
335        next_reset_ts: datetime.datetime,
336        period: timedelta,
337        call_limit: int,
338        matchers: list[RequestMatcher],
339    ):
340        """A policy that allows {call_limit} calls within a {period} time interval
341
342        :param next_reset_ts: next call rate reset time point
343        :param period: call rate reset period
344        :param call_limit:
345        :param matchers:
346        """
347
348        self._next_reset_ts = next_reset_ts
349        self._offset = period
350        self._call_limit = call_limit
351        self._calls_num = 0
352        self._lock = RLock()
353        super().__init__(matchers=matchers)
354
355    def try_acquire(self, request: Any, weight: int) -> None:
356        if weight > self._call_limit:
357            raise ValueError("Weight can not exceed the call limit")
358        if not self.matches(request):
359            raise ValueError("Request does not match the policy")
360
361        with self._lock:
362            self._update_current_window()
363
364            if self._calls_num + weight > self._call_limit:
365                reset_in = self._next_reset_ts - datetime.datetime.now()
366                error_message = (
367                    f"reached maximum number of allowed calls {self._call_limit} "
368                    f"per {self._offset} interval, next reset in {reset_in}."
369                )
370                raise CallRateLimitHit(
371                    error=error_message,
372                    item=request,
373                    weight=weight,
374                    rate=f"{self._call_limit} per {self._offset}",
375                    time_to_wait=reset_in,
376                )
377
378            self._calls_num += weight
379
380    def __str__(self) -> str:
381        matcher_str = ", ".join(f"{matcher}" for matcher in self._matchers)
382        return (
383            f"FixedWindowCallRatePolicy(call_limit={self._call_limit}, period={self._offset}, "
384            f"calls_used={self._calls_num}, next_reset={self._next_reset_ts}, "
385            f"matchers=[{matcher_str}])"
386        )
387
388    def update(
389        self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]
390    ) -> None:
391        """Update call rate counters, by default, only reacts to decreasing updates of available_calls and changes to call_reset_ts.
392        We ignore updates with available_calls > current_available_calls to support call rate limits that are lower than API limits.
393
394        :param available_calls:
395        :param call_reset_ts:
396        """
397        with self._lock:
398            self._update_current_window()
399            current_available_calls = self._call_limit - self._calls_num
400
401            if available_calls is not None and current_available_calls > available_calls:
402                logger.debug(
403                    "got rate limit update from api, adjusting available calls from %s to %s",
404                    current_available_calls,
405                    available_calls,
406                )
407                self._calls_num = self._call_limit - available_calls
408
409            if call_reset_ts is not None and call_reset_ts != self._next_reset_ts:
410                logger.debug(
411                    "got rate limit update from api, adjusting reset time from %s to %s",
412                    self._next_reset_ts,
413                    call_reset_ts,
414                )
415                self._next_reset_ts = call_reset_ts
416
417    def _update_current_window(self) -> None:
418        now = datetime.datetime.now()
419        if now > self._next_reset_ts:
420            logger.debug("started new window, %s calls available now", self._call_limit)
421            self._next_reset_ts = self._next_reset_ts + self._offset
422            self._calls_num = 0

Call rate policy interface. Should be configurable with different rules, like N per M for endpoint X. Endpoint X is matched with APIBudget.

FixedWindowCallRatePolicy( next_reset_ts: datetime.datetime, period: datetime.timedelta, call_limit: int, matchers: list[RequestMatcher])
333    def __init__(
334        self,
335        next_reset_ts: datetime.datetime,
336        period: timedelta,
337        call_limit: int,
338        matchers: list[RequestMatcher],
339    ):
340        """A policy that allows {call_limit} calls within a {period} time interval
341
342        :param next_reset_ts: next call rate reset time point
343        :param period: call rate reset period
344        :param call_limit:
345        :param matchers:
346        """
347
348        self._next_reset_ts = next_reset_ts
349        self._offset = period
350        self._call_limit = call_limit
351        self._calls_num = 0
352        self._lock = RLock()
353        super().__init__(matchers=matchers)

A policy that allows {call_limit} calls within a {period} time interval

Parameters
  • next_reset_ts: next call rate reset time point
  • period: call rate reset period
  • call_limit:
  • matchers:
def try_acquire(self, request: Any, weight: int) -> None:
355    def try_acquire(self, request: Any, weight: int) -> None:
356        if weight > self._call_limit:
357            raise ValueError("Weight can not exceed the call limit")
358        if not self.matches(request):
359            raise ValueError("Request does not match the policy")
360
361        with self._lock:
362            self._update_current_window()
363
364            if self._calls_num + weight > self._call_limit:
365                reset_in = self._next_reset_ts - datetime.datetime.now()
366                error_message = (
367                    f"reached maximum number of allowed calls {self._call_limit} "
368                    f"per {self._offset} interval, next reset in {reset_in}."
369                )
370                raise CallRateLimitHit(
371                    error=error_message,
372                    item=request,
373                    weight=weight,
374                    rate=f"{self._call_limit} per {self._offset}",
375                    time_to_wait=reset_in,
376                )
377
378            self._calls_num += weight

Try to acquire request

Parameters
  • request: a request object representing a single call to API
  • weight: number of requests to deduct from credit
Returns
def update( self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]) -> None:
388    def update(
389        self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]
390    ) -> None:
391        """Update call rate counters, by default, only reacts to decreasing updates of available_calls and changes to call_reset_ts.
392        We ignore updates with available_calls > current_available_calls to support call rate limits that are lower than API limits.
393
394        :param available_calls:
395        :param call_reset_ts:
396        """
397        with self._lock:
398            self._update_current_window()
399            current_available_calls = self._call_limit - self._calls_num
400
401            if available_calls is not None and current_available_calls > available_calls:
402                logger.debug(
403                    "got rate limit update from api, adjusting available calls from %s to %s",
404                    current_available_calls,
405                    available_calls,
406                )
407                self._calls_num = self._call_limit - available_calls
408
409            if call_reset_ts is not None and call_reset_ts != self._next_reset_ts:
410                logger.debug(
411                    "got rate limit update from api, adjusting reset time from %s to %s",
412                    self._next_reset_ts,
413                    call_reset_ts,
414                )
415                self._next_reset_ts = call_reset_ts

Update call rate counters, by default, only reacts to decreasing updates of available_calls and changes to call_reset_ts. We ignore updates with available_calls > current_available_calls to support call rate limits that are lower than API limits.

Parameters
  • available_calls:
  • call_reset_ts:
class MovingWindowCallRatePolicy(BaseCallRatePolicy):
425class MovingWindowCallRatePolicy(BaseCallRatePolicy):
426    """
427    Policy to control requests rate implemented on top of PyRateLimiter lib.
428    The main difference between this policy and FixedWindowCallRatePolicy is that the rate-limiting window
429    is moving along requests that we made, and there is no moment when we reset an available number of calls.
430    This strategy requires saving of timestamps of all requests within a window.
431    """
432
433    def __init__(self, rates: list[Rate], matchers: list[RequestMatcher]):
434        """Constructor
435
436        :param rates: list of rates, the order is important and must be ascending
437        :param matchers:
438        """
439        if not rates:
440            raise ValueError("The list of rates can not be empty")
441        pyrate_rates = [
442            PyRateRate(limit=rate.limit, interval=int(rate.interval.total_seconds() * 1000))
443            for rate in rates
444        ]
445        self._bucket = InMemoryBucket(pyrate_rates)
446        # Limiter will create the background task that clears old requests in the bucket
447        self._limiter = Limiter(self._bucket)
448        super().__init__(matchers=matchers)
449
450    def try_acquire(self, request: Any, weight: int) -> None:
451        if not self.matches(request):
452            raise ValueError("Request does not match the policy")
453        lowest_limit = min(rate.limit for rate in self._bucket.rates)
454        if weight > lowest_limit:
455            raise ValueError(
456                f"Weight can not exceed the lowest configured rate limit ({lowest_limit})"
457            )
458
459        try:
460            self._limiter.try_acquire(request, weight=weight)
461        except BucketFullException as exc:
462            item = self._limiter.bucket_factory.wrap_item(request, weight)
463            assert isinstance(item, RateItem)
464
465            with self._limiter.lock:
466                time_to_wait = self._bucket.waiting(item)
467                assert isinstance(time_to_wait, int)
468
469                raise CallRateLimitHit(
470                    error=str(exc.meta_info["error"]),
471                    item=request,
472                    weight=int(exc.meta_info["weight"]),
473                    rate=str(exc.meta_info["rate"]),
474                    time_to_wait=timedelta(milliseconds=time_to_wait),
475                )
476
477    def update(
478        self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]
479    ) -> None:
480        """Adjust call bucket to reflect the state of the API server
481
482        :param available_calls:
483        :param call_reset_ts:
484        :return:
485        """
486        if (
487            available_calls is not None and call_reset_ts is None
488        ):  # we do our best to sync buckets with API
489            if available_calls == 0:
490                with self._limiter.lock:
491                    items_to_add = self._bucket.count() < self._bucket.rates[0].limit
492                    if items_to_add > 0:
493                        now: int = TimeClock().now()  # type: ignore[no-untyped-call]
494                        self._bucket.put(RateItem(name="dummy", timestamp=now, weight=items_to_add))
495        # TODO: add support if needed, it might be that it is not possible to make a good solution for this case
496        # if available_calls is not None and call_reset_ts is not None:
497        #     ts = call_reset_ts.timestamp()
498
499    def __str__(self) -> str:
500        """Return a human-friendly description of the moving window rate policy for logging purposes."""
501        rates_info = ", ".join(
502            f"{rate.limit} per {timedelta(milliseconds=rate.interval)}"
503            for rate in self._bucket.rates
504        )
505        current_bucket_count = self._bucket.count()
506        matcher_str = ", ".join(f"{matcher}" for matcher in self._matchers)
507        return (
508            f"MovingWindowCallRatePolicy(rates=[{rates_info}], current_bucket_count={current_bucket_count}, "
509            f"matchers=[{matcher_str}])"
510        )

Policy to control requests rate implemented on top of PyRateLimiter lib. The main difference between this policy and FixedWindowCallRatePolicy is that the rate-limiting window is moving along requests that we made, and there is no moment when we reset an available number of calls. This strategy requires saving of timestamps of all requests within a window.

MovingWindowCallRatePolicy( rates: list[Rate], matchers: list[RequestMatcher])
433    def __init__(self, rates: list[Rate], matchers: list[RequestMatcher]):
434        """Constructor
435
436        :param rates: list of rates, the order is important and must be ascending
437        :param matchers:
438        """
439        if not rates:
440            raise ValueError("The list of rates can not be empty")
441        pyrate_rates = [
442            PyRateRate(limit=rate.limit, interval=int(rate.interval.total_seconds() * 1000))
443            for rate in rates
444        ]
445        self._bucket = InMemoryBucket(pyrate_rates)
446        # Limiter will create the background task that clears old requests in the bucket
447        self._limiter = Limiter(self._bucket)
448        super().__init__(matchers=matchers)

Constructor

Parameters
  • rates: list of rates, the order is important and must be ascending
  • matchers:
def try_acquire(self, request: Any, weight: int) -> None:
450    def try_acquire(self, request: Any, weight: int) -> None:
451        if not self.matches(request):
452            raise ValueError("Request does not match the policy")
453        lowest_limit = min(rate.limit for rate in self._bucket.rates)
454        if weight > lowest_limit:
455            raise ValueError(
456                f"Weight can not exceed the lowest configured rate limit ({lowest_limit})"
457            )
458
459        try:
460            self._limiter.try_acquire(request, weight=weight)
461        except BucketFullException as exc:
462            item = self._limiter.bucket_factory.wrap_item(request, weight)
463            assert isinstance(item, RateItem)
464
465            with self._limiter.lock:
466                time_to_wait = self._bucket.waiting(item)
467                assert isinstance(time_to_wait, int)
468
469                raise CallRateLimitHit(
470                    error=str(exc.meta_info["error"]),
471                    item=request,
472                    weight=int(exc.meta_info["weight"]),
473                    rate=str(exc.meta_info["rate"]),
474                    time_to_wait=timedelta(milliseconds=time_to_wait),
475                )

Try to acquire request

Parameters
  • request: a request object representing a single call to API
  • weight: number of requests to deduct from credit
Returns
def update( self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]) -> None:
477    def update(
478        self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]
479    ) -> None:
480        """Adjust call bucket to reflect the state of the API server
481
482        :param available_calls:
483        :param call_reset_ts:
484        :return:
485        """
486        if (
487            available_calls is not None and call_reset_ts is None
488        ):  # we do our best to sync buckets with API
489            if available_calls == 0:
490                with self._limiter.lock:
491                    items_to_add = self._bucket.count() < self._bucket.rates[0].limit
492                    if items_to_add > 0:
493                        now: int = TimeClock().now()  # type: ignore[no-untyped-call]
494                        self._bucket.put(RateItem(name="dummy", timestamp=now, weight=items_to_add))
495        # TODO: add support if needed, it might be that it is not possible to make a good solution for this case
496        # if available_calls is not None and call_reset_ts is not None:
497        #     ts = call_reset_ts.timestamp()

Adjust call bucket to reflect the state of the API server

Parameters
  • available_calls:
  • call_reset_ts:
Returns
class AbstractAPIBudget(abc.ABC):
513class AbstractAPIBudget(abc.ABC):
514    """Interface to some API where a client allowed to have N calls per T interval.
515
516    Important: APIBudget is not doing any API calls, the end user code is responsible to call this interface
517        to respect call rate limitation of the API.
518
519    It supports multiple policies applied to different group of requests. To distinct these groups we use RequestMatchers.
520    Individual policy represented by MovingWindowCallRatePolicy and currently supports only moving window strategy.
521    """
522
523    @abc.abstractmethod
524    def acquire_call(
525        self, request: Any, block: bool = True, timeout: Optional[float] = None
526    ) -> None:
527        """Try to get a call from budget, will block by default
528
529        :param request:
530        :param block: when true (default) will block the current thread until call credit is available
531        :param timeout: if set will limit maximum time in block, otherwise will wait until credit is available
532        :raises: CallRateLimitHit - when no credits left and if timeout was set the waiting time exceed the timeout
533        """
534
535    @abc.abstractmethod
536    def get_matching_policy(self, request: Any) -> Optional[AbstractCallRatePolicy]:
537        """Find matching call rate policy for specific request"""
538
539    @abc.abstractmethod
540    def update_from_response(self, request: Any, response: Any) -> None:
541        """Update budget information based on response from API
542
543        :param request: the initial request that triggered this response
544        :param response: response from the API
545        """

Interface to some API where a client allowed to have N calls per T interval.

Important: APIBudget is not doing any API calls, the end user code is responsible to call this interface to respect call rate limitation of the API.

It supports multiple policies applied to different group of requests. To distinct these groups we use RequestMatchers. Individual policy represented by MovingWindowCallRatePolicy and currently supports only moving window strategy.

@abc.abstractmethod
def acquire_call( self, request: Any, block: bool = True, timeout: Optional[float] = None) -> None:
523    @abc.abstractmethod
524    def acquire_call(
525        self, request: Any, block: bool = True, timeout: Optional[float] = None
526    ) -> None:
527        """Try to get a call from budget, will block by default
528
529        :param request:
530        :param block: when true (default) will block the current thread until call credit is available
531        :param timeout: if set will limit maximum time in block, otherwise will wait until credit is available
532        :raises: CallRateLimitHit - when no credits left and if timeout was set the waiting time exceed the timeout
533        """

Try to get a call from budget, will block by default

Parameters
  • request:
  • block: when true (default) will block the current thread until call credit is available
  • timeout: if set will limit maximum time in block, otherwise will wait until credit is available
Raises
  • CallRateLimitHit - when no credits left and if timeout was set the waiting time exceed the timeout
@abc.abstractmethod
def get_matching_policy( self, request: Any) -> Optional[AbstractCallRatePolicy]:
535    @abc.abstractmethod
536    def get_matching_policy(self, request: Any) -> Optional[AbstractCallRatePolicy]:
537        """Find matching call rate policy for specific request"""

Find matching call rate policy for specific request

@abc.abstractmethod
def update_from_response(self, request: Any, response: Any) -> None:
539    @abc.abstractmethod
540    def update_from_response(self, request: Any, response: Any) -> None:
541        """Update budget information based on response from API
542
543        :param request: the initial request that triggered this response
544        :param response: response from the API
545        """

Update budget information based on response from API

Parameters
  • request: the initial request that triggered this response
  • response: response from the API
class APIBudget(AbstractAPIBudget):
548class APIBudget(AbstractAPIBudget):
549    """Default APIBudget implementation"""
550
551    def __init__(
552        self, policies: list[AbstractCallRatePolicy], maximum_attempts_to_acquire: int = 100000
553    ) -> None:
554        """Constructor
555
556        :param policies: list of policies in this budget
557        :param maximum_attempts_to_acquire: number of attempts before throwing hit ratelimit exception, we put some big number here
558         to avoid situations when many threads compete with each other for a few lots over a significant amount of time
559        """
560
561        self._policies = policies
562        self._maximum_attempts_to_acquire = maximum_attempts_to_acquire
563
564    def _extract_endpoint(self, request: Any) -> str:
565        """Extract the endpoint URL from the request if available."""
566        endpoint = None
567        try:
568            # If the request is already a PreparedRequest, it should have a URL.
569            if isinstance(request, requests.PreparedRequest):
570                endpoint = request.url
571            # If it's a requests.Request, we call prepare() to extract the URL.
572            elif isinstance(request, requests.Request):
573                prepared = request.prepare()
574                endpoint = prepared.url
575        except Exception as e:
576            logger.debug(f"Error extracting endpoint: {e}")
577        if endpoint:
578            return endpoint
579        return "unknown endpoint"
580
581    def get_matching_policy(self, request: Any) -> Optional[AbstractCallRatePolicy]:
582        for policy in self._policies:
583            if policy.matches(request):
584                return policy
585        return None
586
587    def acquire_call(
588        self, request: Any, block: bool = True, timeout: Optional[float] = None
589    ) -> None:
590        """Try to get a call from budget, will block by default.
591        Matchers will be called sequentially in the same order they were added.
592        The first matcher that returns True will
593
594        :param request: the API request
595        :param block: when True (default) will block until a call credit is available
596        :param timeout: if provided, limits maximum waiting time; otherwise, waits indefinitely
597        :raises: CallRateLimitHit if the call credit cannot be acquired within the timeout
598        """
599
600        policy = self.get_matching_policy(request)
601        endpoint = self._extract_endpoint(request)
602        if policy:
603            logger.debug(f"Acquiring call for endpoint {endpoint} using policy: {policy}")
604            self._do_acquire(request=request, policy=policy, block=block, timeout=timeout)
605        elif self._policies:
606            logger.debug(
607                f"No policies matched for endpoint {endpoint} (request: {request}). Allowing call by default."
608            )
609
610    def update_from_response(self, request: Any, response: Any) -> None:
611        """Update budget information based on the API response.
612
613        :param request: the initial request that triggered this response
614        :param response: response from the API
615        """
616        pass
617
618    def _do_acquire(
619        self, request: Any, policy: AbstractCallRatePolicy, block: bool, timeout: Optional[float]
620    ) -> None:
621        """Internal method to try to acquire a call credit.
622
623        :param request: the API request
624        :param policy: the matching rate-limiting policy
625        :param block: indicates whether to block until a call credit is available
626        :param timeout: maximum time to wait if blocking
627        :raises: CallRateLimitHit if unable to acquire a call credit
628        """
629        last_exception = None
630        endpoint = self._extract_endpoint(request)
631        # sometimes we spend all budget before a second attempt, so we have a few more attempts
632        for attempt in range(1, self._maximum_attempts_to_acquire):
633            try:
634                weight = policy.get_weight(request) if isinstance(policy, BaseCallRatePolicy) else 1
635                policy.try_acquire(request, weight=weight)
636                return
637            except CallRateLimitHit as exc:
638                last_exception = exc
639                if block:
640                    if timeout is not None:
641                        time_to_wait = min(timedelta(seconds=timeout), exc.time_to_wait)
642                    else:
643                        time_to_wait = exc.time_to_wait
644                    # Ensure we never sleep for a negative duration.
645                    time_to_wait = max(timedelta(0), time_to_wait)
646                    logger.debug(
647                        f"Policy {policy} reached call limit for endpoint {endpoint} ({exc.rate}). "
648                        f"Sleeping for {time_to_wait} on attempt {attempt}."
649                    )
650                    time.sleep(time_to_wait.total_seconds())
651                else:
652                    logger.debug(
653                        f"Policy {policy} reached call limit for endpoint {endpoint} ({exc.rate}) "
654                        f"and blocking is disabled."
655                    )
656                    raise
657
658        if last_exception:
659            logger.debug(
660                f"Exhausted all {self._maximum_attempts_to_acquire} attempts to acquire a call for endpoint {endpoint} "
661                f"using policy: {policy}"
662            )
663            raise last_exception

Default APIBudget implementation

APIBudget( policies: list[AbstractCallRatePolicy], maximum_attempts_to_acquire: int = 100000)
551    def __init__(
552        self, policies: list[AbstractCallRatePolicy], maximum_attempts_to_acquire: int = 100000
553    ) -> None:
554        """Constructor
555
556        :param policies: list of policies in this budget
557        :param maximum_attempts_to_acquire: number of attempts before throwing hit ratelimit exception, we put some big number here
558         to avoid situations when many threads compete with each other for a few lots over a significant amount of time
559        """
560
561        self._policies = policies
562        self._maximum_attempts_to_acquire = maximum_attempts_to_acquire

Constructor

Parameters
  • policies: list of policies in this budget
  • maximum_attempts_to_acquire: number of attempts before throwing hit ratelimit exception, we put some big number here to avoid situations when many threads compete with each other for a few lots over a significant amount of time
def get_matching_policy( self, request: Any) -> Optional[AbstractCallRatePolicy]:
581    def get_matching_policy(self, request: Any) -> Optional[AbstractCallRatePolicy]:
582        for policy in self._policies:
583            if policy.matches(request):
584                return policy
585        return None

Find matching call rate policy for specific request

def acquire_call( self, request: Any, block: bool = True, timeout: Optional[float] = None) -> None:
587    def acquire_call(
588        self, request: Any, block: bool = True, timeout: Optional[float] = None
589    ) -> None:
590        """Try to get a call from budget, will block by default.
591        Matchers will be called sequentially in the same order they were added.
592        The first matcher that returns True will
593
594        :param request: the API request
595        :param block: when True (default) will block until a call credit is available
596        :param timeout: if provided, limits maximum waiting time; otherwise, waits indefinitely
597        :raises: CallRateLimitHit if the call credit cannot be acquired within the timeout
598        """
599
600        policy = self.get_matching_policy(request)
601        endpoint = self._extract_endpoint(request)
602        if policy:
603            logger.debug(f"Acquiring call for endpoint {endpoint} using policy: {policy}")
604            self._do_acquire(request=request, policy=policy, block=block, timeout=timeout)
605        elif self._policies:
606            logger.debug(
607                f"No policies matched for endpoint {endpoint} (request: {request}). Allowing call by default."
608            )

Try to get a call from budget, will block by default. Matchers will be called sequentially in the same order they were added. The first matcher that returns True will

Parameters
  • request: the API request
  • block: when True (default) will block until a call credit is available
  • timeout: if provided, limits maximum waiting time; otherwise, waits indefinitely
Raises
  • CallRateLimitHit if the call credit cannot be acquired within the timeout
def update_from_response(self, request: Any, response: Any) -> None:
610    def update_from_response(self, request: Any, response: Any) -> None:
611        """Update budget information based on the API response.
612
613        :param request: the initial request that triggered this response
614        :param response: response from the API
615        """
616        pass

Update budget information based on the API response.

Parameters
  • request: the initial request that triggered this response
  • response: response from the API
class HttpAPIBudget(APIBudget):
666class HttpAPIBudget(APIBudget):
667    """Implementation of AbstractAPIBudget for HTTP"""
668
669    def __init__(
670        self,
671        ratelimit_reset_header: str = "ratelimit-reset",
672        ratelimit_remaining_header: str = "ratelimit-remaining",
673        status_codes_for_ratelimit_hit: list[int] = [429],
674        **kwargs: Any,
675    ):
676        """Constructor
677
678        :param ratelimit_reset_header: name of the header that has a timestamp of the next reset of call budget
679        :param ratelimit_remaining_header: name of the header that has the number of calls left
680        :param status_codes_for_ratelimit_hit: list of HTTP status codes that signal about rate limit being hit
681        """
682        self._ratelimit_reset_header = ratelimit_reset_header
683        self._ratelimit_remaining_header = ratelimit_remaining_header
684        self._status_codes_for_ratelimit_hit = status_codes_for_ratelimit_hit
685        super().__init__(**kwargs)
686
687    def update_from_response(self, request: Any, response: Any) -> None:
688        policy = self.get_matching_policy(request)
689        if not policy:
690            return
691
692        if isinstance(response, requests.Response):
693            available_calls = self.get_calls_left_from_response(response)
694            reset_ts = self.get_reset_ts_from_response(response)
695            policy.update(available_calls=available_calls, call_reset_ts=reset_ts)
696
697    def get_reset_ts_from_response(
698        self, response: requests.Response
699    ) -> Optional[datetime.datetime]:
700        if response.headers.get(self._ratelimit_reset_header):
701            return datetime.datetime.fromtimestamp(
702                int(response.headers[self._ratelimit_reset_header])
703            )
704        return None
705
706    def get_calls_left_from_response(self, response: requests.Response) -> Optional[int]:
707        if response.headers.get(self._ratelimit_remaining_header):
708            return int(response.headers[self._ratelimit_remaining_header])
709
710        if response.status_code in self._status_codes_for_ratelimit_hit:
711            return 0
712
713        return None

Implementation of AbstractAPIBudget for HTTP

HttpAPIBudget( ratelimit_reset_header: str = 'ratelimit-reset', ratelimit_remaining_header: str = 'ratelimit-remaining', status_codes_for_ratelimit_hit: list[int] = [429], **kwargs: Any)
669    def __init__(
670        self,
671        ratelimit_reset_header: str = "ratelimit-reset",
672        ratelimit_remaining_header: str = "ratelimit-remaining",
673        status_codes_for_ratelimit_hit: list[int] = [429],
674        **kwargs: Any,
675    ):
676        """Constructor
677
678        :param ratelimit_reset_header: name of the header that has a timestamp of the next reset of call budget
679        :param ratelimit_remaining_header: name of the header that has the number of calls left
680        :param status_codes_for_ratelimit_hit: list of HTTP status codes that signal about rate limit being hit
681        """
682        self._ratelimit_reset_header = ratelimit_reset_header
683        self._ratelimit_remaining_header = ratelimit_remaining_header
684        self._status_codes_for_ratelimit_hit = status_codes_for_ratelimit_hit
685        super().__init__(**kwargs)

Constructor

Parameters
  • ratelimit_reset_header: name of the header that has a timestamp of the next reset of call budget
  • ratelimit_remaining_header: name of the header that has the number of calls left
  • status_codes_for_ratelimit_hit: list of HTTP status codes that signal about rate limit being hit
def update_from_response(self, request: Any, response: Any) -> None:
687    def update_from_response(self, request: Any, response: Any) -> None:
688        policy = self.get_matching_policy(request)
689        if not policy:
690            return
691
692        if isinstance(response, requests.Response):
693            available_calls = self.get_calls_left_from_response(response)
694            reset_ts = self.get_reset_ts_from_response(response)
695            policy.update(available_calls=available_calls, call_reset_ts=reset_ts)

Update budget information based on the API response.

Parameters
  • request: the initial request that triggered this response
  • response: response from the API
def get_reset_ts_from_response(self, response: requests.models.Response) -> Optional[datetime.datetime]:
697    def get_reset_ts_from_response(
698        self, response: requests.Response
699    ) -> Optional[datetime.datetime]:
700        if response.headers.get(self._ratelimit_reset_header):
701            return datetime.datetime.fromtimestamp(
702                int(response.headers[self._ratelimit_reset_header])
703            )
704        return None
def get_calls_left_from_response(self, response: requests.models.Response) -> Optional[int]:
706    def get_calls_left_from_response(self, response: requests.Response) -> Optional[int]:
707        if response.headers.get(self._ratelimit_remaining_header):
708            return int(response.headers[self._ratelimit_remaining_header])
709
710        if response.status_code in self._status_codes_for_ratelimit_hit:
711            return 0
712
713        return None
class LimiterMixin:
716class LimiterMixin(MIXIN_BASE):
717    """Mixin class that adds rate-limiting behavior to requests."""
718
719    def __init__(
720        self,
721        api_budget: AbstractAPIBudget,
722        **kwargs: Any,
723    ):
724        self._api_budget = api_budget
725        super().__init__(**kwargs)  # type: ignore # Base Session doesn't take any kwargs
726
727    def send(self, request: requests.PreparedRequest, **kwargs: Any) -> requests.Response:
728        """Send a request with rate-limiting."""
729        self._api_budget.acquire_call(request)
730        response = super().send(request, **kwargs)
731        self._api_budget.update_from_response(request, response)
732        return response

Mixin class that adds rate-limiting behavior to requests.

LimiterMixin( api_budget: AbstractAPIBudget, **kwargs: Any)
719    def __init__(
720        self,
721        api_budget: AbstractAPIBudget,
722        **kwargs: Any,
723    ):
724        self._api_budget = api_budget
725        super().__init__(**kwargs)  # type: ignore # Base Session doesn't take any kwargs
def send( self, request: requests.models.PreparedRequest, **kwargs: Any) -> requests.models.Response:
727    def send(self, request: requests.PreparedRequest, **kwargs: Any) -> requests.Response:
728        """Send a request with rate-limiting."""
729        self._api_budget.acquire_call(request)
730        response = super().send(request, **kwargs)
731        self._api_budget.update_from_response(request, response)
732        return response

Send a request with rate-limiting.

class LimiterSession(LimiterMixin, requests.sessions.Session):
735class LimiterSession(LimiterMixin, requests.Session):
736    """Session that adds rate-limiting behavior to requests."""

Session that adds rate-limiting behavior to requests.

Inherited Members
LimiterMixin
LimiterMixin
send
class CachedLimiterSession(requests_cache.session.CacheMixin, LimiterMixin, requests.sessions.Session):
739class CachedLimiterSession(requests_cache.CacheMixin, LimiterMixin, requests.Session):
740    """Session class with caching and rate-limiting behavior."""

Session class with caching and rate-limiting behavior.