airbyte_cdk.sources.streams.call_rate

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4
  5import abc
  6import dataclasses
  7import datetime
  8import logging
  9import re
 10import time
 11from datetime import timedelta
 12from threading import RLock
 13from typing import TYPE_CHECKING, Any, Mapping, Optional
 14from urllib import parse
 15
 16import requests
 17import requests_cache
 18from pyrate_limiter import InMemoryBucket, Limiter, RateItem, TimeClock
 19from pyrate_limiter import Rate as PyRateRate
 20from pyrate_limiter.exceptions import BucketFullException
 21
 22# prevents mypy from complaining about missing session attributes in LimiterMixin
 23if TYPE_CHECKING:
 24    MIXIN_BASE = requests.Session
 25else:
 26    MIXIN_BASE = object
 27
 28logger = logging.getLogger("airbyte")
 29logging.getLogger("pyrate_limiter").setLevel(logging.WARNING)
 30
 31
 32@dataclasses.dataclass
 33class Rate:
 34    """Call rate limit"""
 35
 36    limit: int
 37    interval: timedelta
 38
 39
 40class CallRateLimitHit(Exception):
 41    def __init__(self, error: str, item: Any, weight: int, rate: str, time_to_wait: timedelta):
 42        """Constructor
 43
 44        :param error: error message
 45        :param item: object passed into acquire_call
 46        :param weight: how many credits were requested
 47        :param rate: string representation of the rate violated
 48        :param time_to_wait: how long should wait util more call will be available
 49        """
 50        self.item = item
 51        self.weight = weight
 52        self.rate = rate
 53        self.time_to_wait = time_to_wait
 54        super().__init__(error)
 55
 56
 57class AbstractCallRatePolicy(abc.ABC):
 58    """Call rate policy interface.
 59    Should be configurable with different rules, like N per M for endpoint X. Endpoint X is matched with APIBudget.
 60    """
 61
 62    @abc.abstractmethod
 63    def matches(self, request: Any) -> bool:
 64        """Tells if this policy matches specific request and should apply to it
 65
 66        :param request:
 67        :return: True if policy should apply to this request, False - otherwise
 68        """
 69
 70    @abc.abstractmethod
 71    def try_acquire(self, request: Any, weight: int) -> None:
 72        """Try to acquire request
 73
 74        :param request: a request object representing a single call to API
 75        :param weight: number of requests to deduct from credit
 76        :return:
 77        """
 78
 79    @abc.abstractmethod
 80    def update(
 81        self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]
 82    ) -> None:
 83        """Update call rate counting with current values
 84
 85        :param available_calls:
 86        :param call_reset_ts:
 87        """
 88
 89
 90class RequestMatcher(abc.ABC):
 91    """Callable that help to match a request object with call rate policies."""
 92
 93    @abc.abstractmethod
 94    def __call__(self, request: Any) -> bool:
 95        """
 96
 97        :param request:
 98        :return: True if matches the provided request object, False - otherwise
 99        """
