airbyte_cdk.sources.streams.call_rate
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import abc
import dataclasses
import datetime
import logging
import re
import time
from datetime import timedelta
from threading import RLock
from typing import TYPE_CHECKING, Any, Mapping, Optional
from urllib import parse

import requests
import requests_cache
from pyrate_limiter import InMemoryBucket, Limiter, RateItem, TimeClock
from pyrate_limiter import Rate as PyRateRate
from pyrate_limiter.exceptions import BucketFullException

# prevents mypy from complaining about missing session attributes in LimiterMixin
if TYPE_CHECKING:
    MIXIN_BASE = requests.Session
else:
    MIXIN_BASE = object

logger = logging.getLogger("airbyte")
logging.getLogger("pyrate_limiter").setLevel(logging.WARNING)


@dataclasses.dataclass
class Rate:
    """Call rate limit: at most ``limit`` calls per ``interval``."""

    limit: int
    interval: timedelta


class CallRateLimitHit(Exception):
    def __init__(self, error: str, item: Any, weight: int, rate: str, time_to_wait: timedelta):
        """Constructor

        :param error: error message
        :param item: object passed into acquire_call
        :param weight: how many credits were requested
        :param rate: string representation of the rate violated
        :param time_to_wait: how long to wait until more calls become available
        """
        self.item = item
        self.weight = weight
        self.rate = rate
        self.time_to_wait = time_to_wait
        super().__init__(error)


class AbstractCallRatePolicy(abc.ABC):
    """Call rate policy interface.
    Should be configurable with different rules, like N per M for endpoint X. Endpoint X is matched with APIBudget.
    """

    @abc.abstractmethod
    def matches(self, request: Any) -> bool:
        """Tell whether this policy matches a specific request and should apply to it

        :param request: the request object to check
        :return: True if policy should apply to this request, False - otherwise
        """

    @abc.abstractmethod
    def try_acquire(self, request: Any, weight: int) -> None:
        """Try to acquire call credit for a request

        :param request: a request object representing a single call to API
        :param weight: number of requests to deduct from credit
        :raises CallRateLimitHit: (policies in this module) when not enough credit is available
        """

    @abc.abstractmethod
    def update(
        self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]
    ) -> None:
        """Update call rate counting with current values

        :param available_calls: number of calls the API reports as still available, or None if unknown
        :param call_reset_ts: time at which the API reports the budget resets, or None if unknown
        """


class RequestMatcher(abc.ABC):
    """Callable that helps match a request object with call rate policies."""

    @abc.abstractmethod
    def __call__(self, request: Any) -> bool:
        """Check whether a request matches.

        :param request: the request object to check
        :return: True if matches the provided request object, False - otherwise
        """


class HttpRequestMatcher(RequestMatcher):
    """Simple implementation of RequestMatcher for HTTP requests using HttpRequestRegexMatcher under the hood."""

    def __init__(
        self,
        method: Optional[str] = None,
        url: Optional[str] = None,
        params: Optional[Mapping[str, Any]] = None,
        headers: Optional[Mapping[str, Any]] = None,
    ):
        """Constructor

        :param method: HTTP method (e.g., "GET", "POST").
        :param url: URL to match. If it has a host (e.g. "https://api.com/v1/items"), the request must target
            the same scheme://host. A relative URL (e.g. "/v1/items") matches on the path only. The path,
            ignoring a trailing slash, is searched for (not anchored) within the request path.
        :param params: Dictionary of query parameters to match.
        :param headers: Dictionary of headers to match.
        """
        url_base: Optional[str] = None
        url_path: Optional[str] = None
        if url:
            parsed_url = parse.urlsplit(url)
            # Only constrain scheme://host when the URL actually carries a host; a relative URL
            # would otherwise yield the base "://", which can never match a real request.
            if parsed_url.netloc:
                url_base = f"{parsed_url.scheme}://{parsed_url.netloc}"
            # HttpRequestRegexMatcher strips trailing slashes from request paths, so strip them here
            # as well; otherwise a URL like ".../path/" could never match. "/" and "" become None.
            url_path = parsed_url.path.rstrip("/") or None

        # Use HttpRequestRegexMatcher under the hood
        self._regex_matcher = HttpRequestRegexMatcher(
            method=method,
            url_base=url_base,
            url_path_pattern=re.escape(url_path) if url_path else None,
            params=params,
            headers=headers,
        )

    def __call__(self, request: Any) -> bool:
        """
        :param request: A requests.Request or requests.PreparedRequest instance.
        :return: True if the request matches all provided criteria; False otherwise.
        """
        return self._regex_matcher(request)

    def __str__(self) -> str:
        return (
            f"HttpRequestMatcher(method={self._regex_matcher._method}, "
            f"url={self._regex_matcher._url_base}{self._regex_matcher._url_path_pattern.pattern if self._regex_matcher._url_path_pattern else ''}, "
            f"params={self._regex_matcher._params}, headers={self._regex_matcher._headers})"
        )


class HttpRequestRegexMatcher(RequestMatcher):
    """
    Extended RequestMatcher for HTTP requests that supports matching on:
      - HTTP method (case-insensitive)
      - URL base (scheme + netloc) optionally
      - URL path pattern (a regex applied to the path portion of the URL)
      - Query parameters (must be present)
      - Headers (header names compared case-insensitively)
    """

    def __init__(
        self,
        method: Optional[str] = None,
        url_base: Optional[str] = None,
        url_path_pattern: Optional[str] = None,
        params: Optional[Mapping[str, Any]] = None,
        headers: Optional[Mapping[str, Any]] = None,
    ):
        """
        :param method: HTTP method (e.g. "GET", "POST"); compared case-insensitively.
        :param url_base: Base URL (scheme://host) that must match.
        :param url_path_pattern: A regex pattern that will be searched for in the path portion of the URL
            (trailing slashes are stripped from the request path before matching).
        :param params: Dictionary of query parameters that must be present in the request.
        :param headers: Dictionary of headers that must be present (header keys are compared case-insensitively).
        """
        self._method = method.upper() if method else None

        # Normalize the url_base if provided: remove trailing slash.
        self._url_base = url_base.rstrip("/") if url_base else None

        # Compile the URL path pattern if provided.
        self._url_path_pattern = re.compile(url_path_pattern) if url_path_pattern else None

        # Normalize query parameters to strings.
        self._params = {str(k): str(v) for k, v in (params or {}).items()}

        # Normalize header keys to lowercase.
        self._headers = {str(k).lower(): str(v) for k, v in (headers or {}).items()}

    @staticmethod
    def _match_dict(obj: Mapping[str, Any], pattern: Mapping[str, Any]) -> bool:
        """Check that every key/value in the pattern exists in the object."""
        return pattern.items() <= obj.items()

    def __call__(self, request: Any) -> bool:
        """
        :param request: A requests.Request or requests.PreparedRequest instance.
        :return: True if the request matches all provided criteria; False otherwise.
        """
        # Prepare the request (if needed) and extract the URL details.
        if isinstance(request, requests.Request):
            prepared_request = request.prepare()
        elif isinstance(request, requests.PreparedRequest):
            prepared_request = request
        else:
            return False

        # Check HTTP method.
        if self._method is not None:
            if prepared_request.method != self._method:
                return False

        # Parse the URL.
        parsed_url = parse.urlsplit(prepared_request.url)
        # Reconstruct the base: scheme://netloc
        request_url_base = f"{str(parsed_url.scheme)}://{str(parsed_url.netloc)}"
        # The path (without query parameters)
        request_path = str(parsed_url.path).rstrip("/")

        # If a base URL is provided, check that it matches.
        if self._url_base is not None:
            if request_url_base != self._url_base:
                return False

        # If a URL path pattern is provided, ensure the path matches the regex.
        if self._url_path_pattern is not None:
            if not self._url_path_pattern.search(request_path):
                return False

        # Check query parameters.
        if self._params:
            query_params = dict(parse.parse_qsl(str(parsed_url.query)))
            if not self._match_dict(query_params, self._params):
                return False

        # Check headers (normalize keys to lower-case).
        if self._headers:
            req_headers = {k.lower(): v for k, v in prepared_request.headers.items()}
            if not self._match_dict(req_headers, self._headers):
                return False

        return True

    def __str__(self) -> str:
        regex = self._url_path_pattern.pattern if self._url_path_pattern else None
        return (
            f"HttpRequestRegexMatcher(method={self._method}, url_base={self._url_base}, "
            f"url_path_pattern={regex}, params={self._params}, headers={self._headers})"
        )


class BaseCallRatePolicy(AbstractCallRatePolicy, abc.ABC):
    """Base policy that selects the requests it applies to via a list of RequestMatchers."""

    def __init__(self, matchers: list[RequestMatcher]):
        self._matchers = matchers

    def matches(self, request: Any) -> bool:
        """Tell whether this policy matches a specific request and should apply to it.

        A policy with no matchers applies to every request; otherwise one matching matcher is enough.

        :param request: the request object to check
        :return: True if policy should apply to this request, False - otherwise
        """

        if not self._matchers:
            return True
        return any(matcher(request) for matcher in self._matchers)


class UnlimitedCallRatePolicy(BaseCallRatePolicy):
    """
    This policy is for explicit unlimited call rates.
    It can be used when we want to match a specific group of requests and don't apply any limits.

    Example:

    APIBudget(
        policies=[
            UnlimitedCallRatePolicy(
                matchers=[HttpRequestMatcher(url="/some/method", headers={"sandbox": "true"})],
            ),
            FixedWindowCallRatePolicy(
                matchers=[HttpRequestMatcher(url="/some/method")],
                next_reset_ts=datetime.now(),
                period=timedelta(hours=1),
                call_limit=1000,
            ),
        ]
    )

    The code above will limit all calls to /some/method except calls that have header sandbox=true
    (policies are checked in order, so the unlimited policy wins for sandbox requests).
    """

    def try_acquire(self, request: Any, weight: int) -> None:
        """Do nothing"""

    def update(
        self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]
    ) -> None:
        """Do nothing"""


class FixedWindowCallRatePolicy(BaseCallRatePolicy):
    def __init__(
        self,
        next_reset_ts: datetime.datetime,
        period: timedelta,
        call_limit: int,
        matchers: list[RequestMatcher],
    ):
        """A policy that allows {call_limit} calls within a {period} time interval

        :param next_reset_ts: next call rate reset time point
        :param period: call rate reset period
        :param call_limit: maximum number of call credits per window
        :param matchers: matchers selecting the requests this policy applies to
        """

        self._next_reset_ts = next_reset_ts
        self._offset = period
        self._call_limit = call_limit
        self._calls_num = 0
        self._lock = RLock()
        super().__init__(matchers=matchers)

    def try_acquire(self, request: Any, weight: int) -> None:
        if weight > self._call_limit:
            raise ValueError("Weight can not exceed the call limit")
        if not self.matches(request):
            raise ValueError("Request does not match the policy")

        with self._lock:
            self._update_current_window()

            if self._calls_num + weight > self._call_limit:
                reset_in = self._next_reset_ts - datetime.datetime.now()
                error_message = (
                    f"reached maximum number of allowed calls {self._call_limit} "
                    f"per {self._offset} interval, next reset in {reset_in}."
                )
                raise CallRateLimitHit(
                    error=error_message,
                    item=request,
                    weight=weight,
                    rate=f"{self._call_limit} per {self._offset}",
                    time_to_wait=reset_in,
                )

            self._calls_num += weight

    def __str__(self) -> str:
        matcher_str = ", ".join(f"{matcher}" for matcher in self._matchers)
        return (
            f"FixedWindowCallRatePolicy(call_limit={self._call_limit}, period={self._offset}, "
            f"calls_used={self._calls_num}, next_reset={self._next_reset_ts}, "
            f"matchers=[{matcher_str}])"
        )

    def update(
        self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]
    ) -> None:
        """Update call rate counters, by default, only reacts to decreasing updates of available_calls and changes to call_reset_ts.
        We ignore updates with available_calls > current_available_calls to support call rate limits that are lower than API limits.

        :param available_calls: number of calls the API reports as still available, or None
        :param call_reset_ts: reset time reported by the API, or None
        """
        with self._lock:
            self._update_current_window()
            current_available_calls = self._call_limit - self._calls_num

            if available_calls is not None and current_available_calls > available_calls:
                logger.debug(
                    "got rate limit update from api, adjusting available calls from %s to %s",
                    current_available_calls,
                    available_calls,
                )
                self._calls_num = self._call_limit - available_calls

            if call_reset_ts is not None and call_reset_ts != self._next_reset_ts:
                logger.debug(
                    "got rate limit update from api, adjusting reset time from %s to %s",
                    self._next_reset_ts,
                    call_reset_ts,
                )
                self._next_reset_ts = call_reset_ts

    def _update_current_window(self) -> None:
        """Start a new window (reset the counter) once the current reset time has passed."""
        now = datetime.datetime.now()
        if now > self._next_reset_ts:
            logger.debug("started new window, %s calls available now", self._call_limit)
            if self._offset > timedelta(0):
                # Skip every window that has fully elapsed, not just one. Otherwise, after an idle gap
                # longer than one period, the reset time would stay in the past and every subsequent
                # call would start yet another "new" window and reset the counter.
                elapsed_periods = (now - self._next_reset_ts) // self._offset + 1
                self._next_reset_ts = self._next_reset_ts + self._offset * elapsed_periods
            else:
                # Degenerate (zero/negative) period: keep the original single-step behavior.
                self._next_reset_ts = self._next_reset_ts + self._offset
            self._calls_num = 0


class MovingWindowCallRatePolicy(BaseCallRatePolicy):
    """
    Policy to control requests rate implemented on top of PyRateLimiter lib.
    The main difference between this policy and FixedWindowCallRatePolicy is that the rate-limiting window
    is moving along requests that we made, and there is no moment when we reset an available number of calls.
    This strategy requires saving of timestamps of all requests within a window.
    """

    def __init__(self, rates: list[Rate], matchers: list[RequestMatcher]):
        """Constructor

        :param rates: list of rates, the order is important and must be ascending
        :param matchers: matchers selecting the requests this policy applies to
        :raises ValueError: if ``rates`` is empty
        """
        if not rates:
            raise ValueError("The list of rates can not be empty")
        pyrate_rates = [
            PyRateRate(limit=rate.limit, interval=int(rate.interval.total_seconds() * 1000))
            for rate in rates
        ]
        self._bucket = InMemoryBucket(pyrate_rates)
        # Limiter will create the background task that clears old requests in the bucket
        self._limiter = Limiter(self._bucket)
        super().__init__(matchers=matchers)

    def try_acquire(self, request: Any, weight: int) -> None:
        if not self.matches(request):
            raise ValueError("Request does not match the policy")

        try:
            self._limiter.try_acquire(request, weight=weight)
        except BucketFullException as exc:
            item = self._limiter.bucket_factory.wrap_item(request, weight)
            assert isinstance(item, RateItem)

            with self._limiter.lock:
                time_to_wait = self._bucket.waiting(item)
                assert isinstance(time_to_wait, int)

            raise CallRateLimitHit(
                error=str(exc.meta_info["error"]),
                item=request,
                weight=int(exc.meta_info["weight"]),
                rate=str(exc.meta_info["rate"]),
                time_to_wait=timedelta(milliseconds=time_to_wait),
            )

    def update(
        self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]
    ) -> None:
        """Adjust call bucket to reflect the state of the API server

        Only the case "API reports 0 calls left and no reset time" is handled: the bucket is filled
        up to the limit of its first rate so that subsequent acquisitions wait.

        :param available_calls: number of calls the API reports as still available, or None
        :param call_reset_ts: reset time reported by the API, or None
        """
        if (
            available_calls is not None and call_reset_ts is None
        ):  # we do our best to sync buckets with API
            if available_calls == 0:
                with self._limiter.lock:
                    # Number of dummy items needed to make the bucket full for the first rate.
                    # (Previously this was a bool comparison, which added at most one item.)
                    items_to_add = self._bucket.rates[0].limit - self._bucket.count()
                    if items_to_add > 0:
                        now: int = TimeClock().now()  # type: ignore[no-untyped-call]
                        self._bucket.put(RateItem(name="dummy", timestamp=now, weight=items_to_add))
        # TODO: add support if needed, it might be that it is not possible to make a good solution for this case
        # if available_calls is not None and call_reset_ts is not None:
        #     ts = call_reset_ts.timestamp()

    def __str__(self) -> str:
        """Return a human-friendly description of the moving window rate policy for logging purposes."""
        rates_info = ", ".join(
            f"{rate.limit} per {timedelta(milliseconds=rate.interval)}"
            for rate in self._bucket.rates
        )
        current_bucket_count = self._bucket.count()
        matcher_str = ", ".join(f"{matcher}" for matcher in self._matchers)
        return (
            f"MovingWindowCallRatePolicy(rates=[{rates_info}], current_bucket_count={current_bucket_count}, "
            f"matchers=[{matcher_str}])"
        )


class AbstractAPIBudget(abc.ABC):
    """Interface to some API where a client allowed to have N calls per T interval.

    Important: APIBudget is not doing any API calls, the end user code is responsible to call this interface
        to respect call rate limitation of the API.

    It supports multiple policies applied to different group of requests. To distinct these groups we use RequestMatchers.
    Individual policies are AbstractCallRatePolicy implementations, e.g. FixedWindowCallRatePolicy,
    MovingWindowCallRatePolicy or UnlimitedCallRatePolicy.
    """

    @abc.abstractmethod
    def acquire_call(
        self, request: Any, block: bool = True, timeout: Optional[float] = None
    ) -> None:
        """Try to get a call from budget, will block by default

        :param request: the API request
        :param block: when true (default) will block the current thread until call credit is available
        :param timeout: if set will limit maximum time in block, otherwise will wait until credit is available
        :raises: CallRateLimitHit - when no credits left and if timeout was set the waiting time exceed the timeout
        """

    @abc.abstractmethod
    def get_matching_policy(self, request: Any) -> Optional[AbstractCallRatePolicy]:
        """Find matching call rate policy for specific request"""

    @abc.abstractmethod
    def update_from_response(self, request: Any, response: Any) -> None:
        """Update budget information based on response from API

        :param request: the initial request that triggered this response
        :param response: response from the API
        """


class APIBudget(AbstractAPIBudget):
    """Default APIBudget implementation"""

    def __init__(
        self, policies: list[AbstractCallRatePolicy], maximum_attempts_to_acquire: int = 100000
    ) -> None:
        """Constructor

        :param policies: list of policies in this budget
        :param maximum_attempts_to_acquire: number of attempts before throwing hit ratelimit exception, we put some big number here
         to avoid situations when many threads compete with each other for a few lots over a significant amount of time
        """

        self._policies = policies
        self._maximum_attempts_to_acquire = maximum_attempts_to_acquire

    def _extract_endpoint(self, request: Any) -> str:
        """Extract the endpoint URL from the request if available (used for log messages only)."""
        endpoint = None
        try:
            # If the request is already a PreparedRequest, it should have a URL.
            if isinstance(request, requests.PreparedRequest):
                endpoint = request.url
            # If it's a requests.Request, we call prepare() to extract the URL.
            elif isinstance(request, requests.Request):
                prepared = request.prepare()
                endpoint = prepared.url
        except Exception as e:
            logger.debug("Error extracting endpoint: %s", e)
        if endpoint:
            return endpoint
        return "unknown endpoint"

    def get_matching_policy(self, request: Any) -> Optional[AbstractCallRatePolicy]:
        """Return the first policy (in insertion order) that matches the request, or None."""
        for policy in self._policies:
            if policy.matches(request):
                return policy
        return None

    def acquire_call(
        self, request: Any, block: bool = True, timeout: Optional[float] = None
    ) -> None:
        """Try to get a call from budget, will block by default.
        Policies are checked sequentially in the same order they were added.
        The first policy that matches the request is used to acquire the call credit;
        if no policy matches, the call is allowed without limits.

        :param request: the API request
        :param block: when True (default) will block until a call credit is available
        :param timeout: if provided, caps each individual wait; otherwise, waits as long as the policy requires
        :raises: CallRateLimitHit if the call credit cannot be acquired
        """

        policy = self.get_matching_policy(request)
        endpoint = self._extract_endpoint(request)
        if policy:
            # Lazy %-formatting: str(policy) can be costly and is only needed when debug logging is on.
            logger.debug("Acquiring call for endpoint %s using policy: %s", endpoint, policy)
            self._do_acquire(request=request, policy=policy, block=block, timeout=timeout)
        elif self._policies:
            logger.debug(
                "No policies matched for endpoint %s (request: %s). Allowing call by default.",
                endpoint,
                request,
            )

    def update_from_response(self, request: Any, response: Any) -> None:
        """Update budget information based on the API response.

        :param request: the initial request that triggered this response
        :param response: response from the API
        """
        pass

    def _do_acquire(
        self, request: Any, policy: AbstractCallRatePolicy, block: bool, timeout: Optional[float]
    ) -> None:
        """Internal method to try to acquire a call credit.

        :param request: the API request
        :param policy: the matching rate-limiting policy
        :param block: indicates whether to block until a call credit is available
        :param timeout: upper bound for each individual sleep if blocking
        :raises: CallRateLimitHit if unable to acquire a call credit
        """
        last_exception = None
        endpoint = self._extract_endpoint(request)
        # sometimes we spend all budget before a second attempt, so we have a few more attempts;
        # "+ 1" so that exactly maximum_attempts_to_acquire attempts are made.
        for attempt in range(1, self._maximum_attempts_to_acquire + 1):
            try:
                policy.try_acquire(request, weight=1)
                return
            except CallRateLimitHit as exc:
                last_exception = exc
                if block:
                    if timeout is not None:
                        time_to_wait = min(timedelta(seconds=timeout), exc.time_to_wait)
                    else:
                        time_to_wait = exc.time_to_wait
                    # Ensure we never sleep for a negative duration.
                    time_to_wait = max(timedelta(0), time_to_wait)
                    logger.debug(
                        "Policy %s reached call limit for endpoint %s (%s). "
                        "Sleeping for %s on attempt %s.",
                        policy,
                        endpoint,
                        exc.rate,
                        time_to_wait,
                        attempt,
                    )
                    time.sleep(time_to_wait.total_seconds())
                else:
                    logger.debug(
                        "Policy %s reached call limit for endpoint %s (%s) "
                        "and blocking is disabled.",
                        policy,
                        endpoint,
                        exc.rate,
                    )
                    raise

        if last_exception:
            logger.debug(
                "Exhausted all %s attempts to acquire a call for endpoint %s using policy: %s",
                self._maximum_attempts_to_acquire,
                endpoint,
                policy,
            )
            raise last_exception


class HttpAPIBudget(APIBudget):
    """Implementation of AbstractAPIBudget for HTTP"""

    def __init__(
        self,
        ratelimit_reset_header: str = "ratelimit-reset",
        ratelimit_remaining_header: str = "ratelimit-remaining",
        status_codes_for_ratelimit_hit: Optional[list[int]] = None,
        **kwargs: Any,
    ):
        """Constructor

        :param ratelimit_reset_header: name of the header that has a timestamp of the next reset of call budget
        :param ratelimit_remaining_header: name of the header that has the number of calls left
        :param status_codes_for_ratelimit_hit: list of HTTP status codes that signal about rate limit being hit
            (defaults to [429])
        """
        self._ratelimit_reset_header = ratelimit_reset_header
        self._ratelimit_remaining_header = ratelimit_remaining_header
        # Avoid a shared mutable default list across instances.
        self._status_codes_for_ratelimit_hit = (
            status_codes_for_ratelimit_hit if status_codes_for_ratelimit_hit is not None else [429]
        )
        super().__init__(**kwargs)

    def update_from_response(self, request: Any, response: Any) -> None:
        """Feed remaining-calls / reset-time information from an HTTP response into the matching policy."""
        policy = self.get_matching_policy(request)
        if not policy:
            return

        if isinstance(response, requests.Response):
            available_calls = self.get_calls_left_from_response(response)
            reset_ts = self.get_reset_ts_from_response(response)
            policy.update(available_calls=available_calls, call_reset_ts=reset_ts)

    def get_reset_ts_from_response(
        self, response: requests.Response
    ) -> Optional[datetime.datetime]:
        """Read the reset header as a Unix timestamp (integer or fractional) and convert it to a local naive datetime."""
        if response.headers.get(self._ratelimit_reset_header):
            # float() accepts both "1700000000" and "1700000000.5"; int() rejected the latter.
            return datetime.datetime.fromtimestamp(
                float(response.headers[self._ratelimit_reset_header])
            )
        return None

    def get_calls_left_from_response(self, response: requests.Response) -> Optional[int]:
        """Return the remaining-calls header value, 0 for a rate-limit status code, otherwise None."""
        if response.headers.get(self._ratelimit_remaining_header):
            return int(response.headers[self._ratelimit_remaining_header])

        if response.status_code in self._status_codes_for_ratelimit_hit:
            return 0

        return None


class LimiterMixin(MIXIN_BASE):
    """Mixin class that adds rate-limiting behavior to requests."""

    def __init__(
        self,
        api_budget: AbstractAPIBudget,
        **kwargs: Any,
    ):
        self._api_budget = api_budget
        super().__init__(**kwargs)  # type: ignore # Base Session doesn't take any kwargs

    def send(self, request: requests.PreparedRequest, **kwargs: Any) -> requests.Response:
        """Send a request with rate-limiting."""
        self._api_budget.acquire_call(request)
        response = super().send(request, **kwargs)
        self._api_budget.update_from_response(request, response)
        return response


class LimiterSession(LimiterMixin, requests.Session):
    """Session that adds rate-limiting behavior to requests."""


class CachedLimiterSession(requests_cache.CacheMixin, LimiterMixin, requests.Session):
    """Session class with caching and rate-limiting behavior."""
@dataclasses.dataclass(eq=True)
class Rate:
    """A call-rate limit: no more than ``limit`` calls within one ``interval``."""

    # maximum number of calls allowed per interval
    limit: int
    # length of the time window the limit applies to
    interval: timedelta
Call rate limit
class CallRateLimitHit(Exception):
    def __init__(self, error: str, item: Any, weight: int, rate: str, time_to_wait: timedelta):
        """Build the exception raised when a call-rate limit prevents acquiring a call.

        :param error: error message
        :param item: object passed into acquire_call
        :param weight: how many credits were requested
        :param rate: string representation of the rate violated
        :param time_to_wait: how long to wait until more calls become available
        """
        super().__init__(error)
        self.item, self.weight, self.rate, self.time_to_wait = item, weight, rate, time_to_wait
Common base class for all non-exit exceptions.
def __init__(self, error: str, item: Any, weight: int, rate: str, time_to_wait: timedelta):
    """Build the exception raised when a call-rate limit prevents acquiring a call.

    :param error: error message
    :param item: object passed into acquire_call
    :param weight: how many credits were requested
    :param rate: string representation of the rate violated
    :param time_to_wait: how long to wait until more calls become available
    """
    super().__init__(error)
    self.item, self.weight, self.rate, self.time_to_wait = item, weight, rate, time_to_wait
Constructor
Parameters
- error: error message
- item: object passed into acquire_call
- weight: how many credits were requested
- rate: string representation of the rate violated
- time_to_wait: how long to wait until more calls become available
class AbstractCallRatePolicy(abc.ABC):
    """Interface for call-rate policies.

    A policy is configured with rules such as "N calls per M interval for endpoint X";
    the API budget uses the policy's matching logic to decide which requests are "endpoint X".
    """

    @abc.abstractmethod
    def matches(self, request: Any) -> bool:
        """Report whether this policy should be applied to ``request``.

        :param request: the request object to check
        :return: True if the policy applies to the request, False otherwise
        """

    @abc.abstractmethod
    def try_acquire(self, request: Any, weight: int) -> None:
        """Attempt to take ``weight`` call credits for ``request``.

        :param request: object representing a single call to the API
        :param weight: how many credits to deduct
        """

    @abc.abstractmethod
    def update(
        self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]
    ) -> None:
        """Synchronize the policy's counters with current values.

        :param available_calls: number of calls still available, if known
        :param call_reset_ts: moment the call budget resets, if known
        """
Call rate policy interface. Should be configurable with different rules, like N per M for endpoint X. Endpoint X is matched with APIBudget.
@abc.abstractmethod
def matches(self, request: Any) -> bool:
    """Report whether this policy should be applied to ``request``.

    :param request: the request object to check
    :return: True if the policy applies to the request, False otherwise
    """
Tells whether this policy matches a specific request and should apply to it
Parameters
- request: the request object to check
Returns
True if policy should apply to this request, False - otherwise
@abc.abstractmethod
def try_acquire(self, request: Any, weight: int) -> None:
    """Attempt to take ``weight`` call credits for ``request``.

    :param request: object representing a single call to the API
    :param weight: how many credits to deduct
    """
Try to acquire call credit for a request
Parameters
- request: a request object representing a single call to API
- weight: number of requests to deduct from credit
Returns
@abc.abstractmethod
def update(
    self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]
) -> None:
    """Synchronize call-rate counting with current values.

    :param available_calls: number of calls still available, if known
    :param call_reset_ts: moment the call budget resets, if known
    """
Update call rate counting with current values
Parameters
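- available_calls: number of calls the API reports as still available, or None if unknown
- call_reset_ts: time at which the API reports the call budget resets, or None if unknown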
class RequestMatcher(abc.ABC):
    """Callable used to decide which call rate policies a request object belongs to."""

    @abc.abstractmethod
    def __call__(self, request: Any) -> bool:
        """Check a request object against this matcher.

        :param request: the request object to check
        :return: True when the request matches, False otherwise
        """
Callable that helps match a request object with call rate policies.
class HttpRequestMatcher(RequestMatcher):
    """Simple implementation of RequestMatcher for HTTP requests using HttpRequestRegexMatcher under the hood."""

    def __init__(
        self,
        method: Optional[str] = None,
        url: Optional[str] = None,
        params: Optional[Mapping[str, Any]] = None,
        headers: Optional[Mapping[str, Any]] = None,
    ):
        """Constructor

        :param method: HTTP method (e.g., "GET", "POST").
        :param url: URL to match. If it has a host (e.g. "https://api.com/v1/items"), the request must target
            the same scheme://host. A relative URL (e.g. "/v1/items") matches on the path only. The path,
            ignoring a trailing slash, is searched for (not anchored) within the request path.
        :param params: Dictionary of query parameters to match.
        :param headers: Dictionary of headers to match.
        """
        url_base: Optional[str] = None
        url_path: Optional[str] = None
        if url:
            parsed_url = parse.urlsplit(url)
            # Only constrain scheme://host when the URL actually carries a host; a relative URL
            # would otherwise yield the base "://", which can never match a real request.
            if parsed_url.netloc:
                url_base = f"{parsed_url.scheme}://{parsed_url.netloc}"
            # Request paths are compared with trailing slashes stripped, so strip them here too;
            # otherwise a URL like ".../path/" could never match. "/" and "" become None.
            url_path = parsed_url.path.rstrip("/") or None

        # Use HttpRequestRegexMatcher under the hood
        self._regex_matcher = HttpRequestRegexMatcher(
            method=method,
            url_base=url_base,
            url_path_pattern=re.escape(url_path) if url_path else None,
            params=params,
            headers=headers,
        )

    def __call__(self, request: Any) -> bool:
        """
        :param request: A requests.Request or requests.PreparedRequest instance.
        :return: True if the request matches all provided criteria; False otherwise.
        """
        return self._regex_matcher(request)

    def __str__(self) -> str:
        return (
            f"HttpRequestMatcher(method={self._regex_matcher._method}, "
            f"url={self._regex_matcher._url_base}{self._regex_matcher._url_path_pattern.pattern if self._regex_matcher._url_path_pattern else ''}, "
            f"params={self._regex_matcher._params}, headers={self._regex_matcher._headers})"
        )
Simple implementation of RequestMatcher for HTTP requests using HttpRequestRegexMatcher under the hood.
def __init__(
    self,
    method: Optional[str] = None,
    url: Optional[str] = None,
    params: Optional[Mapping[str, Any]] = None,
    headers: Optional[Mapping[str, Any]] = None,
):
    """Constructor

    :param method: HTTP method (e.g., "GET", "POST").
    :param url: URL to match. If it has a host (e.g. "https://api.com/v1/items"), the request must target
        the same scheme://host. A relative URL (e.g. "/v1/items") matches on the path only. The path,
        ignoring a trailing slash, is searched for (not anchored) within the request path.
    :param params: Dictionary of query parameters to match.
    :param headers: Dictionary of headers to match.
    """
    url_base: Optional[str] = None
    url_path: Optional[str] = None
    if url:
        parsed_url = parse.urlsplit(url)
        # Only constrain scheme://host when the URL actually carries a host; a relative URL
        # would otherwise yield the base "://", which can never match a real request.
        if parsed_url.netloc:
            url_base = f"{parsed_url.scheme}://{parsed_url.netloc}"
        # Request paths are compared with trailing slashes stripped, so strip them here too;
        # otherwise a URL like ".../path/" could never match. "/" and "" become None.
        url_path = parsed_url.path.rstrip("/") or None

    # Use HttpRequestRegexMatcher under the hood
    self._regex_matcher = HttpRequestRegexMatcher(
        method=method,
        url_base=url_base,
        url_path_pattern=re.escape(url_path) if url_path else None,
        params=params,
        headers=headers,
    )
Constructor
Parameters
- method: HTTP method (e.g., "GET", "POST").
- url: Full URL to match.
- params: Dictionary of query parameters to match.
- headers: Dictionary of headers to match.
class HttpRequestRegexMatcher(RequestMatcher):
    """
    Extended RequestMatcher for HTTP requests that supports matching on:
    - HTTP method (case-insensitive)
    - URL base (scheme + netloc) optionally
    - URL path pattern (a regex applied to the path portion of the URL)
    - Query parameters (must be present)
    - Headers (header names compared case-insensitively)
    """

    def __init__(
        self,
        method: Optional[str] = None,
        url_base: Optional[str] = None,
        url_path_pattern: Optional[str] = None,
        params: Optional[Mapping[str, Any]] = None,
        headers: Optional[Mapping[str, Any]] = None,
    ):
        """
        :param method: HTTP method (e.g. "GET", "POST"); compared case-insensitively.
        :param url_base: Base URL (scheme://host) that must match.
        :param url_path_pattern: A regex pattern that will be applied to the path portion of the URL.
        :param params: Dictionary of query parameters that must be present in the request.
        :param headers: Dictionary of headers that must be present (header keys are compared case-insensitively).
        """
        self._method = method.upper() if method else None

        # Normalize the url_base if provided: remove trailing slash.
        self._url_base = url_base.rstrip("/") if url_base else None

        # Compile the URL path pattern if provided.
        self._url_path_pattern = re.compile(url_path_pattern) if url_path_pattern else None

        # Normalize query parameters to strings.
        self._params = {str(k): str(v) for k, v in (params or {}).items()}

        # Normalize header keys to lowercase.
        self._headers = {str(k).lower(): str(v) for k, v in (headers or {}).items()}

    @staticmethod
    def _match_dict(obj: Mapping[str, Any], pattern: Mapping[str, Any]) -> bool:
        """Check that every key/value in the pattern exists in the object."""
        return pattern.items() <= obj.items()

    def __call__(self, request: Any) -> bool:
        """
        :param request: A requests.Request or requests.PreparedRequest instance.
        :return: True if the request matches all provided criteria; False otherwise.
        """
        # Prepare the request (if needed) and extract the URL details.
        if isinstance(request, requests.Request):
            prepared_request = request.prepare()
        elif isinstance(request, requests.PreparedRequest):
            prepared_request = request
        else:
            return False

        # Check HTTP method. Normalize the request side as well: a hand-built PreparedRequest
        # may carry the method in any case (or not at all).
        if self._method is not None:
            if (prepared_request.method or "").upper() != self._method:
                return False

        # Parse the URL.
        parsed_url = parse.urlsplit(prepared_request.url)
        # Reconstruct the base: scheme://netloc
        request_url_base = f"{str(parsed_url.scheme)}://{str(parsed_url.netloc)}"
        # The path (without query parameters); trailing slashes are ignored.
        request_path = str(parsed_url.path).rstrip("/")

        # If a base URL is provided, check that it matches.
        if self._url_base is not None:
            if request_url_base != self._url_base:
                return False

        # If a URL path pattern is provided, ensure the path matches the regex (anywhere in the path).
        if self._url_path_pattern is not None:
            if not self._url_path_pattern.search(request_path):
                return False

        # Check query parameters. keep_blank_values=True so "?flag=" yields {"flag": ""};
        # with the default the key is dropped and a matcher expecting "" could never match.
        if self._params:
            query_params = dict(
                parse.parse_qsl(str(parsed_url.query), keep_blank_values=True)
            )
            if not self._match_dict(query_params, self._params):
                return False

        # Check headers (normalize keys to lower-case).
        if self._headers:
            req_headers = {k.lower(): v for k, v in prepared_request.headers.items()}
            if not self._match_dict(req_headers, self._headers):
                return False

        return True

    def __str__(self) -> str:
        regex = self._url_path_pattern.pattern if self._url_path_pattern else None
        return (
            f"HttpRequestRegexMatcher(method={self._method}, url_base={self._url_base}, "
            f"url_path_pattern={regex}, params={self._params}, headers={self._headers})"
        )
Extended RequestMatcher for HTTP requests that supports matching on:
- HTTP method (case-insensitive)
- URL base (scheme + netloc) optionally
- URL path pattern (a regex applied to the path portion of the URL)
- Query parameters (must be present)
- Headers (header names compared case-insensitively)
def __init__(
    self,
    method: Optional[str] = None,
    url_base: Optional[str] = None,
    url_path_pattern: Optional[str] = None,
    params: Optional[Mapping[str, Any]] = None,
    headers: Optional[Mapping[str, Any]] = None,
):
    """Store the matching criteria in normalized form.

    :param method: HTTP method (e.g. "GET", "POST"); stored upper-cased, None disables the check.
    :param url_base: Base URL (scheme://host); stored without trailing slashes.
    :param url_path_pattern: Regex source applied to the request path; compiled once here.
    :param params: Query parameters that must be present; names and values are stringified.
    :param headers: Headers that must be present; names are lower-cased, values stringified.
    """
    self._method = None if not method else method.upper()
    self._url_base = None if not url_base else url_base.rstrip("/")
    self._url_path_pattern = None if not url_path_pattern else re.compile(url_path_pattern)

    normalized_params: dict[str, str] = {}
    for name, value in (params or {}).items():
        normalized_params[str(name)] = str(value)
    self._params = normalized_params

    normalized_headers: dict[str, str] = {}
    for name, value in (headers or {}).items():
        # Lower-case header names so lookups are case-insensitive.
        normalized_headers[str(name).lower()] = str(value)
    self._headers = normalized_headers
Parameters
- method: HTTP method (e.g. "GET", "POST"); compared case-insensitively.
- url_base: Base URL (scheme://host) that must match.
- url_path_pattern: A regex pattern that will be applied to the path portion of the URL.
- params: Dictionary of query parameters that must be present in the request.
- headers: Dictionary of headers that must be present (header keys are compared case-insensitively).
class BaseCallRatePolicy(AbstractCallRatePolicy, abc.ABC):
    """Common base for call rate policies that select their requests through a list of matchers."""

    def __init__(self, matchers: list[RequestMatcher]):
        # An empty matcher list makes the policy apply to every request (see matches()).
        self._matchers = matchers

    def matches(self, request: Any) -> bool:
        """Decide whether this policy governs the given request.

        :param request: the request object to test against the configured matchers
        :return: True when no matchers are configured or at least one matcher accepts the
            request, False otherwise
        """
        if self._matchers:
            for matcher in self._matchers:
                if matcher(request):
                    return True
            return False
        return True
Call rate policy interface. Should be configurable with different rules, like N per M for endpoint X. Endpoint X is matched with APIBudget.
def matches(self, request: Any) -> bool:
    """Decide whether this policy governs the given request.

    :param request: the request object to test against the configured matchers
    :return: True when no matchers are configured or at least one matcher accepts the
        request, False otherwise
    """
    if self._matchers:
        for matcher in self._matchers:
            if matcher(request):
                return True
        return False
    return True
Tell if this policy matches specific request and should apply to it
Parameters
- request:
Returns
True if policy should apply to this request, False - otherwise
Inherited Members
class UnlimitedCallRatePolicy(BaseCallRatePolicy):
    """
    This policy is for explicit unlimited call rates.
    It can be used when we want to match a specific group of requests and not apply any limits to them.

    Example:

        APIBudget(
            [
                UnlimitedCallRatePolicy(
                    matchers=[HttpRequestMatcher(url="/some/method", headers={"sandbox": True})],
                ),
                FixedWindowCallRatePolicy(
                    matchers=[HttpRequestMatcher(url="/some/method")],
                    next_reset_ts=datetime.now(),
                    period=timedelta(hours=1),
                    call_limit=1000,
                ),
            ]
        )

    The code above will limit all calls to /some/method except calls that have header sandbox=True.
    APIBudget uses the first policy (in list order) that matches a request, which is why the
    unlimited policy is listed before the more general fixed-window policy.
    """

    def try_acquire(self, request: Any, weight: int) -> None:
        """Do nothing: requests matched by this policy are never limited."""

    def update(
        self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]
    ) -> None:
        """Do nothing: this policy keeps no counters to adjust."""
This policy is for explicit unlimited call rates. It can be used when we want to match a specific group of requests and don't apply any limits.
Example:
APIBudget( [ UnlimitedCallRatePolicy( matchers=[HttpRequestMatcher(url="/some/method", headers={"sandbox": True})], ), FixedWindowCallRatePolicy( matchers=[HttpRequestMatcher(url="/some/method")], next_reset_ts=datetime.now(), period=timedelta(hours=1), call_limit=1000, ), ] )
The code above will limit all calls to /some/method except calls that have header sandbox=True
def update(
    self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]
) -> None:
    """Ignore rate limit information: an unlimited policy keeps no counters to adjust."""
    return None
Do nothing
Inherited Members
class FixedWindowCallRatePolicy(BaseCallRatePolicy):
    def __init__(
        self,
        next_reset_ts: datetime.datetime,
        period: timedelta,
        call_limit: int,
        matchers: list[RequestMatcher],
    ):
        """A policy that allows {call_limit} calls within a {period} time interval

        :param next_reset_ts: next call rate reset time point
        :param period: call rate reset period
        :param call_limit: maximum total weight of calls allowed within one window
        :param matchers: request matchers selecting the requests this policy applies to
        """

        self._next_reset_ts = next_reset_ts
        self._offset = period
        self._call_limit = call_limit
        self._calls_num = 0
        # Guards _calls_num / _next_reset_ts; taken by try_acquire and update.
        self._lock = RLock()
        super().__init__(matchers=matchers)

    def try_acquire(self, request: Any, weight: int) -> None:
        """Consume ``weight`` calls from the current window.

        :param request: a request object representing a single call to API
        :param weight: number of calls to deduct from the window
        :raises ValueError: if weight exceeds call_limit or the request does not match this policy
        :raises CallRateLimitHit: if the current window has fewer than ``weight`` calls left
        """
        if weight > self._call_limit:
            raise ValueError("Weight can not exceed the call limit")
        if not self.matches(request):
            raise ValueError("Request does not match the policy")

        with self._lock:
            self._update_current_window()

            if self._calls_num + weight > self._call_limit:
                reset_in = self._next_reset_ts - datetime.datetime.now()
                error_message = (
                    f"reached maximum number of allowed calls {self._call_limit} "
                    f"per {self._offset} interval, next reset in {reset_in}."
                )
                raise CallRateLimitHit(
                    error=error_message,
                    item=request,
                    weight=weight,
                    rate=f"{self._call_limit} per {self._offset}",
                    time_to_wait=reset_in,
                )

            self._calls_num += weight

    def __str__(self) -> str:
        matcher_str = ", ".join(f"{matcher}" for matcher in self._matchers)
        return (
            f"FixedWindowCallRatePolicy(call_limit={self._call_limit}, period={self._offset}, "
            f"calls_used={self._calls_num}, next_reset={self._next_reset_ts}, "
            f"matchers=[{matcher_str}])"
        )

    def update(
        self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]
    ) -> None:
        """Update call rate counters, by default, only reacts to decreasing updates of available_calls and changes to call_reset_ts.
        We ignore updates with available_calls > current_available_calls to support call rate limits that are lower than API limits.

        :param available_calls: number of calls the API reports as remaining, or None if unknown
        :param call_reset_ts: reset time reported by the API, or None if unknown
        """
        with self._lock:
            self._update_current_window()
            current_available_calls = self._call_limit - self._calls_num

            if available_calls is not None and current_available_calls > available_calls:
                logger.debug(
                    "got rate limit update from api, adjusting available calls from %s to %s",
                    current_available_calls,
                    available_calls,
                )
                self._calls_num = self._call_limit - available_calls

            if call_reset_ts is not None and call_reset_ts != self._next_reset_ts:
                logger.debug(
                    "got rate limit update from api, adjusting reset time from %s to %s",
                    self._next_reset_ts,
                    call_reset_ts,
                )
                self._next_reset_ts = call_reset_ts

    def _update_current_window(self) -> None:
        """Start a new window if the current one has ended.

        If more than one period has elapsed since ``_next_reset_ts`` (for example after an idle
        gap), the reset point is advanced by as many whole periods as needed to land in the
        future. Advancing by a single period would leave it in the past, so every following
        call would reset the counter again until it caught up, effectively lifting the limit.
        """
        now = datetime.datetime.now()
        if now > self._next_reset_ts:
            logger.debug("started new window, %s calls available now", self._call_limit)
            if self._offset > timedelta(0):
                elapsed_periods = (now - self._next_reset_ts) // self._offset + 1
            else:
                # Degenerate non-positive period: keep the single-step behaviour.
                elapsed_periods = 1
            self._next_reset_ts = self._next_reset_ts + self._offset * elapsed_periods
            self._calls_num = 0
Call rate policy interface. Should be configurable with different rules, like N per M for endpoint X. Endpoint X is matched with APIBudget.
def __init__(
    self,
    next_reset_ts: datetime.datetime,
    period: timedelta,
    call_limit: int,
    matchers: list[RequestMatcher],
):
    """Create a fixed-window policy allowing {call_limit} calls per {period}.

    :param next_reset_ts: the moment the current window ends and the counter resets
    :param period: length of each window
    :param call_limit: maximum total weight of calls allowed within one window
    :param matchers: request matchers selecting the requests this policy applies to
    """
    # Re-entrant lock protecting the window counters below.
    self._lock = RLock()
    # Capacity of a window and how much of it is already consumed.
    self._call_limit = call_limit
    self._calls_num = 0
    # Window geometry: length of a window and the end of the current one.
    self._offset = period
    self._next_reset_ts = next_reset_ts
    super().__init__(matchers=matchers)
A policy that allows {call_limit} calls within a {period} time interval
Parameters
- next_reset_ts: next call rate reset time point
- period: call rate reset period
- call_limit:
- matchers:
def try_acquire(self, request: Any, weight: int) -> None:
    """Consume ``weight`` calls from the current window, or raise if the window is exhausted.

    :raises ValueError: if ``weight`` exceeds the per-window limit or the request is not governed
        by this policy
    :raises CallRateLimitHit: if the current window does not have ``weight`` calls left
    """
    if weight > self._call_limit:
        raise ValueError("Weight can not exceed the call limit")
    if not self.matches(request):
        raise ValueError("Request does not match the policy")

    with self._lock:
        self._update_current_window()
        remaining = self._call_limit - self._calls_num
        if weight <= remaining:
            self._calls_num += weight
            return

        reset_in = self._next_reset_ts - datetime.datetime.now()
        rate_description = f"{self._call_limit} per {self._offset}"
        raise CallRateLimitHit(
            error=(
                f"reached maximum number of allowed calls {self._call_limit} "
                f"per {self._offset} interval, next reset in {reset_in}."
            ),
            item=request,
            weight=weight,
            rate=rate_description,
            time_to_wait=reset_in,
        )
Try to acquire request
Parameters
- request: a request object representing a single call to API
- weight: number of requests to deduct from credit
Returns
def update(
    self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]
) -> None:
    """Sync the window with rate limit information reported by the API.

    Only tightening updates to the remaining-call count are applied: a reported value larger
    than what this policy still allows is ignored, so a locally configured limit stricter than
    the API's one keeps working. Any different reset timestamp replaces the current one.

    :param available_calls: calls the API reports as remaining, or None if unknown
    :param call_reset_ts: reset time reported by the API, or None if unknown
    """
    with self._lock:
        self._update_current_window()
        local_remaining = self._call_limit - self._calls_num

        if available_calls is not None and available_calls < local_remaining:
            logger.debug(
                "got rate limit update from api, adjusting available calls from %s to %s",
                local_remaining,
                available_calls,
            )
            self._calls_num = self._call_limit - available_calls

        if call_reset_ts is None or call_reset_ts == self._next_reset_ts:
            return
        logger.debug(
            "got rate limit update from api, adjusting reset time from %s to %s",
            self._next_reset_ts,
            call_reset_ts,
        )
        self._next_reset_ts = call_reset_ts
Update call rate counters, by default, only reacts to decreasing updates of available_calls and changes to call_reset_ts. We ignore updates with available_calls > current_available_calls to support call rate limits that are lower than API limits.
Parameters
- available_calls:
- call_reset_ts:
Inherited Members
class MovingWindowCallRatePolicy(BaseCallRatePolicy):
    """
    Policy to control requests rate implemented on top of PyRateLimiter lib.
    The main difference between this policy and FixedWindowCallRatePolicy is that the rate-limiting window
    is moving along requests that we made, and there is no moment when we reset an available number of calls.
    This strategy requires storing the timestamps of all requests within a window.
    """

    def __init__(self, rates: list[Rate], matchers: list[RequestMatcher]):
        """Constructor

        :param rates: list of rates, the order is important and must be ascending
        :param matchers: request matchers selecting the requests this policy applies to
        :raises ValueError: if ``rates`` is empty
        """
        if not rates:
            raise ValueError("The list of rates can not be empty")
        # pyrate_limiter expresses intervals in milliseconds.
        pyrate_rates = [
            PyRateRate(limit=rate.limit, interval=int(rate.interval.total_seconds() * 1000))
            for rate in rates
        ]
        self._bucket = InMemoryBucket(pyrate_rates)
        # Limiter will create the background task that clears old requests in the bucket
        self._limiter = Limiter(self._bucket)
        super().__init__(matchers=matchers)

    def try_acquire(self, request: Any, weight: int) -> None:
        """Try to acquire ``weight`` calls for ``request``.

        :raises ValueError: if the request does not match this policy
        :raises CallRateLimitHit: if the bucket is full; carries the time to wait until it frees up
        """
        if not self.matches(request):
            raise ValueError("Request does not match the policy")

        try:
            self._limiter.try_acquire(request, weight=weight)
        except BucketFullException as exc:
            item = self._limiter.bucket_factory.wrap_item(request, weight)
            assert isinstance(item, RateItem)

            with self._limiter.lock:
                time_to_wait = self._bucket.waiting(item)
            assert isinstance(time_to_wait, int)

            raise CallRateLimitHit(
                error=str(exc.meta_info["error"]),
                item=request,
                weight=int(exc.meta_info["weight"]),
                rate=str(exc.meta_info["rate"]),
                time_to_wait=timedelta(milliseconds=time_to_wait),
            )

    def update(
        self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]
    ) -> None:
        """Adjust call bucket to reflect the state of the API server

        Only the case "API reports 0 calls left and no reset time" is handled: the bucket is
        filled up to the limit of its first rate so that subsequent acquisitions have to wait.

        :param available_calls: calls the API reports as remaining, or None if unknown
        :param call_reset_ts: reset time reported by the API, or None if unknown
        """
        if (
            available_calls is not None and call_reset_ts is None
        ):  # we do our best to sync buckets with API
            if available_calls == 0:
                with self._limiter.lock:
                    # Number of free slots left in the first rate; previously this was computed
                    # as a comparison (a bool), which inserted a single item instead of filling up.
                    items_to_add = self._bucket.rates[0].limit - self._bucket.count()
                    if items_to_add > 0:
                        now: int = TimeClock().now()  # type: ignore[no-untyped-call]
                        self._bucket.put(RateItem(name="dummy", timestamp=now, weight=items_to_add))
        # TODO: add support if needed, it might be that it is not possible to make a good solution for this case
        # if available_calls is not None and call_reset_ts is not None:
        #     ts = call_reset_ts.timestamp()

    def __str__(self) -> str:
        """Return a human-friendly description of the moving window rate policy for logging purposes."""
        rates_info = ", ".join(
            f"{rate.limit} per {timedelta(milliseconds=rate.interval)}"
            for rate in self._bucket.rates
        )
        current_bucket_count = self._bucket.count()
        matcher_str = ", ".join(f"{matcher}" for matcher in self._matchers)
        return (
            f"MovingWindowCallRatePolicy(rates=[{rates_info}], current_bucket_count={current_bucket_count}, "
            f"matchers=[{matcher_str}])"
        )
Policy to control request rate, implemented on top of the PyRateLimiter library. The main difference between this policy and FixedWindowCallRatePolicy is that the rate-limiting window moves along with the requests we make, so there is no moment when the available number of calls is reset. This strategy requires storing the timestamps of all requests within a window.
def __init__(self, rates: list[Rate], matchers: list[RequestMatcher]):
    """Build a moving-window policy from one or more rates.

    :param rates: rates to enforce; the order matters and must be ascending
    :param matchers: request matchers selecting the requests this policy applies to
    :raises ValueError: if ``rates`` is empty
    """
    if not rates:
        raise ValueError("The list of rates can not be empty")

    converted: list[PyRateRate] = []
    for rate in rates:
        # pyrate_limiter measures intervals in whole milliseconds.
        interval_ms = int(rate.interval.total_seconds() * 1000)
        converted.append(PyRateRate(limit=rate.limit, interval=interval_ms))

    self._bucket = InMemoryBucket(converted)
    # Limiter will create the background task that clears old requests in the bucket
    self._limiter = Limiter(self._bucket)
    super().__init__(matchers=matchers)
Constructor
Parameters
- rates: list of rates, the order is important and must be ascending
- matchers:
def try_acquire(self, request: Any, weight: int) -> None:
    """Acquire ``weight`` calls for ``request`` from the moving-window bucket.

    :raises ValueError: if the request is not governed by this policy
    :raises CallRateLimitHit: if the bucket is full; includes how long to wait for free capacity
    """
    if not self.matches(request):
        raise ValueError("Request does not match the policy")

    try:
        self._limiter.try_acquire(request, weight=weight)
    except BucketFullException as full_exc:
        rate_item = self._limiter.bucket_factory.wrap_item(request, weight)
        assert isinstance(rate_item, RateItem)

        with self._limiter.lock:
            wait_ms = self._bucket.waiting(rate_item)
        assert isinstance(wait_ms, int)

        meta = full_exc.meta_info
        raise CallRateLimitHit(
            error=str(meta["error"]),
            item=request,
            weight=int(meta["weight"]),
            rate=str(meta["rate"]),
            time_to_wait=timedelta(milliseconds=wait_ms),
        )
Try to acquire request
Parameters
- request: a request object representing a single call to API
- weight: number of requests to deduct from credit
Returns
def update(
    self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]
) -> None:
    """Adjust call bucket to reflect the state of the API server

    Only the case "API reports 0 calls left and no reset time" is handled: the bucket is
    filled up to the limit of its first rate so that subsequent acquisitions have to wait.

    :param available_calls: calls the API reports as remaining, or None if unknown
    :param call_reset_ts: reset time reported by the API, or None if unknown
    """
    if (
        available_calls is not None and call_reset_ts is None
    ):  # we do our best to sync buckets with API
        if available_calls == 0:
            with self._limiter.lock:
                # Number of free slots left in the first rate; previously this was computed
                # as a comparison (a bool), which inserted a single item instead of filling up.
                items_to_add = self._bucket.rates[0].limit - self._bucket.count()
                if items_to_add > 0:
                    now: int = TimeClock().now()  # type: ignore[no-untyped-call]
                    self._bucket.put(RateItem(name="dummy", timestamp=now, weight=items_to_add))
    # TODO: add support if needed, it might be that it is not possible to make a good solution for this case
    # if available_calls is not None and call_reset_ts is not None:
    #     ts = call_reset_ts.timestamp()
Adjust call bucket to reflect the state of the API server
Parameters
- available_calls:
- call_reset_ts:
Returns
Inherited Members
class AbstractAPIBudget(abc.ABC):
    """Interface to some API where a client is allowed to make N calls per T interval.

    Important: APIBudget does not make any API calls itself; the end user code is responsible for
    calling this interface to respect the call rate limitation of the API.

    It supports multiple policies applied to different groups of requests. To distinguish these
    groups we use RequestMatchers. Each group is governed by a call rate policy (an
    AbstractCallRatePolicy such as MovingWindowCallRatePolicy, FixedWindowCallRatePolicy or
    UnlimitedCallRatePolicy).
    """

    @abc.abstractmethod
    def acquire_call(
        self, request: Any, block: bool = True, timeout: Optional[float] = None
    ) -> None:
        """Try to get a call from budget, will block by default

        :param request: the request that is about to be sent
        :param block: when true (default) will block the current thread until call credit is available
        :param timeout: if set will limit maximum time in block, otherwise will wait until credit is available
        :raises: CallRateLimitHit - when no credits are left and, if a timeout was set, the waiting time exceeds the timeout
        """

    @abc.abstractmethod
    def get_matching_policy(self, request: Any) -> Optional[AbstractCallRatePolicy]:
        """Find matching call rate policy for specific request, or None if no policy matches"""

    @abc.abstractmethod
    def update_from_response(self, request: Any, response: Any) -> None:
        """Update budget information based on response from API

        :param request: the initial request that triggered this response
        :param response: response from the API
        """
Interface to some API where a client is allowed to make N calls per T interval.
Important: APIBudget does not make any API calls itself; the end user code is responsible for calling this interface to respect the call rate limitation of the API.
It supports multiple policies applied to different groups of requests. To distinguish these groups we use RequestMatchers. Each group is governed by a call rate policy, for example MovingWindowCallRatePolicy (moving window), FixedWindowCallRatePolicy (fixed window) or UnlimitedCallRatePolicy (no limit).
@abc.abstractmethod
def acquire_call(
    self, request: Any, block: bool = True, timeout: Optional[float] = None
) -> None:
    """Take one call credit from the budget for ``request``; blocks by default.

    :param request: the request that is about to be sent
    :param block: if True (the default), wait on the current thread until a credit is available
    :param timeout: optional bound on the time spent blocking; without it the call waits until
        a credit is available
    :raises: CallRateLimitHit - when no credits are left and, if a timeout was set, the waiting
        time exceeds the timeout
    """
Try to get a call from budget, will block by default
Parameters
- request:
- block: when true (default) will block the current thread until call credit is available
- timeout: if set will limit maximum time in block, otherwise will wait until credit is available
Raises
- CallRateLimitHit - when no credits are left and, if a timeout was set, the waiting time exceeds the timeout
@abc.abstractmethod
def get_matching_policy(self, request: Any) -> Optional[AbstractCallRatePolicy]:
    """Return the call rate policy that applies to ``request``, or None when none applies."""
Find matching call rate policy for specific request
@abc.abstractmethod
def update_from_response(self, request: Any, response: Any) -> None:
    """Feed rate limit information returned by the API back into the budget.

    :param request: the request that produced ``response``
    :param response: the API response to read rate limit information from
    """
Update budget information based on response from API
Parameters
- request: the initial request that triggered this response
- response: response from the API
class APIBudget(AbstractAPIBudget):
    """Default APIBudget implementation"""

    def __init__(
        self, policies: list[AbstractCallRatePolicy], maximum_attempts_to_acquire: int = 100000
    ) -> None:
        """Constructor

        :param policies: list of policies in this budget; the first one matching a request is used
        :param maximum_attempts_to_acquire: number of attempts before throwing hit ratelimit exception, we put some big number here
            to avoid situations when many threads compete with each other for a few slots over a significant amount of time
        """

        self._policies = policies
        self._maximum_attempts_to_acquire = maximum_attempts_to_acquire

    def _extract_endpoint(self, request: Any) -> str:
        """Extract the endpoint URL from the request if available (used for logging only)."""
        endpoint = None
        try:
            # If the request is already a PreparedRequest, it should have a URL.
            if isinstance(request, requests.PreparedRequest):
                endpoint = request.url
            # If it's a requests.Request, we call prepare() to extract the URL.
            elif isinstance(request, requests.Request):
                prepared = request.prepare()
                endpoint = prepared.url
        except Exception as e:
            # Best effort: the endpoint is only used in log messages.
            logger.debug("Error extracting endpoint: %s", e)
        if endpoint:
            return endpoint
        return "unknown endpoint"

    def get_matching_policy(self, request: Any) -> Optional[AbstractCallRatePolicy]:
        """Return the first policy (in registration order) that matches ``request``, or None."""
        for policy in self._policies:
            if policy.matches(request):
                return policy
        return None

    def acquire_call(
        self, request: Any, block: bool = True, timeout: Optional[float] = None
    ) -> None:
        """Try to get a call from budget, will block by default.
        Policies are checked sequentially in the order they were added; the first policy that
        matches the request is used to acquire the call. If no policy matches, the call is allowed.

        :param request: the API request
        :param block: when True (default) will block until a call credit is available
        :param timeout: if provided, caps the length of each individual sleep between attempts
            (it is not an overall deadline); otherwise each sleep lasts as long as the policy asks
        :raises: CallRateLimitHit if the call credit cannot be acquired (immediately when
            ``block`` is False, otherwise after all attempts are exhausted)
        """

        policy = self.get_matching_policy(request)
        endpoint = self._extract_endpoint(request)
        if policy:
            logger.debug("Acquiring call for endpoint %s using policy: %s", endpoint, policy)
            self._do_acquire(request=request, policy=policy, block=block, timeout=timeout)
        elif self._policies:
            logger.debug(
                "No policies matched for endpoint %s (request: %s). Allowing call by default.",
                endpoint,
                request,
            )

    def update_from_response(self, request: Any, response: Any) -> None:
        """Update budget information based on the API response (no-op in this implementation).

        :param request: the initial request that triggered this response
        :param response: response from the API
        """
        pass

    def _do_acquire(
        self, request: Any, policy: AbstractCallRatePolicy, block: bool, timeout: Optional[float]
    ) -> None:
        """Internal method to try to acquire a call credit.

        :param request: the API request
        :param policy: the matching rate-limiting policy
        :param block: indicates whether to block until a call credit is available
        :param timeout: caps each individual sleep between attempts
        :raises: CallRateLimitHit if unable to acquire a call credit
        """
        last_exception: Optional[CallRateLimitHit] = None
        endpoint = self._extract_endpoint(request)
        # Always make at least one attempt: with a non-positive limit the loop would otherwise not
        # run at all and the call would be let through without consulting the policy.
        max_attempts = max(1, self._maximum_attempts_to_acquire)
        # sometimes we spend all budget before a second attempt, so we have a few more attempts
        for attempt in range(1, max_attempts + 1):
            try:
                policy.try_acquire(request, weight=1)
                return
            except CallRateLimitHit as exc:
                last_exception = exc
                if not block:
                    logger.debug(
                        "Policy %s reached call limit for endpoint %s (%s) and blocking is disabled.",
                        policy,
                        endpoint,
                        exc.rate,
                    )
                    raise
                if attempt == max_attempts:
                    # No attempt left to use a freed credit, so don't sleep before giving up.
                    break
                if timeout is not None:
                    time_to_wait = min(timedelta(seconds=timeout), exc.time_to_wait)
                else:
                    time_to_wait = exc.time_to_wait
                # Ensure we never sleep for a negative duration.
                time_to_wait = max(timedelta(0), time_to_wait)
                logger.debug(
                    "Policy %s reached call limit for endpoint %s (%s). Sleeping for %s on attempt %s.",
                    policy,
                    endpoint,
                    exc.rate,
                    time_to_wait,
                    attempt,
                )
                time.sleep(time_to_wait.total_seconds())

        if last_exception is not None:
            logger.debug(
                "Exhausted all %s attempts to acquire a call for endpoint %s using policy: %s",
                max_attempts,
                endpoint,
                policy,
            )
            raise last_exception
Default APIBudget implementation
def __init__(
    self, policies: list[AbstractCallRatePolicy], maximum_attempts_to_acquire: int = 100000
) -> None:
    """Create a budget from an ordered list of policies.

    :param policies: policies of this budget; lookups return the first one that matches a request
    :param maximum_attempts_to_acquire: how many attempts are made before the rate limit
        exception is propagated; deliberately large so that many threads competing for a few
        slots over a long period do not give up early
    """
    self._maximum_attempts_to_acquire = maximum_attempts_to_acquire
    self._policies = policies
Constructor
Parameters
- policies: list of policies in this budget
- maximum_attempts_to_acquire: number of attempts before throwing the rate limit hit exception; we put a big number here to avoid situations when many threads compete with each other for a few slots over a significant amount of time
def get_matching_policy(self, request: Any) -> Optional[AbstractCallRatePolicy]:
    """Return the first policy, in registration order, whose matchers accept ``request``; None otherwise."""
    return next((policy for policy in self._policies if policy.matches(request)), None)
Find matching call rate policy for specific request
def acquire_call(
    self, request: Any, block: bool = True, timeout: Optional[float] = None
) -> None:
    """Try to get a call from budget, will block by default.
    Policies are checked sequentially in the order they were added; the first policy that
    matches the request is used to acquire the call. If no policy matches, the call is allowed.

    :param request: the API request
    :param block: when True (default) will block until a call credit is available
    :param timeout: if provided, caps the length of each individual sleep between attempts
        (it is not an overall deadline); otherwise each sleep lasts as long as the policy asks
    :raises: CallRateLimitHit if the call credit cannot be acquired (immediately when
        ``block`` is False, otherwise after all attempts are exhausted)
    """

    policy = self.get_matching_policy(request)
    endpoint = self._extract_endpoint(request)
    if policy:
        logger.debug("Acquiring call for endpoint %s using policy: %s", endpoint, policy)
        self._do_acquire(request=request, policy=policy, block=block, timeout=timeout)
    elif self._policies:
        logger.debug(
            "No policies matched for endpoint %s (request: %s). Allowing call by default.",
            endpoint,
            request,
        )
Try to get a call from budget, will block by default. Policies are checked sequentially in the order they were added; the first policy that matches the request is used to acquire the call. If no policy matches, the call is allowed.
Parameters
- request: the API request
- block: when True (default) will block until a call credit is available
- timeout: if provided, limits maximum waiting time; otherwise, waits indefinitely
Raises
- CallRateLimitHit if the call credit cannot be acquired within the timeout
def update_from_response(self, request: Any, response: Any) -> None:
    """Refresh the budget's bookkeeping from an API response.

    This base implementation intentionally does nothing; subclasses that can read
    rate-limit information from responses (such as the HTTP budget) override it.

    :param request: the request that produced ``response``
    :param response: the API's reply to ``request``
    """
    return None
Update budget information based on the API response.
Parameters
- request: the initial request that triggered this response
- response: response from the API
class HttpAPIBudget(APIBudget):
    """HTTP implementation of AbstractAPIBudget.

    After each response, rate-limit information is read from configurable headers
    (and from rate-limit status codes) and passed to the policy that matched the
    originating request.
    """

    def __init__(
        self,
        ratelimit_reset_header: str = "ratelimit-reset",
        ratelimit_remaining_header: str = "ratelimit-remaining",
        status_codes_for_ratelimit_hit: Optional[list[int]] = None,
        **kwargs: Any,
    ):
        """Constructor

        :param ratelimit_reset_header: name of the header that holds the timestamp of the next reset of the call budget
        :param ratelimit_remaining_header: name of the header that holds the number of calls left
        :param status_codes_for_ratelimit_hit: HTTP status codes that signal that the rate limit has been hit;
            defaults to ``[429]``
        :param kwargs: forwarded to the parent budget constructor (e.g. ``policies``)
        """
        self._ratelimit_reset_header = ratelimit_reset_header
        self._ratelimit_remaining_header = ratelimit_remaining_header
        # A ``[429]`` literal in the signature would be one list object shared by every
        # instance; build a fresh default per instance instead.
        self._status_codes_for_ratelimit_hit = (
            [429] if status_codes_for_ratelimit_hit is None else status_codes_for_ratelimit_hit
        )
        super().__init__(**kwargs)

    def update_from_response(self, request: Any, response: Any) -> None:
        """Feed rate-limit data from ``response`` into the policy matching ``request``.

        Nothing happens when no policy matches or when ``response`` is not a
        ``requests.Response``.

        :param request: the request that produced ``response``
        :param response: the API's reply to ``request``
        """
        policy = self.get_matching_policy(request)
        if not policy:
            return

        if isinstance(response, requests.Response):
            available_calls = self.get_calls_left_from_response(response)
            reset_ts = self.get_reset_ts_from_response(response)
            policy.update(available_calls=available_calls, call_reset_ts=reset_ts)

    def get_reset_ts_from_response(
        self, response: requests.Response
    ) -> Optional[datetime.datetime]:
        """Return the budget reset time announced by ``response``.

        The header value is interpreted as a Unix timestamp in seconds; fractional
        values (e.g. ``"1700000000.5"``) are accepted as well as integers.

        :param response: response returned by the API
        :return: a naive local-time datetime, or None when the header is absent or empty
        """
        reset_value = response.headers.get(self._ratelimit_reset_header)
        if reset_value:
            # float() accepts every string int() does, plus fractional timestamps.
            return datetime.datetime.fromtimestamp(float(reset_value))
        return None

    def get_calls_left_from_response(self, response: requests.Response) -> Optional[int]:
        """Return how many calls remain according to ``response``.

        :param response: response returned by the API
        :return: the remaining-calls header value when present and non-empty; 0 when the
            status code is one of the rate-limit-hit codes; otherwise None
        """
        remaining_value = response.headers.get(self._ratelimit_remaining_header)
        if remaining_value:
            return int(remaining_value)

        if response.status_code in self._status_codes_for_ratelimit_hit:
            return 0

        return None
Implementation of AbstractAPIBudget for HTTP
def __init__(
    self,
    ratelimit_reset_header: str = "ratelimit-reset",
    ratelimit_remaining_header: str = "ratelimit-remaining",
    status_codes_for_ratelimit_hit: Optional[list[int]] = None,
    **kwargs: Any,
):
    """Constructor

    :param ratelimit_reset_header: name of the header that holds the timestamp of the next reset of the call budget
    :param ratelimit_remaining_header: name of the header that holds the number of calls left
    :param status_codes_for_ratelimit_hit: HTTP status codes that signal that the rate limit has been hit;
        defaults to ``[429]``
    :param kwargs: forwarded to the parent budget constructor (e.g. ``policies``)
    """
    self._ratelimit_reset_header = ratelimit_reset_header
    self._ratelimit_remaining_header = ratelimit_remaining_header
    # A ``[429]`` literal in the signature would be one list object shared by every
    # instance; build a fresh default per instance instead.
    self._status_codes_for_ratelimit_hit = (
        [429] if status_codes_for_ratelimit_hit is None else status_codes_for_ratelimit_hit
    )
    super().__init__(**kwargs)
Constructor
Parameters
- ratelimit_reset_header: name of the header that holds the timestamp of the next reset of the call budget
- ratelimit_remaining_header: name of the header that has the number of calls left
- status_codes_for_ratelimit_hit: list of HTTP status codes that signal that the rate limit has been hit
def update_from_response(self, request: Any, response: Any) -> None:
    """Push rate-limit data parsed from an HTTP response into the matching policy.

    Does nothing when no policy matches ``request`` or when ``response`` is not a
    ``requests.Response``.

    :param request: the request that produced ``response``
    :param response: the API's reply to ``request``
    """
    matched_policy = self.get_matching_policy(request)
    # Short-circuit keeps the original order: the isinstance check only runs when a policy matched.
    if not matched_policy or not isinstance(response, requests.Response):
        return
    # Keyword arguments are evaluated left to right: calls-left first, then reset timestamp.
    matched_policy.update(
        available_calls=self.get_calls_left_from_response(response),
        call_reset_ts=self.get_reset_ts_from_response(response),
    )
Update budget information based on the API response.
Parameters
- request: the initial request that triggered this response
- response: response from the API
def get_calls_left_from_response(self, response: requests.Response) -> Optional[int]:
    """Derive the number of remaining calls from an HTTP response.

    :param response: response returned by the API
    :return: the integer value of the remaining-calls header when it is present and
        non-empty; otherwise 0 if the status code is a rate-limit-hit code; otherwise None
    """
    remaining_header_value = response.headers.get(self._ratelimit_remaining_header)
    if remaining_header_value:
        return int(remaining_header_value)
    limit_was_hit = response.status_code in self._status_codes_for_ratelimit_hit
    return 0 if limit_was_hit else None
Inherited Members
class LimiterMixin(MIXIN_BASE):
    """Mixin that routes every ``send`` through an API budget.

    Meant to be combined with ``requests.Session`` (as the session classes in this
    module do).
    """

    def __init__(
        self,
        api_budget: AbstractAPIBudget,
        **kwargs: Any,
    ):
        self._api_budget = api_budget
        # Remaining keyword arguments continue along the MRO.
        super().__init__(**kwargs)  # type: ignore # Base Session doesn't take any kwargs

    def send(self, request: requests.PreparedRequest, **kwargs: Any) -> requests.Response:
        """Acquire a call from the budget, send the request, then report the response back to the budget."""
        budget = self._api_budget
        budget.acquire_call(request)
        sent_response = super().send(request, **kwargs)
        budget.update_from_response(request, sent_response)
        return sent_response
Mixin class that adds rate-limiting behavior to requests.
def send(self, request: requests.PreparedRequest, **kwargs: Any) -> requests.Response:
    """Acquire a call from the budget, send the request, then report the response back to the budget."""
    budget = self._api_budget
    budget.acquire_call(request)
    sent_response = super().send(request, **kwargs)
    budget.update_from_response(request, sent_response)
    return sent_response
Send a request with rate-limiting.
class LimiterSession(LimiterMixin, requests.Session):
    """A ``requests.Session`` whose ``send`` is rate-limited through LimiterMixin's API budget."""
Session that adds rate-limiting behavior to requests.
Inherited Members
class CachedLimiterSession(requests_cache.CacheMixin, LimiterMixin, requests.Session):
    """A ``requests.Session`` combining requests_cache response caching with API-budget rate limiting."""

    # NOTE(review): CacheMixin precedes LimiterMixin in the MRO, so presumably responses
    # served from the cache never reach LimiterMixin.send and do not consume budget —
    # confirm against requests_cache's CacheMixin.send.
Session class with caching and rate-limiting behavior.