100
101
102class HttpRequestMatcher(RequestMatcher):
103    """Simple implementation of RequestMatcher for HTTP requests using HttpRequestRegexMatcher under the hood."""
104
105    def __init__(
106        self,
107        method: Optional[str] = None,
108        url: Optional[str] = None,
109        params: Optional[Mapping[str, Any]] = None,
110        headers: Optional[Mapping[str, Any]] = None,
111    ):
112        """Constructor
113
114        :param method: HTTP method (e.g., "GET", "POST").
115        :param url: Full URL to match.
116        :param params: Dictionary of query parameters to match.
117        :param headers: Dictionary of headers to match.
118        """
119        # Parse the URL to extract the base and path
120        if url:
121            parsed_url = parse.urlsplit(url)
122            url_base = f"{parsed_url.scheme}://{parsed_url.netloc}"
123            url_path = parsed_url.path if parsed_url.path != "/" else None
124        else:
125            url_base = None
126            url_path = None
127
128        # Use HttpRequestRegexMatcher under the hood
129        self._regex_matcher = HttpRequestRegexMatcher(
130            method=method,
131            url_base=url_base,
132            url_path_pattern=re.escape(url_path) if url_path else None,
133            params=params,
134            headers=headers,
135        )
136
137    def __call__(self, request: Any) -> bool:
138        """
139        :param request: A requests.Request or requests.PreparedRequest instance.
140        :return: True if the request matches all provided criteria; False otherwise.
141        """
142        return self._regex_matcher(request)
143
144    def __str__(self) -> str:
145        return (
146            f"HttpRequestMatcher(method={self._regex_matcher._method}, "
147            f"url={self._regex_matcher._url_base}{self._regex_matcher._url_path_pattern.pattern if self._regex_matcher._url_path_pattern else ''}, "
148            f"params={self._regex_matcher._params}, headers={self._regex_matcher._headers})"
149        )
150
151
152class HttpRequestRegexMatcher(RequestMatcher):
153    """
154    Extended RequestMatcher for HTTP requests that supports matching on:
155      - HTTP method (case-insensitive)
156      - URL base (scheme + netloc) optionally
157      - URL path pattern (a regex applied to the path portion of the URL)
158      - Query parameters (must be present)
159      - Headers (header names compared case-insensitively)
160    """
161
162    def __init__(
163        self,
164        method: Optional[str] = None,
165        url_base: Optional[str] = None,
166        url_path_pattern: Optional[str] = None,
167        params: Optional[Mapping[str, Any]] = None,
168        headers: Optional[Mapping[str, Any]] = None,
169    ):
170        """
171        :param method: HTTP method (e.g. "GET", "POST"); compared case-insensitively.
172        :param url_base: Base URL (scheme://host) that must match.
173        :param url_path_pattern: A regex pattern that will be applied to the path portion of the URL.
174        :param params: Dictionary of query parameters that must be present in the request.
175        :param headers: Dictionary of headers that must be present (header keys are compared case-insensitively).
176        """
177        self._method = method.upper() if method else None
178
179        # Normalize the url_base if provided: remove trailing slash.
180        self._url_base = url_base.rstrip("/") if url_base else None
181
182        # Compile the URL path pattern if provided.
183        self._url_path_pattern = re.compile(url_path_pattern) if url_path_pattern else None
184
185        # Normalize query parameters to strings.
186        self._params = {str(k): str(v) for k, v in (params or {}).items()}
187
188        # Normalize header keys to lowercase.
189        self._headers = {str(k).lower(): str(v) for k, v in (headers or {}).items()}
190
191    @staticmethod
192    def _match_dict(obj: Mapping[str, Any], pattern: Mapping[str, Any]) -> bool:
193        """Check that every key/value in the pattern exists in the object."""
194        return pattern.items() <= obj.items()
195
196    def __call__(self, request: Any) -> bool:
197        """
198        :param request: A requests.Request or requests.PreparedRequest instance.
199        :return: True if the request matches all provided criteria; False otherwise.
200        """
201        # Prepare the request (if needed) and extract the URL details.
202        if isinstance(request, requests.Request):
203            prepared_request = request.prepare()
204        elif isinstance(request, requests.PreparedRequest):
205            prepared_request = request
206        else:
207            return False
208
209        # Check HTTP method.
210        if self._method is not None:
211            if prepared_request.method != self._method:
212                return False
213
214        # Parse the URL.
215        parsed_url = parse.urlsplit(prepared_request.url)
216        # Reconstruct the base: scheme://netloc
217        request_url_base = f"{str(parsed_url.scheme)}://{str(parsed_url.netloc)}"
218        # The path (without query parameters)
219        request_path = str(parsed_url.path).rstrip("/")
220
221        # If a base URL is provided, check that it matches.
222        if self._url_base is not None:
223            if request_url_base != self._url_base:
224                return False
225
226        # If a URL path pattern is provided, ensure the path matches the regex.
227        if self._url_path_pattern is not None:
228            if not self._url_path_pattern.search(request_path):
229                return False
230
231        # Check query parameters.
232        if self._params:
233            query_params = dict(parse.parse_qsl(str(parsed_url.query)))
234            if not self._match_dict(query_params, self._params):
235                return False
236
237        # Check headers (normalize keys to lower-case).
238        if self._headers:
239            req_headers = {k.lower(): v for k, v in prepared_request.headers.items()}
240            if not self._match_dict(req_headers, self._headers):
241                return False
242
243        return True
244
245    def __str__(self) -> str:
246        regex = self._url_path_pattern.pattern if self._url_path_pattern else None
247        return (
248            f"HttpRequestRegexMatcher(method={self._method}, url_base={self._url_base}, "
249            f"url_path_pattern={regex}, params={self._params}, headers={self._headers})"
250        )
251
252
253class BaseCallRatePolicy(AbstractCallRatePolicy, abc.ABC):
254    def __init__(self, matchers: list[RequestMatcher]):
255        self._matchers = matchers
256
257    def matches(self, request: Any) -> bool:
258        """Tell if this policy matches specific request and should apply to it
259
260        :param request:
261        :return: True if policy should apply to this request, False - otherwise
262        """
263
264        if not self._matchers:
265            return True
266        return any(matcher(request) for matcher in self._matchers)
267
268
269class UnlimitedCallRatePolicy(BaseCallRatePolicy):
270    """
271    This policy is for explicit unlimited call rates.
272    It can be used when we want to match a specific group of requests and don't apply any limits.
273
274    Example:
275
276    APICallBudget(
277        [
278            UnlimitedCallRatePolicy(
279                matchers=[HttpRequestMatcher(url="/some/method", headers={"sandbox": true})],
280            ),
281            FixedWindowCallRatePolicy(
282                matchers=[HttpRequestMatcher(url="/some/method")],
283                next_reset_ts=datetime.now(),
284                period=timedelta(hours=1)
285                call_limit=1000,
286            ),
287        ]
288    )
289
290    The code above will limit all calls to /some/method except calls that have header sandbox=True
291    """
292
293    def try_acquire(self, request: Any, weight: int) -> None:
294        """Do nothing"""
295
296    def update(
297        self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]
298    ) -> None:
299        """Do nothing"""
300
301
302class FixedWindowCallRatePolicy(BaseCallRatePolicy):
303    def __init__(
304        self,
305        next_reset_ts: datetime.datetime,
306        period: timedelta,
307        call_limit: int,
308        matchers: list[RequestMatcher],
309    ):
310        """A policy that allows {call_limit} calls within a {period} time interval
311
312        :param next_reset_ts: next call rate reset time point
313        :param period: call rate reset period
314        :param call_limit:
315        :param matchers:
316        """
317
318        self._next_reset_ts = next_reset_ts
319        self._offset = period
320        self._call_limit = call_limit
321        self._calls_num = 0
322        self._lock = RLock()
323        super().__init__(matchers=matchers)
324
325    def try_acquire(self, request: Any, weight: int) -> None:
326        if weight > self._call_limit:
327            raise ValueError("Weight can not exceed the call limit")
328        if not self.matches(request):
329            raise ValueError("Request does not match the policy")
330
331        with self._lock:
332            self._update_current_window()
333
334            if self._calls_num + weight > self._call_limit:
335                reset_in = self._next_reset_ts - datetime.datetime.now()
336                error_message = (
337                    f"reached maximum number of allowed calls {self._call_limit} "
338                    f"per {self._offset} interval, next reset in {reset_in}."
339                )
340                raise CallRateLimitHit(
341                    error=error_message,
342                    item=request,
343                    weight=weight,
344                    rate=f"{self._call_limit} per {self._offset}",
345                    time_to_wait=reset_in,
346                )
347
348            self._calls_num += weight
349
350    def __str__(self) -> str:
351        matcher_str = ", ".join(f"{matcher}" for matcher in self._matchers)
352        return (
353            f"FixedWindowCallRatePolicy(call_limit={self._call_limit}, period={self._offset}, "
354            f"calls_used={self._calls_num}, next_reset={self._next_reset_ts}, "
355            f"matchers=[{matcher_str}])"
356        )
357
358    def update(
359        self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]
360    ) -> None:
361        """Update call rate counters, by default, only reacts to decreasing updates of available_calls and changes to call_reset_ts.
362        We ignore updates with available_calls > current_available_calls to support call rate limits that are lower than API limits.
363
364        :param available_calls:
365        :param call_reset_ts:
366        """
367        with self._lock:
368            self._update_current_window()
369            current_available_calls = self._call_limit - self._calls_num
370
371            if available_calls is not None and current_available_calls > available_calls:
372                logger.debug(
373                    "got rate limit update from api, adjusting available calls from %s to %s",
374                    current_available_calls,
375                    available_calls,
376                )
377                self._calls_num = self._call_limit - available_calls
378
379            if call_reset_ts is not None and call_reset_ts != self._next_reset_ts:
380                logger.debug(
381                    "got rate limit update from api, adjusting reset time from %s to %s",
382                    self._next_reset_ts,
383                    call_reset_ts,
384                )
385                self._next_reset_ts = call_reset_ts
386
387    def _update_current_window(self) -> None:
388        now = datetime.datetime.now()
389        if now > self._next_reset_ts:
390            logger.debug("started new window, %s calls available now", self._call_limit)
391            self._next_reset_ts = self._next_reset_ts + self._offset
392            self._calls_num = 0
393
394
395class MovingWindowCallRatePolicy(BaseCallRatePolicy):
396    """
397    Policy to control requests rate implemented on top of PyRateLimiter lib.
398    The main difference between this policy and FixedWindowCallRatePolicy is that the rate-limiting window
399    is moving along requests that we made, and there is no moment when we reset an available number of calls.
400    This strategy requires saving of timestamps of all requests within a window.
401    """
402
403    def __init__(self, rates: list[Rate], matchers: list[RequestMatcher]):
404        """Constructor
405
406        :param rates: list of rates, the order is important and must be ascending
407        :param matchers:
408        """
409        if not rates:
410            raise ValueError("The list of rates can not be empty")
411        pyrate_rates = [
412            PyRateRate(limit=rate.limit, interval=int(rate.interval.total_seconds() * 1000))
413            for rate in rates
414        ]
415        self._bucket = InMemoryBucket(pyrate_rates)
416        # Limiter will create the background task that clears old requests in the bucket
417        self._limiter = Limiter(self._bucket)
418        super().__init__(matchers=matchers)
419
420    def try_acquire(self, request: Any, weight: int) -> None:
421        if not self.matches(request):
422            raise ValueError("Request does not match the policy")
423
424        try:
425            self._limiter.try_acquire(request, weight=weight)
426        except BucketFullException as exc:
427            item = self._limiter.bucket_factory.wrap_item(request, weight)
428            assert isinstance(item, RateItem)
429
430            with self._limiter.lock:
431                time_to_wait = self._bucket.waiting(item)
432                assert isinstance(time_to_wait, int)
433
434                raise CallRateLimitHit(
435                    error=str(exc.meta_info["error"]),
436                    item=request,
437                    weight=int(exc.meta_info["weight"]),
438                    rate=str(exc.meta_info["rate"]),
439                    time_to_wait=timedelta(milliseconds=time_to_wait),
440                )
441
442    def update(
443        self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]
444    ) -> None:
445        """Adjust call bucket to reflect the state of the API server
446
447        :param available_calls:
448        :param call_reset_ts:
449        :return:
450        """
451        if (
452            available_calls is not None and call_reset_ts is None
453        ):  # we do our best to sync buckets with API
454            if available_calls == 0:
455                with self._limiter.lock:
456                    items_to_add = self._bucket.count() < self._bucket.rates[0].limit
457                    if items_to_add > 0:
458                        now: int = TimeClock().now()  # type: ignore[no-untyped-call]
459                        self._bucket.put(RateItem(name="dummy", timestamp=now, weight=items_to_add))
460        # TODO: add support if needed, it might be that it is not possible to make a good solution for this case
461        # if available_calls is not None and call_reset_ts is not None:
462        #     ts = call_reset_ts.timestamp()
463
464    def __str__(self) -> str:
465        """Return a human-friendly description of the moving window rate policy for logging purposes."""
466        rates_info = ", ".join(
467            f"{rate.limit} per {timedelta(milliseconds=rate.interval)}"
468            for rate in self._bucket.rates
469        )
470        current_bucket_count = self._bucket.count()
471        matcher_str = ", ".join(f"{matcher}" for matcher in self._matchers)
472        return (
473            f"MovingWindowCallRatePolicy(rates=[{rates_info}], current_bucket_count={current_bucket_count}, "
474            f"matchers=[{matcher_str}])"
475        )
476
477
478class AbstractAPIBudget(abc.ABC):
479    """Interface to some API where a client allowed to have N calls per T interval.
480
481    Important: APIBudget is not doing any API calls, the end user code is responsible to call this interface
482        to respect call rate limitation of the API.
483
484    It supports multiple policies applied to different group of requests. To distinct these groups we use RequestMatchers.
485    Individual policy represented by MovingWindowCallRatePolicy and currently supports only moving window strategy.
486    """
487
488    @abc.abstractmethod
489    def acquire_call(
490        self, request: Any, block: bool = True, timeout: Optional[float] = None
491    ) -> None:
492        """Try to get a call from budget, will block by default
493
494        :param request:
495        :param block: when true (default) will block the current thread until call credit is available
496        :param timeout: if set will limit maximum time in block, otherwise will wait until credit is available
497        :raises: CallRateLimitHit - when no credits left and if timeout was set the waiting time exceed the timeout
498        """
499
500    @abc.abstractmethod
501    def get_matching_policy(self, request: Any) -> Optional[AbstractCallRatePolicy]:
502        """Find matching call rate policy for specific request"""
503
504    @abc.abstractmethod
505    def update_from_response(self, request: Any, response: Any) -> None:
506        """Update budget information based on response from API
507
508        :param request: the initial request that triggered this response
509        :param response: response from the API
510        """
511
512
513class APIBudget(AbstractAPIBudget):
514    """Default APIBudget implementation"""
515
516    def __init__(
517        self, policies: list[AbstractCallRatePolicy], maximum_attempts_to_acquire: int = 100000
518    ) -> None:
519        """Constructor
520
521        :param policies: list of policies in this budget
522        :param maximum_attempts_to_acquire: number of attempts before throwing hit ratelimit exception, we put some big number here
523         to avoid situations when many threads compete with each other for a few lots over a significant amount of time
524        """
525
526        self._policies = policies
527        self._maximum_attempts_to_acquire = maximum_attempts_to_acquire
528
529    def _extract_endpoint(self, request: Any) -> str:
530        """Extract the endpoint URL from the request if available."""
531        endpoint = None
532        try:
533            # If the request is already a PreparedRequest, it should have a URL.
534            if isinstance(request, requests.PreparedRequest):
535                endpoint = request.url
536            # If it's a requests.Request, we call prepare() to extract the URL.
537            elif isinstance(request, requests.Request):
538                prepared = request.prepare()
539                endpoint = prepared.url
540        except Exception as e:
541            logger.debug(f"Error extracting endpoint: {e}")
542        if endpoint:
543            return endpoint
544        return "unknown endpoint"
545
546    def get_matching_policy(self, request: Any) -> Optional[AbstractCallRatePolicy]:
547        for policy in self._policies:
548            if policy.matches(request):
549                return policy
550        return None
551
552    def acquire_call(
553        self, request: Any, block: bool = True, timeout: Optional[float] = None
554    ) -> None:
555        """Try to get a call from budget, will block by default.
556        Matchers will be called sequentially in the same order they were added.
557        The first matcher that returns True will
558
559        :param request: the API request
560        :param block: when True (default) will block until a call credit is available
561        :param timeout: if provided, limits maximum waiting time; otherwise, waits indefinitely
562        :raises: CallRateLimitHit if the call credit cannot be acquired within the timeout
563        """
564
565        policy = self.get_matching_policy(request)
566        endpoint = self._extract_endpoint(request)
567        if policy:
568            logger.debug(f"Acquiring call for endpoint {endpoint} using policy: {policy}")
569            self._do_acquire(request=request, policy=policy, block=block, timeout=timeout)
570        elif self._policies:
571            logger.debug(
572                f"No policies matched for endpoint {endpoint} (request: {request}). Allowing call by default."
573            )
574
575    def update_from_response(self, request: Any, response: Any) -> None:
576        """Update budget information based on the API response.
577
578        :param request: the initial request that triggered this response
579        :param response: response from the API
580        """
581        pass
582
583    def _do_acquire(
584        self, request: Any, policy: AbstractCallRatePolicy, block: bool, timeout: Optional[float]
585    ) -> None:
586        """Internal method to try to acquire a call credit.
587
588        :param request: the API request
589        :param policy: the matching rate-limiting policy
590        :param block: indicates whether to block until a call credit is available
591        :param timeout: maximum time to wait if blocking
592        :raises: CallRateLimitHit if unable to acquire a call credit
593        """
594        last_exception = None
595        endpoint = self._extract_endpoint(request)
596        # sometimes we spend all budget before a second attempt, so we have a few more attempts
597        for attempt in range(1, self._maximum_attempts_to_acquire):
598            try:
599                policy.try_acquire(request, weight=1)
600                return
601            except CallRateLimitHit as exc:
602                last_exception = exc
603                if block:
604                    if timeout is not None:
605                        time_to_wait = min(timedelta(seconds=timeout), exc.time_to_wait)
606                    else:
607                        time_to_wait = exc.time_to_wait
608                    # Ensure we never sleep for a negative duration.
609                    time_to_wait = max(timedelta(0), time_to_wait)
610                    logger.debug(
611                        f"Policy {policy} reached call limit for endpoint {endpoint} ({exc.rate}). "
612                        f"Sleeping for {time_to_wait} on attempt {attempt}."
613                    )
614                    time.sleep(time_to_wait.total_seconds())
615                else:
616                    logger.debug(
617                        f"Policy {policy} reached call limit for endpoint {endpoint} ({exc.rate}) "
618                        f"and blocking is disabled."
619                    )
620                    raise
621
622        if last_exception:
623            logger.debug(
624                f"Exhausted all {self._maximum_attempts_to_acquire} attempts to acquire a call for endpoint {endpoint} "
625                f"using policy: {policy}"
626            )
627            raise last_exception
628
629
630class HttpAPIBudget(APIBudget):
631    """Implementation of AbstractAPIBudget for HTTP"""
632
633    def __init__(
634        self,
635        ratelimit_reset_header: str = "ratelimit-reset",
636        ratelimit_remaining_header: str = "ratelimit-remaining",
637        status_codes_for_ratelimit_hit: list[int] = [429],
638        **kwargs: Any,
639    ):
640        """Constructor
641
642        :param ratelimit_reset_header: name of the header that has a timestamp of the next reset of call budget
643        :param ratelimit_remaining_header: name of the header that has the number of calls left
644        :param status_codes_for_ratelimit_hit: list of HTTP status codes that signal about rate limit being hit
645        """
646        self._ratelimit_reset_header = ratelimit_reset_header
647        self._ratelimit_remaining_header = ratelimit_remaining_header
648        self._status_codes_for_ratelimit_hit = status_codes_for_ratelimit_hit
649        super().__init__(**kwargs)
650
651    def update_from_response(self, request: Any, response: Any) -> None:
652        policy = self.get_matching_policy(request)
653        if not policy:
654            return
655
656        if isinstance(response, requests.Response):
657            available_calls = self.get_calls_left_from_response(response)
658            reset_ts = self.get_reset_ts_from_response(response)
659            policy.update(available_calls=available_calls, call_reset_ts=reset_ts)
660
661    def get_reset_ts_from_response(
662        self, response: requests.Response
663    ) -> Optional[datetime.datetime]:
664        if response.headers.get(self._ratelimit_reset_header):
665            return datetime.datetime.fromtimestamp(
666                int(response.headers[self._ratelimit_reset_header])
667            )
668        return None
669
670    def get_calls_left_from_response(self, response: requests.Response) -> Optional[int]:
671        if response.headers.get(self._ratelimit_remaining_header):
672            return int(response.headers[self._ratelimit_remaining_header])
673
674        if response.status_code in self._status_codes_for_ratelimit_hit:
675            return 0
676
677        return None
678
679
680class LimiterMixin(MIXIN_BASE):
681    """Mixin class that adds rate-limiting behavior to requests."""
682
683    def __init__(
684        self,
685        api_budget: AbstractAPIBudget,
686        **kwargs: Any,
687    ):
688        self._api_budget = api_budget
689        super().__init__(**kwargs)  # type: ignore # Base Session doesn't take any kwargs
690
691    def send(self, request: requests.PreparedRequest, **kwargs: Any) -> requests.Response:
692        """Send a request with rate-limiting."""
693        self._api_budget.acquire_call(request)
694        response = super().send(request, **kwargs)
695        self._api_budget.update_from_response(request, response)
696        return response
697
698
699class LimiterSession(LimiterMixin, requests.Session):
700    """Session that adds rate-limiting behavior to requests."""
701
702
703class CachedLimiterSession(requests_cache.CacheMixin, LimiterMixin, requests.Session):
704    """Session class with caching and rate-limiting behavior."""
logger = <Logger airbyte (INFO)>
@dataclasses.dataclass
class Rate:
33@dataclasses.dataclass
34class Rate:
35    """Call rate limit"""
36
37    limit: int
38    interval: timedelta

Call rate limit

Rate(limit: int, interval: datetime.timedelta)
limit: int
interval: datetime.timedelta
class CallRateLimitHit(builtins.Exception):
41class CallRateLimitHit(Exception):
42    def __init__(self, error: str, item: Any, weight: int, rate: str, time_to_wait: timedelta):
43        """Constructor
44
45        :param error: error message
46        :param item: object passed into acquire_call
47        :param weight: how many credits were requested
48        :param rate: string representation of the rate violated
49        :param time_to_wait: how long should wait util more call will be available
50        """
51        self.item = item
52        self.weight = weight
53        self.rate = rate
54        self.time_to_wait = time_to_wait
55        super().__init__(error)

Common base class for all non-exit exceptions.

CallRateLimitHit( error: str, item: Any, weight: int, rate: str, time_to_wait: datetime.timedelta)
42    def __init__(self, error: str, item: Any, weight: int, rate: str, time_to_wait: timedelta):
43        """Constructor
44
45        :param error: error message
46        :param item: object passed into acquire_call
47        :param weight: how many credits were requested
48        :param rate: string representation of the rate violated
49        :param time_to_wait: how long should wait util more call will be available
50        """
51        self.item = item
52        self.weight = weight
53        self.rate = rate
54        self.time_to_wait = time_to_wait
55        super().__init__(error)

Constructor

Parameters
  • error: error message
  • item: object passed into acquire_call
  • weight: how many credits were requested
  • rate: string representation of the rate violated
  • time_to_wait: how long should wait util more call will be available
item
weight
rate
time_to_wait
class AbstractCallRatePolicy(abc.ABC):
58class AbstractCallRatePolicy(abc.ABC):
59    """Call rate policy interface.
60    Should be configurable with different rules, like N per M for endpoint X. Endpoint X is matched with APIBudget.
61    """
62
63    @abc.abstractmethod
64    def matches(self, request: Any) -> bool:
65        """Tells if this policy matches specific request and should apply to it
66
67        :param request:
68        :return: True if policy should apply to this request, False - otherwise
69        """
70
71    @abc.abstractmethod
72    def try_acquire(self, request: Any, weight: int) -> None:
73        """Try to acquire request
74
75        :param request: a request object representing a single call to API
76        :param weight: number of requests to deduct from credit
77        :return:
78        """
79
80    @abc.abstractmethod
81    def update(
82        self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]
83    ) -> None:
84        """Update call rate counting with current values
85
86        :param available_calls:
87        :param call_reset_ts:
88        """

Call rate policy interface. Should be configurable with different rules, like N per M for endpoint X. Endpoint X is matched with APIBudget.

@abc.abstractmethod
def matches(self, request: Any) -> bool:
63    @abc.abstractmethod
64    def matches(self, request: Any) -> bool:
65        """Tells if this policy matches specific request and should apply to it
66
67        :param request:
68        :return: True if policy should apply to this request, False - otherwise
69        """

Tells if this policy matches specific request and should apply to it

Parameters
  • request:
Returns

True if policy should apply to this request, False - otherwise

@abc.abstractmethod
def try_acquire(self, request: Any, weight: int) -> None:
71    @abc.abstractmethod
72    def try_acquire(self, request: Any, weight: int) -> None:
73        """Try to acquire request
74
75        :param request: a request object representing a single call to API
76        :param weight: number of requests to deduct from credit
77        :return:
78        """

Try to acquire request

Parameters
  • request: a request object representing a single call to API
  • weight: number of requests to deduct from credit
Returns
@abc.abstractmethod
def update( self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]) -> None:
80    @abc.abstractmethod
81    def update(
82        self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]
83    ) -> None:
84        """Update call rate counting with current values
85
86        :param available_calls:
87        :param call_reset_ts:
88        """

Update call rate counting with current values

Parameters
  • available_calls:
  • call_reset_ts:
class RequestMatcher(abc.ABC):
 91class RequestMatcher(abc.ABC):
 92    """Callable that help to match a request object with call rate policies."""
 93
 94    @abc.abstractmethod
 95    def __call__(self, request: Any) -> bool:
 96        """
 97
 98        :param request:
 99        :return: True if matches the provided request object, False - otherwise
100        """

Callable that help to match a request object with call rate policies.

class HttpRequestMatcher(RequestMatcher):
103class HttpRequestMatcher(RequestMatcher):
104    """Simple implementation of RequestMatcher for HTTP requests using HttpRequestRegexMatcher under the hood."""
105
106    def __init__(
107        self,
108        method: Optional[str] = None,
109        url: Optional[str] = None,
110        params: Optional[Mapping[str, Any]] = None,
111        headers: Optional[Mapping[str, Any]] = None,
112    ):
113        """Constructor
114
115        :param method: HTTP method (e.g., "GET", "POST").
116        :param url: Full URL to match.
117        :param params: Dictionary of query parameters to match.
118        :param headers: Dictionary of headers to match.
119        """
120        # Parse the URL to extract the base and path
121        if url:
122            parsed_url = parse.urlsplit(url)
123            url_base = f"{parsed_url.scheme}://{parsed_url.netloc}"
124            url_path = parsed_url.path if parsed_url.path != "/" else None
125        else:
126            url_base = None
127            url_path = None
128
129        # Use HttpRequestRegexMatcher under the hood
130        self._regex_matcher = HttpRequestRegexMatcher(
131            method=method,
132            url_base=url_base,
133            url_path_pattern=re.escape(url_path) if url_path else None,
134            params=params,
135            headers=headers,
136        )
137
138    def __call__(self, request: Any) -> bool:
139        """
140        :param request: A requests.Request or requests.PreparedRequest instance.
141        :return: True if the request matches all provided criteria; False otherwise.
142        """
143        return self._regex_matcher(request)
144
145    def __str__(self) -> str:
146        return (
147            f"HttpRequestMatcher(method={self._regex_matcher._method}, "
148            f"url={self._regex_matcher._url_base}{self._regex_matcher._url_path_pattern.pattern if self._regex_matcher._url_path_pattern else ''}, "
149            f"params={self._regex_matcher._params}, headers={self._regex_matcher._headers})"
150        )

Simple implementation of RequestMatcher for HTTP requests using HttpRequestRegexMatcher under the hood.

HttpRequestMatcher( method: Optional[str] = None, url: Optional[str] = None, params: Optional[Mapping[str, Any]] = None, headers: Optional[Mapping[str, Any]] = None)
106    def __init__(
107        self,
108        method: Optional[str] = None,
109        url: Optional[str] = None,
110        params: Optional[Mapping[str, Any]] = None,
111        headers: Optional[Mapping[str, Any]] = None,
112    ):
113        """Constructor
114
115        :param method: HTTP method (e.g., "GET", "POST").
116        :param url: Full URL to match.
117        :param params: Dictionary of query parameters to match.
118        :param headers: Dictionary of headers to match.
119        """
120        # Parse the URL to extract the base and path
121        if url:
122            parsed_url = parse.urlsplit(url)
123            url_base = f"{parsed_url.scheme}://{parsed_url.netloc}"
124            url_path = parsed_url.path if parsed_url.path != "/" else None
125        else:
126            url_base = None
127            url_path = None
128
129        # Use HttpRequestRegexMatcher under the hood
130        self._regex_matcher = HttpRequestRegexMatcher(
131            method=method,
132            url_base=url_base,
133            url_path_pattern=re.escape(url_path) if url_path else None,
134            params=params,
135            headers=headers,
136        )

Constructor

Parameters
  • method: HTTP method (e.g., "GET", "POST").
  • url: Full URL to match.
  • params: Dictionary of query parameters to match.
  • headers: Dictionary of headers to match.
class HttpRequestRegexMatcher(RequestMatcher):
153class HttpRequestRegexMatcher(RequestMatcher):
154    """
155    Extended RequestMatcher for HTTP requests that supports matching on:
156      - HTTP method (case-insensitive)
157      - URL base (scheme + netloc) optionally
158      - URL path pattern (a regex applied to the path portion of the URL)
159      - Query parameters (must be present)
160      - Headers (header names compared case-insensitively)
161    """
162
163    def __init__(
164        self,
165        method: Optional[str] = None,
166        url_base: Optional[str] = None,
167        url_path_pattern: Optional[str] = None,
168        params: Optional[Mapping[str, Any]] = None,
169        headers: Optional[Mapping[str, Any]] = None,
170    ):
171        """
172        :param method: HTTP method (e.g. "GET", "POST"); compared case-insensitively.
173        :param url_base: Base URL (scheme://host) that must match.
174        :param url_path_pattern: A regex pattern that will be applied to the path portion of the URL.
175        :param params: Dictionary of query parameters that must be present in the request.
176        :param headers: Dictionary of headers that must be present (header keys are compared case-insensitively).
177        """
178        self._method = method.upper() if method else None
179
180        # Normalize the url_base if provided: remove trailing slash.
181        self._url_base = url_base.rstrip("/") if url_base else None
182
183        # Compile the URL path pattern if provided.
184        self._url_path_pattern = re.compile(url_path_pattern) if url_path_pattern else None
185
186        # Normalize query parameters to strings.
187        self._params = {str(k): str(v) for k, v in (params or {}).items()}
188
189        # Normalize header keys to lowercase.
190        self._headers = {str(k).lower(): str(v) for k, v in (headers or {}).items()}
191
192    @staticmethod
193    def _match_dict(obj: Mapping[str, Any], pattern: Mapping[str, Any]) -> bool:
194        """Check that every key/value in the pattern exists in the object."""
195        return pattern.items() <= obj.items()
196
197    def __call__(self, request: Any) -> bool:
198        """
199        :param request: A requests.Request or requests.PreparedRequest instance.
200        :return: True if the request matches all provided criteria; False otherwise.
201        """
202        # Prepare the request (if needed) and extract the URL details.
203        if isinstance(request, requests.Request):
204            prepared_request = request.prepare()
205        elif isinstance(request, requests.PreparedRequest):
206            prepared_request = request
207        else:
208            return False
209
210        # Check HTTP method.
211        if self._method is not None:
212            if prepared_request.method != self._method:
213                return False
214
215        # Parse the URL.
216        parsed_url = parse.urlsplit(prepared_request.url)
217        # Reconstruct the base: scheme://netloc
218        request_url_base = f"{str(parsed_url.scheme)}://{str(parsed_url.netloc)}"
219        # The path (without query parameters)
220        request_path = str(parsed_url.path).rstrip("/")
221
222        # If a base URL is provided, check that it matches.
223        if self._url_base is not None:
224            if request_url_base != self._url_base:
225                return False
226
227        # If a URL path pattern is provided, ensure the path matches the regex.
228        if self._url_path_pattern is not None:
229            if not self._url_path_pattern.search(request_path):
230                return False
231
232        # Check query parameters.
233        if self._params:
234            query_params = dict(parse.parse_qsl(str(parsed_url.query)))
235            if not self._match_dict(query_params, self._params):
236                return False
237
238        # Check headers (normalize keys to lower-case).
239        if self._headers:
240            req_headers = {k.lower(): v for k, v in prepared_request.headers.items()}
241            if not self._match_dict(req_headers, self._headers):
242                return False
243
244        return True
245
246    def __str__(self) -> str:
247        regex = self._url_path_pattern.pattern if self._url_path_pattern else None
248        return (
249            f"HttpRequestRegexMatcher(method={self._method}, url_base={self._url_base}, "
250            f"url_path_pattern={regex}, params={self._params}, headers={self._headers})"
251        )
Extended RequestMatcher for HTTP requests that supports matching on:
  • HTTP method (case-insensitive)
  • URL base (scheme + netloc) optionally
  • URL path pattern (a regex applied to the path portion of the URL)
  • Query parameters (must be present)
  • Headers (header names compared case-insensitively)
HttpRequestRegexMatcher( method: Optional[str] = None, url_base: Optional[str] = None, url_path_pattern: Optional[str] = None, params: Optional[Mapping[str, Any]] = None, headers: Optional[Mapping[str, Any]] = None)
163    def __init__(
164        self,
165        method: Optional[str] = None,
166        url_base: Optional[str] = None,
167        url_path_pattern: Optional[str] = None,
168        params: Optional[Mapping[str, Any]] = None,
169        headers: Optional[Mapping[str, Any]] = None,
170    ):
171        """
172        :param method: HTTP method (e.g. "GET", "POST"); compared case-insensitively.
173        :param url_base: Base URL (scheme://host) that must match.
174        :param url_path_pattern: A regex pattern that will be applied to the path portion of the URL.
175        :param params: Dictionary of query parameters that must be present in the request.
176        :param headers: Dictionary of headers that must be present (header keys are compared case-insensitively).
177        """
178        self._method = method.upper() if method else None
179
180        # Normalize the url_base if provided: remove trailing slash.
181        self._url_base = url_base.rstrip("/") if url_base else None
182
183        # Compile the URL path pattern if provided.
184        self._url_path_pattern = re.compile(url_path_pattern) if url_path_pattern else None
185
186        # Normalize query parameters to strings.
187        self._params = {str(k): str(v) for k, v in (params or {}).items()}
188
189        # Normalize header keys to lowercase.
190        self._headers = {str(k).lower(): str(v) for k, v in (headers or {}).items()}
Parameters
  • method: HTTP method (e.g. "GET", "POST"); compared case-insensitively.
  • url_base: Base URL (scheme: //host) that must match.
  • url_path_pattern: A regex pattern that will be applied to the path portion of the URL.
  • params: Dictionary of query parameters that must be present in the request.
  • headers: Dictionary of headers that must be present (header keys are compared case-insensitively).
class BaseCallRatePolicy(AbstractCallRatePolicy, abc.ABC):
254class BaseCallRatePolicy(AbstractCallRatePolicy, abc.ABC):
255    def __init__(self, matchers: list[RequestMatcher]):
256        self._matchers = matchers
257
258    def matches(self, request: Any) -> bool:
259        """Tell if this policy matches specific request and should apply to it
260
261        :param request:
262        :return: True if policy should apply to this request, False - otherwise
263        """
264
265        if not self._matchers:
266            return True
267        return any(matcher(request) for matcher in self._matchers)

Call rate policy interface. Should be configurable with different rules, like N per M for endpoint X. Endpoint X is matched with APIBudget.

def matches(self, request: Any) -> bool:
258    def matches(self, request: Any) -> bool:
259        """Tell if this policy matches specific request and should apply to it
260
261        :param request:
262        :return: True if policy should apply to this request, False - otherwise
263        """
264
265        if not self._matchers:
266            return True
267        return any(matcher(request) for matcher in self._matchers)

Tell if this policy matches specific request and should apply to it

Parameters
  • request:
Returns

True if policy should apply to this request, False - otherwise

class UnlimitedCallRatePolicy(BaseCallRatePolicy):
270class UnlimitedCallRatePolicy(BaseCallRatePolicy):
271    """
272    This policy is for explicit unlimited call rates.
273    It can be used when we want to match a specific group of requests and don't apply any limits.
274
275    Example:
276
277    APICallBudget(
278        [
279            UnlimitedCallRatePolicy(
280                matchers=[HttpRequestMatcher(url="/some/method", headers={"sandbox": true})],
281            ),
282            FixedWindowCallRatePolicy(
283                matchers=[HttpRequestMatcher(url="/some/method")],
284                next_reset_ts=datetime.now(),
285                period=timedelta(hours=1)
286                call_limit=1000,
287            ),
288        ]
289    )
290
291    The code above will limit all calls to /some/method except calls that have header sandbox=True
292    """
293
294    def try_acquire(self, request: Any, weight: int) -> None:
295        """Do nothing"""
296
297    def update(
298        self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]
299    ) -> None:
300        """Do nothing"""

This policy is for explicit unlimited call rates. It can be used when we want to match a specific group of requests and don't apply any limits.

Example:

APICallBudget( [ UnlimitedCallRatePolicy( matchers=[HttpRequestMatcher(url="/some/method", headers={"sandbox": true})], ), FixedWindowCallRatePolicy( matchers=[HttpRequestMatcher(url="/some/method")], next_reset_ts=datetime.now(), period=timedelta(hours=1) call_limit=1000, ), ] )

The code above will limit all calls to /some/method except calls that have header sandbox=True

def try_acquire(self, request: Any, weight: int) -> None:
294    def try_acquire(self, request: Any, weight: int) -> None:
295        """Do nothing"""

Do nothing

def update( self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]) -> None:
297    def update(
298        self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]
299    ) -> None:
300        """Do nothing"""

Do nothing

class FixedWindowCallRatePolicy(BaseCallRatePolicy):
303class FixedWindowCallRatePolicy(BaseCallRatePolicy):
304    def __init__(
305        self,
306        next_reset_ts: datetime.datetime,
307        period: timedelta,
308        call_limit: int,
309        matchers: list[RequestMatcher],
310    ):
311        """A policy that allows {call_limit} calls within a {period} time interval
312
313        :param next_reset_ts: next call rate reset time point
314        :param period: call rate reset period
315        :param call_limit:
316        :param matchers:
317        """
318
319        self._next_reset_ts = next_reset_ts
320        self._offset = period
321        self._call_limit = call_limit
322        self._calls_num = 0
323        self._lock = RLock()
324        super().__init__(matchers=matchers)
325
326    def try_acquire(self, request: Any, weight: int) -> None:
327        if weight > self._call_limit:
328            raise ValueError("Weight can not exceed the call limit")
329        if not self.matches(request):
330            raise ValueError("Request does not match the policy")
331
332        with self._lock:
333            self._update_current_window()
334
335            if self._calls_num + weight > self._call_limit:
336                reset_in = self._next_reset_ts - datetime.datetime.now()
337                error_message = (
338                    f"reached maximum number of allowed calls {self._call_limit} "
339                    f"per {self._offset} interval, next reset in {reset_in}."
340                )
341                raise CallRateLimitHit(
342                    error=error_message,
343                    item=request,
344                    weight=weight,
345                    rate=f"{self._call_limit} per {self._offset}",
346                    time_to_wait=reset_in,
347                )
348
349            self._calls_num += weight
350
351    def __str__(self) -> str:
352        matcher_str = ", ".join(f"{matcher}" for matcher in self._matchers)
353        return (
354            f"FixedWindowCallRatePolicy(call_limit={self._call_limit}, period={self._offset}, "
355            f"calls_used={self._calls_num}, next_reset={self._next_reset_ts}, "
356            f"matchers=[{matcher_str}])"
357        )
358
359    def update(
360        self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]
361    ) -> None:
362        """Update call rate counters, by default, only reacts to decreasing updates of available_calls and changes to call_reset_ts.
363        We ignore updates with available_calls > current_available_calls to support call rate limits that are lower than API limits.
364
365        :param available_calls:
366        :param call_reset_ts:
367        """
368        with self._lock:
369            self._update_current_window()
370            current_available_calls = self._call_limit - self._calls_num
371
372            if available_calls is not None and current_available_calls > available_calls:
373                logger.debug(
374                    "got rate limit update from api, adjusting available calls from %s to %s",
375                    current_available_calls,
376                    available_calls,
377                )
378                self._calls_num = self._call_limit - available_calls
379
380            if call_reset_ts is not None and call_reset_ts != self._next_reset_ts:
381                logger.debug(
382                    "got rate limit update from api, adjusting reset time from %s to %s",
383                    self._next_reset_ts,
384                    call_reset_ts,
385                )
386                self._next_reset_ts = call_reset_ts
387
388    def _update_current_window(self) -> None:
389        now = datetime.datetime.now()
390        if now > self._next_reset_ts:
391            logger.debug("started new window, %s calls available now", self._call_limit)
392            self._next_reset_ts = self._next_reset_ts + self._offset
393            self._calls_num = 0

Call rate policy interface. Should be configurable with different rules, like N per M for endpoint X. Endpoint X is matched with APIBudget.

FixedWindowCallRatePolicy( next_reset_ts: datetime.datetime, period: datetime.timedelta, call_limit: int, matchers: list[RequestMatcher])
304    def __init__(
305        self,
306        next_reset_ts: datetime.datetime,
307        period: timedelta,
308        call_limit: int,
309        matchers: list[RequestMatcher],
310    ):
311        """A policy that allows {call_limit} calls within a {period} time interval
312
313        :param next_reset_ts: next call rate reset time point
314        :param period: call rate reset period
315        :param call_limit:
316        :param matchers:
317        """
318
319        self._next_reset_ts = next_reset_ts
320        self._offset = period
321        self._call_limit = call_limit
322        self._calls_num = 0
323        self._lock = RLock()
324        super().__init__(matchers=matchers)

A policy that allows {call_limit} calls within a {period} time interval

Parameters
  • next_reset_ts: next call rate reset time point
  • period: call rate reset period
  • call_limit:
  • matchers:
def try_acquire(self, request: Any, weight: int) -> None:
326    def try_acquire(self, request: Any, weight: int) -> None:
327        if weight > self._call_limit:
328            raise ValueError("Weight can not exceed the call limit")
329        if not self.matches(request):
330            raise ValueError("Request does not match the policy")
331
332        with self._lock:
333            self._update_current_window()
334
335            if self._calls_num + weight > self._call_limit:
336                reset_in = self._next_reset_ts - datetime.datetime.now()
337                error_message = (
338                    f"reached maximum number of allowed calls {self._call_limit} "
339                    f"per {self._offset} interval, next reset in {reset_in}."
340                )
341                raise CallRateLimitHit(
342                    error=error_message,
343                    item=request,
344                    weight=weight,
345                    rate=f"{self._call_limit} per {self._offset}",
346                    time_to_wait=reset_in,
347                )
348
349            self._calls_num += weight

Try to acquire request

Parameters
  • request: a request object representing a single call to API
  • weight: number of requests to deduct from credit
Returns
def update( self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]) -> None:
359    def update(
360        self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]
361    ) -> None:
362        """Update call rate counters, by default, only reacts to decreasing updates of available_calls and changes to call_reset_ts.
363        We ignore updates with available_calls > current_available_calls to support call rate limits that are lower than API limits.
364
365        :param available_calls:
366        :param call_reset_ts:
367        """
368        with self._lock:
369            self._update_current_window()
370            current_available_calls = self._call_limit - self._calls_num
371
372            if available_calls is not None and current_available_calls > available_calls:
373                logger.debug(
374                    "got rate limit update from api, adjusting available calls from %s to %s",
375                    current_available_calls,
376                    available_calls,
377                )
378                self._calls_num = self._call_limit - available_calls
379
380            if call_reset_ts is not None and call_reset_ts != self._next_reset_ts:
381                logger.debug(
382                    "got rate limit update from api, adjusting reset time from %s to %s",
383                    self._next_reset_ts,
384                    call_reset_ts,
385                )
386                self._next_reset_ts = call_reset_ts

Update call rate counters, by default, only reacts to decreasing updates of available_calls and changes to call_reset_ts. We ignore updates with available_calls > current_available_calls to support call rate limits that are lower than API limits.

Parameters
  • available_calls:
  • call_reset_ts:
Inherited Members
BaseCallRatePolicy
matches
class MovingWindowCallRatePolicy(BaseCallRatePolicy):
396class MovingWindowCallRatePolicy(BaseCallRatePolicy):
397    """
398    Policy to control requests rate implemented on top of PyRateLimiter lib.
399    The main difference between this policy and FixedWindowCallRatePolicy is that the rate-limiting window
400    is moving along requests that we made, and there is no moment when we reset an available number of calls.
401    This strategy requires saving of timestamps of all requests within a window.
402    """
403
404    def __init__(self, rates: list[Rate], matchers: list[RequestMatcher]):
405        """Constructor
406
407        :param rates: list of rates, the order is important and must be ascending
408        :param matchers:
409        """
410        if not rates:
411            raise ValueError("The list of rates can not be empty")
412        pyrate_rates = [
413            PyRateRate(limit=rate.limit, interval=int(rate.interval.total_seconds() * 1000))
414            for rate in rates
415        ]
416        self._bucket = InMemoryBucket(pyrate_rates)
417        # Limiter will create the background task that clears old requests in the bucket
418        self._limiter = Limiter(self._bucket)
419        super().__init__(matchers=matchers)
420
421    def try_acquire(self, request: Any, weight: int) -> None:
422        if not self.matches(request):
423            raise ValueError("Request does not match the policy")
424
425        try:
426            self._limiter.try_acquire(request, weight=weight)
427        except BucketFullException as exc:
428            item = self._limiter.bucket_factory.wrap_item(request, weight)
429            assert isinstance(item, RateItem)
430
431            with self._limiter.lock:
432                time_to_wait = self._bucket.waiting(item)
433                assert isinstance(time_to_wait, int)
434
435                raise CallRateLimitHit(
436                    error=str(exc.meta_info["error"]),
437                    item=request,
438                    weight=int(exc.meta_info["weight"]),
439                    rate=str(exc.meta_info["rate"]),
440                    time_to_wait=timedelta(milliseconds=time_to_wait),
441                )
442
443    def update(
444        self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]
445    ) -> None:
446        """Adjust call bucket to reflect the state of the API server
447
448        :param available_calls:
449        :param call_reset_ts:
450        :return:
451        """
452        if (
453            available_calls is not None and call_reset_ts is None
454        ):  # we do our best to sync buckets with API
455            if available_calls == 0:
456                with self._limiter.lock:
457                    items_to_add = self._bucket.count() < self._bucket.rates[0].limit
458                    if items_to_add > 0:
459                        now: int = TimeClock().now()  # type: ignore[no-untyped-call]
460                        self._bucket.put(RateItem(name="dummy", timestamp=now, weight=items_to_add))
461        # TODO: add support if needed, it might be that it is not possible to make a good solution for this case
462        # if available_calls is not None and call_reset_ts is not None:
463        #     ts = call_reset_ts.timestamp()
464
465    def __str__(self) -> str:
466        """Return a human-friendly description of the moving window rate policy for logging purposes."""
467        rates_info = ", ".join(
468            f"{rate.limit} per {timedelta(milliseconds=rate.interval)}"
469            for rate in self._bucket.rates
470        )
471        current_bucket_count = self._bucket.count()
472        matcher_str = ", ".join(f"{matcher}" for matcher in self._matchers)
473        return (
474            f"MovingWindowCallRatePolicy(rates=[{rates_info}], current_bucket_count={current_bucket_count}, "
475            f"matchers=[{matcher_str}])"
476        )

Policy to control requests rate implemented on top of PyRateLimiter lib. The main difference between this policy and FixedWindowCallRatePolicy is that the rate-limiting window is moving along requests that we made, and there is no moment when we reset an available number of calls. This strategy requires saving of timestamps of all requests within a window.

MovingWindowCallRatePolicy( rates: list[Rate], matchers: list[RequestMatcher])
404    def __init__(self, rates: list[Rate], matchers: list[RequestMatcher]):
405        """Constructor
406
407        :param rates: list of rates, the order is important and must be ascending
408        :param matchers:
409        """
410        if not rates:
411            raise ValueError("The list of rates can not be empty")
412        pyrate_rates = [
413            PyRateRate(limit=rate.limit, interval=int(rate.interval.total_seconds() * 1000))
414            for rate in rates
415        ]
416        self._bucket = InMemoryBucket(pyrate_rates)
417        # Limiter will create the background task that clears old requests in the bucket
418        self._limiter = Limiter(self._bucket)
419        super().__init__(matchers=matchers)

Constructor

Parameters
  • rates: list of rates, the order is important and must be ascending
  • matchers:
def try_acquire(self, request: Any, weight: int) -> None:
421    def try_acquire(self, request: Any, weight: int) -> None:
422        if not self.matches(request):
423            raise ValueError("Request does not match the policy")
424
425        try:
426            self._limiter.try_acquire(request, weight=weight)
427        except BucketFullException as exc:
428            item = self._limiter.bucket_factory.wrap_item(request, weight)
429            assert isinstance(item, RateItem)
430
431            with self._limiter.lock:
432                time_to_wait = self._bucket.waiting(item)
433                assert isinstance(time_to_wait, int)
434
435                raise CallRateLimitHit(
436                    error=str(exc.meta_info["error"]),
437                    item=request,
438                    weight=int(exc.meta_info["weight"]),
439                    rate=str(exc.meta_info["rate"]),
440                    time_to_wait=timedelta(milliseconds=time_to_wait),
441                )

Try to acquire request

Parameters
  • request: a request object representing a single call to API
  • weight: number of requests to deduct from credit
Returns
def update( self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]) -> None:
443    def update(
444        self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]
445    ) -> None:
446        """Adjust call bucket to reflect the state of the API server
447
448        :param available_calls:
449        :param call_reset_ts:
450        :return:
451        """
452        if (
453            available_calls is not None and call_reset_ts is None
454        ):  # we do our best to sync buckets with API
455            if available_calls == 0:
456                with self._limiter.lock:
457                    items_to_add = self._bucket.count() < self._bucket.rates[0].limit
458                    if items_to_add > 0:
459                        now: int = TimeClock().now()  # type: ignore[no-untyped-call]
460                        self._bucket.put(RateItem(name="dummy", timestamp=now, weight=items_to_add))
461        # TODO: add support if needed, it might be that it is not possible to make a good solution for this case
462        # if available_calls is not None and call_reset_ts is not None:
463        #     ts = call_reset_ts.timestamp()

Adjust call bucket to reflect the state of the API server

Parameters
  • available_calls:
  • call_reset_ts:
Returns
Inherited Members
BaseCallRatePolicy
matches
class AbstractAPIBudget(abc.ABC):
479class AbstractAPIBudget(abc.ABC):
480    """Interface to some API where a client allowed to have N calls per T interval.
481
482    Important: APIBudget is not doing any API calls, the end user code is responsible to call this interface
483        to respect call rate limitation of the API.
484
485    It supports multiple policies applied to different group of requests. To distinct these groups we use RequestMatchers.
486    Individual policy represented by MovingWindowCallRatePolicy and currently supports only moving window strategy.
487    """
488
489    @abc.abstractmethod
490    def acquire_call(
491        self, request: Any, block: bool = True, timeout: Optional[float] = None
492    ) -> None:
493        """Try to get a call from budget, will block by default
494
495        :param request:
496        :param block: when true (default) will block the current thread until call credit is available
497        :param timeout: if set will limit maximum time in block, otherwise will wait until credit is available
498        :raises: CallRateLimitHit - when no credits left and if timeout was set the waiting time exceed the timeout
499        """
500
501    @abc.abstractmethod
502    def get_matching_policy(self, request: Any) -> Optional[AbstractCallRatePolicy]:
503        """Find matching call rate policy for specific request"""
504
505    @abc.abstractmethod
506    def update_from_response(self, request: Any, response: Any) -> None:
507        """Update budget information based on response from API
508
509        :param request: the initial request that triggered this response
510        :param response: response from the API
511        """

Interface to some API where a client allowed to have N calls per T interval.

Important: APIBudget is not doing any API calls, the end user code is responsible to call this interface to respect call rate limitation of the API.

It supports multiple policies applied to different group of requests. To distinct these groups we use RequestMatchers. Individual policy represented by MovingWindowCallRatePolicy and currently supports only moving window strategy.

@abc.abstractmethod
def acquire_call( self, request: Any, block: bool = True, timeout: Optional[float] = None) -> None:
489    @abc.abstractmethod
490    def acquire_call(
491        self, request: Any, block: bool = True, timeout: Optional[float] = None
492    ) -> None:
493        """Try to get a call from budget, will block by default
494
495        :param request:
496        :param block: when true (default) will block the current thread until call credit is available
497        :param timeout: if set will limit maximum time in block, otherwise will wait until credit is available
498        :raises: CallRateLimitHit - when no credits left and if timeout was set the waiting time exceed the timeout
499        """

Try to get a call from budget, will block by default

Parameters
  • request:
  • block: when true (default) will block the current thread until call credit is available
  • timeout: if set will limit maximum time in block, otherwise will wait until credit is available
Raises
  • CallRateLimitHit - when no credits left and if timeout was set the waiting time exceed the timeout
@abc.abstractmethod
def get_matching_policy( self, request: Any) -> Optional[AbstractCallRatePolicy]:
501    @abc.abstractmethod
502    def get_matching_policy(self, request: Any) -> Optional[AbstractCallRatePolicy]:
503        """Find matching call rate policy for specific request"""

Find matching call rate policy for specific request

@abc.abstractmethod
def update_from_response(self, request: Any, response: Any) -> None:
505    @abc.abstractmethod
506    def update_from_response(self, request: Any, response: Any) -> None:
507        """Update budget information based on response from API
508
509        :param request: the initial request that triggered this response
510        :param response: response from the API
511        """

Update budget information based on response from API

Parameters
  • request: the initial request that triggered this response
  • response: response from the API
class APIBudget(AbstractAPIBudget):
514class APIBudget(AbstractAPIBudget):
515    """Default APIBudget implementation"""
516
517    def __init__(
518        self, policies: list[AbstractCallRatePolicy], maximum_attempts_to_acquire: int = 100000
519    ) -> None:
520        """Constructor
521
522        :param policies: list of policies in this budget
523        :param maximum_attempts_to_acquire: number of attempts before throwing hit ratelimit exception, we put some big number here
524         to avoid situations when many threads compete with each other for a few lots over a significant amount of time
525        """
526
527        self._policies = policies
528        self._maximum_attempts_to_acquire = maximum_attempts_to_acquire
529
530    def _extract_endpoint(self, request: Any) -> str:
531        """Extract the endpoint URL from the request if available."""
532        endpoint = None
533        try:
534            # If the request is already a PreparedRequest, it should have a URL.
535            if isinstance(request, requests.PreparedRequest):
536                endpoint = request.url
537            # If it's a requests.Request, we call prepare() to extract the URL.
538            elif isinstance(request, requests.Request):
539                prepared = request.prepare()
540                endpoint = prepared.url
541        except Exception as e:
542            logger.debug(f"Error extracting endpoint: {e}")
543        if endpoint:
544            return endpoint
545        return "unknown endpoint"
546
547    def get_matching_policy(self, request: Any) -> Optional[AbstractCallRatePolicy]:
548        for policy in self._policies:
549            if policy.matches(request):
550                return policy
551        return None
552
553    def acquire_call(
554        self, request: Any, block: bool = True, timeout: Optional[float] = None
555    ) -> None:
556        """Try to get a call from budget, will block by default.
557        Matchers will be called sequentially in the same order they were added.
558        The first matcher that returns True will
559
560        :param request: the API request
561        :param block: when True (default) will block until a call credit is available
562        :param timeout: if provided, limits maximum waiting time; otherwise, waits indefinitely
563        :raises: CallRateLimitHit if the call credit cannot be acquired within the timeout
564        """
565
566        policy = self.get_matching_policy(request)
567        endpoint = self._extract_endpoint(request)
568        if policy:
569            logger.debug(f"Acquiring call for endpoint {endpoint} using policy: {policy}")
570            self._do_acquire(request=request, policy=policy, block=block, timeout=timeout)
571        elif self._policies:
572            logger.debug(
573                f"No policies matched for endpoint {endpoint} (request: {request}). Allowing call by default."
574            )
575
576    def update_from_response(self, request: Any, response: Any) -> None:
577        """Update budget information based on the API response.
578
579        :param request: the initial request that triggered this response
580        :param response: response from the API
581        """
582        pass
583
584    def _do_acquire(
585        self, request: Any, policy: AbstractCallRatePolicy, block: bool, timeout: Optional[float]
586    ) -> None:
587        """Internal method to try to acquire a call credit.
588
589        :param request: the API request
590        :param policy: the matching rate-limiting policy
591        :param block: indicates whether to block until a call credit is available
592        :param timeout: maximum time to wait if blocking
593        :raises: CallRateLimitHit if unable to acquire a call credit
594        """
595        last_exception = None
596        endpoint = self._extract_endpoint(request)
597        # sometimes we spend all budget before a second attempt, so we have a few more attempts
598        for attempt in range(1, self._maximum_attempts_to_acquire):
599            try:
600                policy.try_acquire(request, weight=1)
601                return
602            except CallRateLimitHit as exc:
603                last_exception = exc
604                if block:
605                    if timeout is not None:
606                        time_to_wait = min(timedelta(seconds=timeout), exc.time_to_wait)
607                    else:
608                        time_to_wait = exc.time_to_wait
609                    # Ensure we never sleep for a negative duration.
610                    time_to_wait = max(timedelta(0), time_to_wait)
611                    logger.debug(
612                        f"Policy {policy} reached call limit for endpoint {endpoint} ({exc.rate}). "
613                        f"Sleeping for {time_to_wait} on attempt {attempt}."
614                    )
615                    time.sleep(time_to_wait.total_seconds())
616                else:
617                    logger.debug(
618                        f"Policy {policy} reached call limit for endpoint {endpoint} ({exc.rate}) "
619                        f"and blocking is disabled."
620                    )
621                    raise
622
623        if last_exception:
624            logger.debug(
625                f"Exhausted all {self._maximum_attempts_to_acquire} attempts to acquire a call for endpoint {endpoint} "
626                f"using policy: {policy}"
627            )
628            raise last_exception

Default APIBudget implementation

APIBudget( policies: list[AbstractCallRatePolicy], maximum_attempts_to_acquire: int = 100000)
517    def __init__(
518        self, policies: list[AbstractCallRatePolicy], maximum_attempts_to_acquire: int = 100000
519    ) -> None:
520        """Constructor
521
522        :param policies: list of policies in this budget
523        :param maximum_attempts_to_acquire: number of attempts before throwing hit ratelimit exception, we put some big number here
524         to avoid situations when many threads compete with each other for a few lots over a significant amount of time
525        """
526
527        self._policies = policies
528        self._maximum_attempts_to_acquire = maximum_attempts_to_acquire

Constructor

Parameters
  • policies: list of policies in this budget
  • maximum_attempts_to_acquire: number of attempts before throwing hit ratelimit exception, we put some big number here to avoid situations when many threads compete with each other for a few lots over a significant amount of time
def get_matching_policy( self, request: Any) -> Optional[AbstractCallRatePolicy]:
547    def get_matching_policy(self, request: Any) -> Optional[AbstractCallRatePolicy]:
548        for policy in self._policies:
549            if policy.matches(request):
550                return policy
551        return None

Find matching call rate policy for specific request

def acquire_call( self, request: Any, block: bool = True, timeout: Optional[float] = None) -> None:
553    def acquire_call(
554        self, request: Any, block: bool = True, timeout: Optional[float] = None
555    ) -> None:
556        """Try to get a call from budget, will block by default.
557        Matchers will be called sequentially in the same order they were added.
558        The first matcher that returns True will
559
560        :param request: the API request
561        :param block: when True (default) will block until a call credit is available
562        :param timeout: if provided, limits maximum waiting time; otherwise, waits indefinitely
563        :raises: CallRateLimitHit if the call credit cannot be acquired within the timeout
564        """
565
566        policy = self.get_matching_policy(request)
567        endpoint = self._extract_endpoint(request)
568        if policy:
569            logger.debug(f"Acquiring call for endpoint {endpoint} using policy: {policy}")
570            self._do_acquire(request=request, policy=policy, block=block, timeout=timeout)
571        elif self._policies:
572            logger.debug(
573                f"No policies matched for endpoint {endpoint} (request: {request}). Allowing call by default."
574            )

Try to get a call from budget, will block by default. Matchers will be called sequentially in the same order they were added. The first matcher that returns True will

Parameters
  • request: the API request
  • block: when True (default) will block until a call credit is available
  • timeout: if provided, limits maximum waiting time; otherwise, waits indefinitely
Raises
  • CallRateLimitHit if the call credit cannot be acquired within the timeout
def update_from_response(self, request: Any, response: Any) -> None:
576    def update_from_response(self, request: Any, response: Any) -> None:
577        """Update budget information based on the API response.
578
579        :param request: the initial request that triggered this response
580        :param response: response from the API
581        """
582        pass

Update budget information based on the API response.

Parameters
  • request: the initial request that triggered this response
  • response: response from the API
class HttpAPIBudget(APIBudget):
631class HttpAPIBudget(APIBudget):
632    """Implementation of AbstractAPIBudget for HTTP"""
633
634    def __init__(
635        self,
636        ratelimit_reset_header: str = "ratelimit-reset",
637        ratelimit_remaining_header: str = "ratelimit-remaining",
638        status_codes_for_ratelimit_hit: list[int] = [429],
639        **kwargs: Any,
640    ):
641        """Constructor
642
643        :param ratelimit_reset_header: name of the header that has a timestamp of the next reset of call budget
644        :param ratelimit_remaining_header: name of the header that has the number of calls left
645        :param status_codes_for_ratelimit_hit: list of HTTP status codes that signal about rate limit being hit
646        """
647        self._ratelimit_reset_header = ratelimit_reset_header
648        self._ratelimit_remaining_header = ratelimit_remaining_header
649        self._status_codes_for_ratelimit_hit = status_codes_for_ratelimit_hit
650        super().__init__(**kwargs)
651
652    def update_from_response(self, request: Any, response: Any) -> None:
653        policy = self.get_matching_policy(request)
654        if not policy:
655            return
656
657        if isinstance(response, requests.Response):
658            available_calls = self.get_calls_left_from_response(response)
659            reset_ts = self.get_reset_ts_from_response(response)
660            policy.update(available_calls=available_calls, call_reset_ts=reset_ts)
661
662    def get_reset_ts_from_response(
663        self, response: requests.Response
664    ) -> Optional[datetime.datetime]:
665        if response.headers.get(self._ratelimit_reset_header):
666            return datetime.datetime.fromtimestamp(
667                int(response.headers[self._ratelimit_reset_header])
668            )
669        return None
670
671    def get_calls_left_from_response(self, response: requests.Response) -> Optional[int]:
672        if response.headers.get(self._ratelimit_remaining_header):
673            return int(response.headers[self._ratelimit_remaining_header])
674
675        if response.status_code in self._status_codes_for_ratelimit_hit:
676            return 0
677
678        return None

Implementation of AbstractAPIBudget for HTTP

HttpAPIBudget( ratelimit_reset_header: str = 'ratelimit-reset', ratelimit_remaining_header: str = 'ratelimit-remaining', status_codes_for_ratelimit_hit: list[int] = [429], **kwargs: Any)
634    def __init__(
635        self,
636        ratelimit_reset_header: str = "ratelimit-reset",
637        ratelimit_remaining_header: str = "ratelimit-remaining",
638        status_codes_for_ratelimit_hit: list[int] = [429],
639        **kwargs: Any,
640    ):
641        """Constructor
642
643        :param ratelimit_reset_header: name of the header that has a timestamp of the next reset of call budget
644        :param ratelimit_remaining_header: name of the header that has the number of calls left
645        :param status_codes_for_ratelimit_hit: list of HTTP status codes that signal about rate limit being hit
646        """
647        self._ratelimit_reset_header = ratelimit_reset_header
648        self._ratelimit_remaining_header = ratelimit_remaining_header
649        self._status_codes_for_ratelimit_hit = status_codes_for_ratelimit_hit
650        super().__init__(**kwargs)

Constructor

Parameters
  • ratelimit_reset_header: name of the header that has a timestamp of the next reset of call budget
  • ratelimit_remaining_header: name of the header that has the number of calls left
  • status_codes_for_ratelimit_hit: list of HTTP status codes that signal about rate limit being hit
def update_from_response(self, request: Any, response: Any) -> None:
652    def update_from_response(self, request: Any, response: Any) -> None:
653        policy = self.get_matching_policy(request)
654        if not policy:
655            return
656
657        if isinstance(response, requests.Response):
658            available_calls = self.get_calls_left_from_response(response)
659            reset_ts = self.get_reset_ts_from_response(response)
660            policy.update(available_calls=available_calls, call_reset_ts=reset_ts)

Update budget information based on the API response.

Parameters
  • request: the initial request that triggered this response
  • response: response from the API
def get_reset_ts_from_response(self, response: requests.models.Response) -> Optional[datetime.datetime]:
662    def get_reset_ts_from_response(
663        self, response: requests.Response
664    ) -> Optional[datetime.datetime]:
665        if response.headers.get(self._ratelimit_reset_header):
666            return datetime.datetime.fromtimestamp(
667                int(response.headers[self._ratelimit_reset_header])
668            )
669        return None
def get_calls_left_from_response(self, response: requests.models.Response) -> Optional[int]:
671    def get_calls_left_from_response(self, response: requests.Response) -> Optional[int]:
672        if response.headers.get(self._ratelimit_remaining_header):
673            return int(response.headers[self._ratelimit_remaining_header])
674
675        if response.status_code in self._status_codes_for_ratelimit_hit:
676            return 0
677
678        return None
class LimiterMixin:
681class LimiterMixin(MIXIN_BASE):
682    """Mixin class that adds rate-limiting behavior to requests."""
683
684    def __init__(
685        self,
686        api_budget: AbstractAPIBudget,
687        **kwargs: Any,
688    ):
689        self._api_budget = api_budget
690        super().__init__(**kwargs)  # type: ignore # Base Session doesn't take any kwargs
691
692    def send(self, request: requests.PreparedRequest, **kwargs: Any) -> requests.Response:
693        """Send a request with rate-limiting."""
694        self._api_budget.acquire_call(request)
695        response = super().send(request, **kwargs)
696        self._api_budget.update_from_response(request, response)
697        return response

Mixin class that adds rate-limiting behavior to requests.

LimiterMixin( api_budget: AbstractAPIBudget, **kwargs: Any)
684    def __init__(
685        self,
686        api_budget: AbstractAPIBudget,
687        **kwargs: Any,
688    ):
689        self._api_budget = api_budget
690        super().__init__(**kwargs)  # type: ignore # Base Session doesn't take any kwargs
def send( self, request: requests.models.PreparedRequest, **kwargs: Any) -> requests.models.Response:
692    def send(self, request: requests.PreparedRequest, **kwargs: Any) -> requests.Response:
693        """Send a request with rate-limiting."""
694        self._api_budget.acquire_call(request)
695        response = super().send(request, **kwargs)
696        self._api_budget.update_from_response(request, response)
697        return response

Send a request with rate-limiting.

class LimiterSession(LimiterMixin, requests.sessions.Session):
700class LimiterSession(LimiterMixin, requests.Session):
701    """Session that adds rate-limiting behavior to requests."""

Session that adds rate-limiting behavior to requests.

Inherited Members
LimiterMixin
LimiterMixin
send
class CachedLimiterSession(requests_cache.session.CacheMixin, LimiterMixin, requests.sessions.Session):
704class CachedLimiterSession(requests_cache.CacheMixin, LimiterMixin, requests.Session):
705    """Session class with caching and rate-limiting behavior."""

Session class with caching and rate-limiting behavior.