airbyte_cdk.sources.streams.call_rate
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import abc
import dataclasses
import datetime
import logging
import re
import time
from datetime import timedelta
from threading import RLock
from typing import TYPE_CHECKING, Any, Mapping, Optional
from urllib import parse

import requests
import requests_cache
from pyrate_limiter import InMemoryBucket, Limiter, RateItem, TimeClock
from pyrate_limiter import Rate as PyRateRate
from pyrate_limiter.exceptions import BucketFullException

# prevents mypy from complaining about missing session attributes in LimiterMixin
if TYPE_CHECKING:
    MIXIN_BASE = requests.Session
else:
    MIXIN_BASE = object

logger = logging.getLogger("airbyte")
logging.getLogger("pyrate_limiter").setLevel(logging.WARNING)


@dataclasses.dataclass
class Rate:
    """Call rate limit: at most ``limit`` calls within every ``interval``."""

    # maximum number of calls (total request weight) allowed within one interval
    limit: int
    # length of the time window the limit applies to
    interval: timedelta


class CallRateLimitHit(Exception):
    """Raised when a call credit cannot be acquired because a rate limit has been reached."""

    def __init__(self, error: str, item: Any, weight: int, rate: str, time_to_wait: timedelta):
        """Constructor

        :param error: error message
        :param item: object passed into acquire_call
        :param weight: how many credits were requested
        :param rate: string representation of the rate violated
        :param time_to_wait: how long to wait until more calls become available
        """
        self.item = item
        self.weight = weight
        self.rate = rate
        self.time_to_wait = time_to_wait
        super().__init__(error)


class AbstractCallRatePolicy(abc.ABC):
    """Call rate policy interface.
    Should be configurable with different rules, like N per M for endpoint X. Endpoint X is matched with APIBudget.
    """

    @abc.abstractmethod
    def matches(self, request: Any) -> bool:
        """Tell whether this policy matches a specific request and should apply to it.

        :param request: the request object to check
        :return: True if policy should apply to this request, False - otherwise
        """

    @abc.abstractmethod
    def try_acquire(self, request: Any, weight: int) -> None:
        """Try to acquire a call credit for the request.

        :param request: a request object representing a single call to API
        :param weight: number of requests to deduct from credit
        :raises CallRateLimitHit: (in the concrete limited policies) when no credit is available
        """

    @abc.abstractmethod
    def update(
        self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]
    ) -> None:
        """Update call rate counting with current values reported by the API.

        :param available_calls: number of calls still available, or None if unknown
        :param call_reset_ts: time point of the next call rate reset, or None if unknown
        """


class RequestMatcher(abc.ABC):
    """Callable that helps to match a request object with call rate policies."""

    @abc.abstractmethod
    def __call__(self, request: Any) -> bool:
        """Check whether the request matches.

        :param request: the request object to check
        :return: True if matches the provided request object, False - otherwise
        """


class HttpRequestMatcher(RequestMatcher):
    """Simple implementation of RequestMatcher for HTTP requests using HttpRequestRegexMatcher under the hood."""

    def __init__(
        self,
        method: Optional[str] = None,
        url: Optional[str] = None,
        params: Optional[Mapping[str, Any]] = None,
        headers: Optional[Mapping[str, Any]] = None,
    ):
        """Constructor

        :param method: HTTP method (e.g., "GET", "POST").
        :param url: Full URL to match (scheme://host/path). The path is matched as an escaped
            (literal) regex searched within the request path.
        :param params: Dictionary of query parameters to match.
        :param headers: Dictionary of headers to match.
        """
        # Parse the URL to extract the base and path
        if url:
            parsed_url = parse.urlsplit(url)
            url_base = f"{parsed_url.scheme}://{parsed_url.netloc}"
            # a bare "/" path means "any path" on this base
            url_path = parsed_url.path if parsed_url.path != "/" else None
        else:
            url_base = None
            url_path = None

        # Use HttpRequestRegexMatcher under the hood; the path is escaped so it is matched literally
        self._regex_matcher = HttpRequestRegexMatcher(
            method=method,
            url_base=url_base,
            url_path_pattern=re.escape(url_path) if url_path else None,
            params=params,
            headers=headers,
        )

    def __call__(self, request: Any) -> bool:
        """
        :param request: A requests.Request or requests.PreparedRequest instance.
        :return: True if the request matches all provided criteria; False otherwise.
        """
        return self._regex_matcher(request)

    def __str__(self) -> str:
        return (
            f"HttpRequestMatcher(method={self._regex_matcher._method}, "
            f"url={self._regex_matcher._url_base}{self._regex_matcher._url_path_pattern.pattern if self._regex_matcher._url_path_pattern else ''}, "
            f"params={self._regex_matcher._params}, headers={self._regex_matcher._headers})"
        )


class HttpRequestRegexMatcher(RequestMatcher):
    """
    Extended RequestMatcher for HTTP requests that supports matching on:
    - HTTP method (case-insensitive)
    - URL base (scheme + netloc) optionally
    - URL path pattern (a regex applied to the path portion of the URL)
    - Query parameters (must be present)
    - Headers (header names compared case-insensitively)
    """

    def __init__(
        self,
        method: Optional[str] = None,
        url_base: Optional[str] = None,
        url_path_pattern: Optional[str] = None,
        params: Optional[Mapping[str, Any]] = None,
        headers: Optional[Mapping[str, Any]] = None,
        weight: Optional[int] = None,
    ):
        """
        :param method: HTTP method (e.g. "GET", "POST"); compared case-insensitively.
        :param url_base: Base URL (scheme://host) that must match.
        :param url_path_pattern: A regex pattern that will be applied to the path portion of the URL.
        :param params: Dictionary of query parameters that must be present in the request.
        :param headers: Dictionary of headers that must be present (header keys are compared case-insensitively).
        :param weight: The weight of a request matching this matcher. If set, this value is used
                       when acquiring a call from the rate limiter, enabling cost-based rate limiting
                       where different endpoints consume different amounts from a shared budget.
                       If not set, each request counts as 1.
        :raises ValueError: if weight is set and lower than 1
        """
        if weight is not None and weight < 1:
            raise ValueError(f"weight must be >= 1, got {weight}")
        self._weight = weight
        self._method = method.upper() if method else None

        # Normalize the url_base if provided: remove trailing slash.
        self._url_base = url_base.rstrip("/") if url_base else None

        # Compile the URL path pattern if provided.
        self._url_path_pattern = re.compile(url_path_pattern) if url_path_pattern else None

        # Normalize query parameters to strings.
        self._params = {str(k): str(v) for k, v in (params or {}).items()}

        # Normalize header keys to lowercase.
        self._headers = {str(k).lower(): str(v) for k, v in (headers or {}).items()}

    @staticmethod
    def _match_dict(obj: Mapping[str, Any], pattern: Mapping[str, Any]) -> bool:
        """Check that every key/value in the pattern exists in the object."""
        return pattern.items() <= obj.items()

    def __call__(self, request: Any) -> bool:
        """
        :param request: A requests.Request or requests.PreparedRequest instance.
        :return: True if the request matches all provided criteria; False otherwise.
        """
        # Prepare the request (if needed) and extract the URL details.
        if isinstance(request, requests.Request):
            prepared_request = request.prepare()
        elif isinstance(request, requests.PreparedRequest):
            prepared_request = request
        else:
            return False

        # Check HTTP method.
        if self._method is not None:
            if prepared_request.method != self._method:
                return False

        # Parse the URL.
        parsed_url = parse.urlsplit(prepared_request.url)
        # Reconstruct the base: scheme://netloc
        request_url_base = f"{str(parsed_url.scheme)}://{str(parsed_url.netloc)}"
        # The path (without query parameters)
        request_path = str(parsed_url.path).rstrip("/")

        # If a base URL is provided, check that it matches.
        if self._url_base is not None:
            if request_url_base != self._url_base:
                return False

        # If a URL path pattern is provided, ensure the path matches the regex.
        if self._url_path_pattern is not None:
            if not self._url_path_pattern.search(request_path):
                return False

        # Check query parameters.
        if self._params:
            query_params = dict(parse.parse_qsl(str(parsed_url.query)))
            if not self._match_dict(query_params, self._params):
                return False

        # Check headers (normalize keys to lower-case).
        if self._headers:
            req_headers = {k.lower(): v for k, v in prepared_request.headers.items()}
            if not self._match_dict(req_headers, self._headers):
                return False

        return True

    @property
    def weight(self) -> Optional[int]:
        """The weight of a request matching this matcher, or None if not set."""
        return self._weight

    def __str__(self) -> str:
        regex = self._url_path_pattern.pattern if self._url_path_pattern else None
        return (
            f"HttpRequestRegexMatcher(method={self._method}, url_base={self._url_base}, "
            f"url_path_pattern={regex}, params={self._params}, headers={self._headers}, weight={self._weight})"
        )


class BaseCallRatePolicy(AbstractCallRatePolicy, abc.ABC):
    """Common base for policies that select requests with a list of RequestMatchers."""

    def __init__(self, matchers: list[RequestMatcher]):
        self._matchers = matchers

    def matches(self, request: Any) -> bool:
        """Tell if this policy matches specific request and should apply to it.

        A policy without matchers matches every request.

        :param request: the request object to check
        :return: True if policy should apply to this request, False - otherwise
        """

        if not self._matchers:
            return True
        return any(matcher(request) for matcher in self._matchers)

    def get_weight(self, request: Any) -> int:
        """Get the weight for a request based on the first matching matcher.

        If a matcher has a weight configured, that weight is used.
        Otherwise, defaults to 1.

        :param request: a request object
        :return: the weight for this request
        """
        for matcher in self._matchers:
            if matcher(request):
                if isinstance(matcher, HttpRequestRegexMatcher) and matcher.weight is not None:
                    return matcher.weight
                return 1
        return 1


class UnlimitedCallRatePolicy(BaseCallRatePolicy):
    """
    This policy is for explicit unlimited call rates.
    It can be used when we want to match a specific group of requests and don't apply any limits.

    Example:

        APIBudget(
            policies=[
                UnlimitedCallRatePolicy(
                    matchers=[
                        HttpRequestMatcher(
                            url="https://api.example.com/some/method", headers={"sandbox": True}
                        )
                    ],
                ),
                FixedWindowCallRatePolicy(
                    matchers=[HttpRequestMatcher(url="https://api.example.com/some/method")],
                    next_reset_ts=datetime.now(),
                    period=timedelta(hours=1),
                    call_limit=1000,
                ),
            ]
        )

    The code above will limit all calls to /some/method except calls that have the header sandbox=True.
    The unlimited policy must be listed first, because the budget uses the first matching policy.
    """

    def try_acquire(self, request: Any, weight: int) -> None:
        """Do nothing"""

    def update(
        self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]
    ) -> None:
        """Do nothing"""


class FixedWindowCallRatePolicy(BaseCallRatePolicy):
    def __init__(
        self,
        next_reset_ts: datetime.datetime,
        period: timedelta,
        call_limit: int,
        matchers: list[RequestMatcher],
    ):
        """A policy that allows {call_limit} calls within a {period} time interval

        :param next_reset_ts: next call rate reset time point
        :param period: call rate reset period
        :param call_limit: maximum total weight of calls allowed within one period
        :param matchers: matchers selecting the requests this policy applies to
        """

        self._next_reset_ts = next_reset_ts
        self._offset = period
        self._call_limit = call_limit
        self._calls_num = 0
        self._lock = RLock()
        super().__init__(matchers=matchers)

    def try_acquire(self, request: Any, weight: int) -> None:
        """Consume ``weight`` credits from the current window.

        :raises ValueError: if the weight exceeds the call limit or the request does not match
        :raises CallRateLimitHit: if the current window does not have enough credits left
        """
        if weight > self._call_limit:
            raise ValueError("Weight can not exceed the call limit")
        if not self.matches(request):
            raise ValueError("Request does not match the policy")

        with self._lock:
            self._update_current_window()

            if self._calls_num + weight > self._call_limit:
                reset_in = self._next_reset_ts - datetime.datetime.now()
                error_message = (
                    f"reached maximum number of allowed calls {self._call_limit} "
                    f"per {self._offset} interval, next reset in {reset_in}."
                )
                raise CallRateLimitHit(
                    error=error_message,
                    item=request,
                    weight=weight,
                    rate=f"{self._call_limit} per {self._offset}",
                    time_to_wait=reset_in,
                )

            self._calls_num += weight

    def __str__(self) -> str:
        matcher_str = ", ".join(f"{matcher}" for matcher in self._matchers)
        return (
            f"FixedWindowCallRatePolicy(call_limit={self._call_limit}, period={self._offset}, "
            f"calls_used={self._calls_num}, next_reset={self._next_reset_ts}, "
            f"matchers=[{matcher_str}])"
        )

    def update(
        self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]
    ) -> None:
        """Update call rate counters, by default, only reacts to decreasing updates of available_calls and changes to call_reset_ts.
        We ignore updates with available_calls > current_available_calls to support call rate limits that are lower than API limits.

        :param available_calls: number of calls still available according to the API, or None
        :param call_reset_ts: next reset time point according to the API, or None
        """
        with self._lock:
            self._update_current_window()
            current_available_calls = self._call_limit - self._calls_num

            if available_calls is not None and current_available_calls > available_calls:
                logger.debug(
                    "got rate limit update from api, adjusting available calls from %s to %s",
                    current_available_calls,
                    available_calls,
                )
                self._calls_num = self._call_limit - available_calls

            if call_reset_ts is not None and call_reset_ts != self._next_reset_ts:
                logger.debug(
                    "got rate limit update from api, adjusting reset time from %s to %s",
                    self._next_reset_ts,
                    call_reset_ts,
                )
                self._next_reset_ts = call_reset_ts

    def _update_current_window(self) -> None:
        """Start a new window (reset the counter) if the current one has ended."""
        now = datetime.datetime.now()
        if now > self._next_reset_ts:
            logger.debug("started new window, %s calls available now", self._call_limit)
            if self._offset > timedelta(0):
                # Skip every period that elapsed while idle, so the next reset lands in the
                # future; advancing by a single period could leave it in the past, which would
                # reset the counter on every call and report a negative time_to_wait.
                elapsed_periods = (now - self._next_reset_ts) // self._offset + 1
                self._next_reset_ts = self._next_reset_ts + self._offset * elapsed_periods
            else:
                self._next_reset_ts = self._next_reset_ts + self._offset
            self._calls_num = 0


class MovingWindowCallRatePolicy(BaseCallRatePolicy):
    """
    Policy to control requests rate implemented on top of PyRateLimiter lib.
    The main difference between this policy and FixedWindowCallRatePolicy is that the rate-limiting window
    is moving along requests that we made, and there is no moment when we reset an available number of calls.
    This strategy requires saving of timestamps of all requests within a window.
    """

    def __init__(self, rates: list[Rate], matchers: list[RequestMatcher]):
        """Constructor

        :param rates: list of rates, the order is important and must be ascending
        :param matchers: matchers selecting the requests this policy applies to
        :raises ValueError: if rates is empty
        """
        if not rates:
            raise ValueError("The list of rates can not be empty")
        pyrate_rates = [
            PyRateRate(limit=rate.limit, interval=int(rate.interval.total_seconds() * 1000))
            for rate in rates
        ]
        self._bucket = InMemoryBucket(pyrate_rates)
        # Limiter will create the background task that clears old requests in the bucket
        self._limiter = Limiter(self._bucket)
        super().__init__(matchers=matchers)

    def try_acquire(self, request: Any, weight: int) -> None:
        """Put ``weight`` items into the moving-window bucket.

        :raises ValueError: if the request does not match or the weight exceeds the lowest limit
        :raises CallRateLimitHit: if the bucket is full
        """
        if not self.matches(request):
            raise ValueError("Request does not match the policy")
        lowest_limit = min(rate.limit for rate in self._bucket.rates)
        if weight > lowest_limit:
            raise ValueError(
                f"Weight can not exceed the lowest configured rate limit ({lowest_limit})"
            )

        try:
            self._limiter.try_acquire(request, weight=weight)
        except BucketFullException as exc:
            item = self._limiter.bucket_factory.wrap_item(request, weight)
            assert isinstance(item, RateItem)

            with self._limiter.lock:
                time_to_wait = self._bucket.waiting(item)
                assert isinstance(time_to_wait, int)

                raise CallRateLimitHit(
                    error=str(exc.meta_info["error"]),
                    item=request,
                    weight=int(exc.meta_info["weight"]),
                    rate=str(exc.meta_info["rate"]),
                    time_to_wait=timedelta(milliseconds=time_to_wait),
                )

    def update(
        self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]
    ) -> None:
        """Adjust call bucket to reflect the state of the API server

        :param available_calls: number of calls still available according to the API, or None
        :param call_reset_ts: next reset time point according to the API, or None (not supported yet)
        """
        if (
            available_calls is not None and call_reset_ts is None
        ):  # we do our best to sync buckets with API
            if available_calls == 0:
                with self._limiter.lock:
                    # The API says no calls are left: top the bucket up to the first configured
                    # limit (the previous `count < limit` produced a bool, i.e. at most one item).
                    # NOTE: count() may include items outside the first rate's interval, so this
                    # is a best-effort fill.
                    items_to_add = self._bucket.rates[0].limit - self._bucket.count()
                    if items_to_add > 0:
                        now: int = TimeClock().now()  # type: ignore[no-untyped-call]
                        self._bucket.put(RateItem(name="dummy", timestamp=now, weight=items_to_add))
        # TODO: add support if needed, it might be that it is not possible to make a good solution for this case
        # if available_calls is not None and call_reset_ts is not None:
        #     ts = call_reset_ts.timestamp()

    def __str__(self) -> str:
        """Return a human-friendly description of the moving window rate policy for logging purposes."""
        rates_info = ", ".join(
            f"{rate.limit} per {timedelta(milliseconds=rate.interval)}"
            for rate in self._bucket.rates
        )
        current_bucket_count = self._bucket.count()
        matcher_str = ", ".join(f"{matcher}" for matcher in self._matchers)
        return (
            f"MovingWindowCallRatePolicy(rates=[{rates_info}], current_bucket_count={current_bucket_count}, "
            f"matchers=[{matcher_str}])"
        )


class AbstractAPIBudget(abc.ABC):
    """Interface to some API where a client allowed to have N calls per T interval.

    Important: APIBudget is not doing any API calls, the end user code is responsible to call this interface
    to respect call rate limitation of the API.

    It supports multiple policies applied to different group of requests. To distinct these groups we use RequestMatchers.
    Individual policies are AbstractCallRatePolicy implementations, e.g. FixedWindowCallRatePolicy,
    MovingWindowCallRatePolicy or UnlimitedCallRatePolicy.
    """

    @abc.abstractmethod
    def acquire_call(
        self, request: Any, block: bool = True, timeout: Optional[float] = None
    ) -> None:
        """Try to get a call from budget, will block by default

        :param request: the API request
        :param block: when true (default) will block the current thread until call credit is available
        :param timeout: if set will limit maximum time in block, otherwise will wait until credit is available
        :raises: CallRateLimitHit - when no credits left and if timeout was set the waiting time exceed the timeout
        """

    @abc.abstractmethod
    def get_matching_policy(self, request: Any) -> Optional[AbstractCallRatePolicy]:
        """Find matching call rate policy for specific request"""

    @abc.abstractmethod
    def update_from_response(self, request: Any, response: Any) -> None:
        """Update budget information based on response from API

        :param request: the initial request that triggered this response
        :param response: response from the API
        """


class APIBudget(AbstractAPIBudget):
    """Default APIBudget implementation"""

    def __init__(
        self, policies: list[AbstractCallRatePolicy], maximum_attempts_to_acquire: int = 100000
    ) -> None:
        """Constructor

        :param policies: list of policies in this budget; the first matching policy is used
        :param maximum_attempts_to_acquire: number of attempts before throwing hit ratelimit exception, we put some big number here
         to avoid situations when many threads compete with each other for a few lots over a significant amount of time
        """

        self._policies = policies
        self._maximum_attempts_to_acquire = maximum_attempts_to_acquire

    def _extract_endpoint(self, request: Any) -> str:
        """Extract the endpoint URL from the request if available (used for logging only)."""
        endpoint = None
        try:
            # If the request is already a PreparedRequest, it should have a URL.
            if isinstance(request, requests.PreparedRequest):
                endpoint = request.url
            # If it's a requests.Request, we call prepare() to extract the URL.
            elif isinstance(request, requests.Request):
                prepared = request.prepare()
                endpoint = prepared.url
        except Exception as e:
            # best effort: the endpoint is only used in log messages
            logger.debug(f"Error extracting endpoint: {e}")
        if endpoint:
            return endpoint
        return "unknown endpoint"

    def get_matching_policy(self, request: Any) -> Optional[AbstractCallRatePolicy]:
        """Return the first policy that matches the request, or None."""
        for policy in self._policies:
            if policy.matches(request):
                return policy
        return None

    def acquire_call(
        self, request: Any, block: bool = True, timeout: Optional[float] = None
    ) -> None:
        """Try to get a call from budget, will block by default.
        Policies are checked sequentially in the same order they were added.
        The first policy that matches the request is used to acquire the call;
        if no policy matches, the call is allowed without consuming any budget.

        :param request: the API request
        :param block: when True (default) will block until a call credit is available
        :param timeout: if provided, limits the time slept per attempt; otherwise, waits as long as the policy asks
        :raises: CallRateLimitHit if the call credit cannot be acquired
        """

        policy = self.get_matching_policy(request)
        endpoint = self._extract_endpoint(request)
        if policy:
            logger.debug(f"Acquiring call for endpoint {endpoint} using policy: {policy}")
            self._do_acquire(request=request, policy=policy, block=block, timeout=timeout)
        elif self._policies:
            logger.debug(
                f"No policies matched for endpoint {endpoint} (request: {request}). Allowing call by default."
            )

    def update_from_response(self, request: Any, response: Any) -> None:
        """Update budget information based on the API response.

        The default implementation does nothing.

        :param request: the initial request that triggered this response
        :param response: response from the API
        """
        pass

    def _do_acquire(
        self, request: Any, policy: AbstractCallRatePolicy, block: bool, timeout: Optional[float]
    ) -> None:
        """Internal method to try to acquire a call credit.

        :param request: the API request
        :param policy: the matching rate-limiting policy
        :param block: indicates whether to block until a call credit is available
        :param timeout: maximum time to sleep per attempt if blocking
        :raises: CallRateLimitHit if unable to acquire a call credit
        """
        last_exception = None
        endpoint = self._extract_endpoint(request)
        # The weight depends only on the request and the policy's matchers, so compute it once.
        weight = policy.get_weight(request) if isinstance(policy, BaseCallRatePolicy) else 1
        # sometimes we spend all budget before a second attempt, so we have a few more attempts;
        # the upper bound is inclusive so exactly `maximum_attempts_to_acquire` attempts are made
        for attempt in range(1, self._maximum_attempts_to_acquire + 1):
            try:
                policy.try_acquire(request, weight=weight)
                return
            except CallRateLimitHit as exc:
                last_exception = exc
                if block:
                    if timeout is not None:
                        time_to_wait = min(timedelta(seconds=timeout), exc.time_to_wait)
                    else:
                        time_to_wait = exc.time_to_wait
                    # Ensure we never sleep for a negative duration.
                    time_to_wait = max(timedelta(0), time_to_wait)
                    logger.debug(
                        f"Policy {policy} reached call limit for endpoint {endpoint} ({exc.rate}). "
                        f"Sleeping for {time_to_wait} on attempt {attempt}."
                    )
                    time.sleep(time_to_wait.total_seconds())
                else:
                    logger.debug(
                        f"Policy {policy} reached call limit for endpoint {endpoint} ({exc.rate}) "
                        f"and blocking is disabled."
                    )
                    raise

        if last_exception:
            logger.debug(
                f"Exhausted all {self._maximum_attempts_to_acquire} attempts to acquire a call for endpoint {endpoint} "
                f"using policy: {policy}"
            )
            raise last_exception


class HttpAPIBudget(APIBudget):
    """Implementation of AbstractAPIBudget for HTTP"""

    def __init__(
        self,
        ratelimit_reset_header: str = "ratelimit-reset",
        ratelimit_remaining_header: str = "ratelimit-remaining",
        status_codes_for_ratelimit_hit: Optional[list[int]] = None,
        **kwargs: Any,
    ):
        """Constructor

        :param ratelimit_reset_header: name of the header that has a timestamp of the next reset of call budget
        :param ratelimit_remaining_header: name of the header that has the number of calls left
        :param status_codes_for_ratelimit_hit: list of HTTP status codes that signal about rate limit being hit;
            defaults to [429]
        :param kwargs: forwarded to APIBudget (policies, maximum_attempts_to_acquire)
        """
        self._ratelimit_reset_header = ratelimit_reset_header
        self._ratelimit_remaining_header = ratelimit_remaining_header
        # avoid a shared mutable default and keep our own copy of the caller's list
        self._status_codes_for_ratelimit_hit = (
            [429]
            if status_codes_for_ratelimit_hit is None
            else list(status_codes_for_ratelimit_hit)
        )
        super().__init__(**kwargs)

    def update_from_response(self, request: Any, response: Any) -> None:
        """Feed rate limit headers/status of a requests.Response into the matching policy."""
        policy = self.get_matching_policy(request)
        if not policy:
            return

        if isinstance(response, requests.Response):
            available_calls = self.get_calls_left_from_response(response)
            reset_ts = self.get_reset_ts_from_response(response)
            policy.update(available_calls=available_calls, call_reset_ts=reset_ts)

    def get_reset_ts_from_response(
        self, response: requests.Response
    ) -> Optional[datetime.datetime]:
        """Read the reset header as an integer Unix timestamp (local naive datetime), or None."""
        if response.headers.get(self._ratelimit_reset_header):
            return datetime.datetime.fromtimestamp(
                int(response.headers[self._ratelimit_reset_header])
            )
        return None

    def get_calls_left_from_response(self, response: requests.Response) -> Optional[int]:
        """Read the remaining-calls header; a rate-limit status code without it means 0 calls left."""
        if response.headers.get(self._ratelimit_remaining_header):
            return int(response.headers[self._ratelimit_remaining_header])

        if response.status_code in self._status_codes_for_ratelimit_hit:
            return 0

        return None


class LimiterMixin(MIXIN_BASE):
    """Mixin class that adds rate-limiting behavior to requests."""

    def __init__(
        self,
        api_budget: AbstractAPIBudget,
        **kwargs: Any,
    ):
        self._api_budget = api_budget
        super().__init__(**kwargs)  # type: ignore # Base Session doesn't take any kwargs

    def send(self, request: requests.PreparedRequest, **kwargs: Any) -> requests.Response:
        """Send a request with rate-limiting."""
        self._api_budget.acquire_call(request)
        response = super().send(request, **kwargs)
        self._api_budget.update_from_response(request, response)
        return response


class LimiterSession(LimiterMixin, requests.Session):
    """Session that adds rate-limiting behavior to requests."""


class CachedLimiterSession(requests_cache.CacheMixin, LimiterMixin, requests.Session):
    """Session class with caching and rate-limiting behavior."""
@dataclasses.dataclass
class Rate:
    """Call rate limit: at most ``limit`` calls within every ``interval``."""

    # maximum number of calls (total request weight) allowed within one interval
    limit: int
    # length of the time window the limit applies to
    interval: timedelta
Call rate limit
class CallRateLimitHit(Exception):
    """Raised when a call credit cannot be acquired because a rate limit has been reached."""

    def __init__(self, error: str, item: Any, weight: int, rate: str, time_to_wait: timedelta):
        """Constructor

        :param error: error message
        :param item: object passed into acquire_call
        :param weight: how many credits were requested
        :param rate: string representation of the rate violated
        :param time_to_wait: how long to wait until more calls become available
        """
        self.item = item
        self.weight = weight
        self.rate = rate
        self.time_to_wait = time_to_wait
        super().__init__(error)
Common base class for all non-exit exceptions.
def __init__(self, error: str, item: Any, weight: int, rate: str, time_to_wait: timedelta):
    """Constructor

    :param error: error message
    :param item: object passed into acquire_call
    :param weight: how many credits were requested
    :param rate: string representation of the rate violated
    :param time_to_wait: how long to wait until more calls become available
    """
    self.item = item
    self.weight = weight
    self.rate = rate
    self.time_to_wait = time_to_wait
    # zero-argument super() relies on the enclosing CallRateLimitHit class this excerpt comes from
    super().__init__(error)
Constructor
Parameters
- error: error message
- item: object passed into acquire_call
- weight: how many credits were requested
- rate: string representation of the rate violated
- time_to_wait: how long to wait until more calls become available
class AbstractCallRatePolicy(abc.ABC):
    """Call rate policy interface.
    Should be configurable with different rules, like N per M for endpoint X. Endpoint X is matched with APIBudget.
    """

    @abc.abstractmethod
    def matches(self, request: Any) -> bool:
        """Tell whether this policy matches a specific request and should apply to it.

        :param request: the request object to check
        :return: True if policy should apply to this request, False - otherwise
        """

    @abc.abstractmethod
    def try_acquire(self, request: Any, weight: int) -> None:
        """Try to acquire a call credit for the request.

        :param request: a request object representing a single call to API
        :param weight: number of requests to deduct from credit
        """

    @abc.abstractmethod
    def update(
        self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]
    ) -> None:
        """Update call rate counting with current values reported by the API.

        :param available_calls: number of calls still available, or None if unknown
        :param call_reset_ts: time point of the next call rate reset, or None if unknown
        """
Call rate policy interface. Should be configurable with different rules, like N per M for endpoint X. Endpoint X is matched with APIBudget.
@abc.abstractmethod
def matches(self, request: Any) -> bool:
    """Tell whether this policy matches a specific request and should apply to it.

    :param request: the request object to check
    :return: True if policy should apply to this request, False - otherwise
    """
Tells if this policy matches specific request and should apply to it
Parameters
- request: the request object to check
Returns
True if policy should apply to this request, False - otherwise
@abc.abstractmethod
def try_acquire(self, request: Any, weight: int) -> None:
    """Try to acquire a call credit for the request.

    :param request: a request object representing a single call to API
    :param weight: number of requests to deduct from credit
    """
Try to acquire request
Parameters
- request: a request object representing a single call to API
- weight: number of requests to deduct from credit
Returns
- None; the limited policy implementations raise CallRateLimitHit when no credit is available
@abc.abstractmethod
def update(
    self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]
) -> None:
    """Update call rate counting with current values reported by the API.

    :param available_calls: number of calls still available, or None if unknown
    :param call_reset_ts: time point of the next call rate reset, or None if unknown
    """
Update call rate counting with current values
Parameters
- available_calls: number of calls still available according to the API, or None if unknown
- call_reset_ts: time point of the next call rate reset according to the API, or None if unknown
class RequestMatcher(abc.ABC):
    """Callable that helps to match a request object with call rate policies."""

    @abc.abstractmethod
    def __call__(self, request: Any) -> bool:
        """Check whether the request matches.

        :param request: the request object to check
        :return: True if matches the provided request object, False - otherwise
        """
Callable that helps to match a request object with call rate policies.
class HttpRequestMatcher(RequestMatcher):
    """Simple implementation of RequestMatcher for HTTP requests using HttpRequestRegexMatcher under the hood."""

    def __init__(
        self,
        method: Optional[str] = None,
        url: Optional[str] = None,
        params: Optional[Mapping[str, Any]] = None,
        headers: Optional[Mapping[str, Any]] = None,
    ):
        """Constructor

        :param method: HTTP method (e.g., "GET", "POST").
        :param url: Full URL to match (scheme://host/path). The path is matched as an escaped
            (literal) regex searched within the request path.
        :param params: Dictionary of query parameters to match.
        :param headers: Dictionary of headers to match.
        """
        # Parse the URL to extract the base and path
        if url:
            parsed_url = parse.urlsplit(url)
            url_base = f"{parsed_url.scheme}://{parsed_url.netloc}"
            # a bare "/" path means "any path" on this base
            url_path = parsed_url.path if parsed_url.path != "/" else None
        else:
            url_base = None
            url_path = None

        # Use HttpRequestRegexMatcher under the hood; the path is escaped so it is matched literally
        self._regex_matcher = HttpRequestRegexMatcher(
            method=method,
            url_base=url_base,
            url_path_pattern=re.escape(url_path) if url_path else None,
            params=params,
            headers=headers,
        )

    def __call__(self, request: Any) -> bool:
        """
        :param request: A requests.Request or requests.PreparedRequest instance.
        :return: True if the request matches all provided criteria; False otherwise.
        """
        return self._regex_matcher(request)

    def __str__(self) -> str:
        return (
            f"HttpRequestMatcher(method={self._regex_matcher._method}, "
            f"url={self._regex_matcher._url_base}{self._regex_matcher._url_path_pattern.pattern if self._regex_matcher._url_path_pattern else ''}, "
            f"params={self._regex_matcher._params}, headers={self._regex_matcher._headers})"
        )
Simple implementation of RequestMatcher for HTTP requests using HttpRequestRegexMatcher under the hood.
def __init__(
    self,
    method: Optional[str] = None,
    url: Optional[str] = None,
    params: Optional[Mapping[str, Any]] = None,
    headers: Optional[Mapping[str, Any]] = None,
):
    """Constructor

    :param method: HTTP method (e.g., "GET", "POST").
    :param url: Full URL to match. Its scheme and host must equal the request's. Its path
        (trailing slashes ignored) is escaped and searched for within the request path.
        An empty or root path matches any path.
    :param params: Dictionary of query parameters to match.
    :param headers: Dictionary of headers to match.
    """
    # Parse the URL to extract the base and path
    if url:
        parsed_url = parse.urlsplit(url)
        url_base = f"{parsed_url.scheme}://{parsed_url.netloc}"
        # HttpRequestRegexMatcher strips trailing slashes from the request path before
        # matching, so strip them here too. Otherwise a URL like "https://host/api/"
        # would never match, not even a request to exactly that URL.
        # An empty result (path was "" or "/") means "do not constrain the path".
        url_path = parsed_url.path.rstrip("/") or None
    else:
        url_base = None
        url_path = None

    # Use HttpRequestRegexMatcher under the hood
    self._regex_matcher = HttpRequestRegexMatcher(
        method=method,
        url_base=url_base,
        url_path_pattern=re.escape(url_path) if url_path else None,
        params=params,
        headers=headers,
    )
Constructor
Parameters
- method: HTTP method (e.g., "GET", "POST").
- url: Full URL to match.
- params: Dictionary of query parameters to match.
- headers: Dictionary of headers to match.
class HttpRequestRegexMatcher(RequestMatcher):
    """
    RequestMatcher for HTTP requests. A request matches only if it satisfies every configured criterion:
    - HTTP method (the configured method is upper-cased before comparison)
    - URL base (scheme + netloc), optionally
    - URL path pattern (a regex searched within the path portion of the URL, trailing slashes removed)
    - Query parameters (must be present with equal string values)
    - Headers (header names compared case-insensitively)
    """

    def __init__(
        self,
        method: Optional[str] = None,
        url_base: Optional[str] = None,
        url_path_pattern: Optional[str] = None,
        params: Optional[Mapping[str, Any]] = None,
        headers: Optional[Mapping[str, Any]] = None,
        weight: Optional[int] = None,
    ):
        """
        :param method: HTTP method (e.g. "GET", "POST"); compared case-insensitively.
        :param url_base: Base URL (scheme://host) that must match; a trailing slash is ignored.
        :param url_path_pattern: A regex pattern searched for in the path portion of the URL.
        :param params: Dictionary of query parameters that must be present in the request.
        :param headers: Dictionary of headers that must be present (header keys are compared case-insensitively).
        :param weight: The weight of a request matching this matcher. If set, this value is used
                       when acquiring a call from the rate limiter, enabling cost-based rate limiting
                       where different endpoints consume different amounts from a shared budget.
                       If not set, each request counts as 1.
        :raises ValueError: if weight is set and is lower than 1.
        """
        if weight is not None and weight < 1:
            raise ValueError(f"weight must be >= 1, got {weight}")
        self._weight = weight
        self._method = None if not method else method.upper()
        # Drop trailing slashes so "https://host/" and "https://host" compare equal.
        self._url_base = None if not url_base else url_base.rstrip("/")
        self._url_path_pattern = None if not url_path_pattern else re.compile(url_path_pattern)
        # Parsed query values are strings, so expected values are stored as strings too.
        self._params = {str(key): str(value) for key, value in (params or {}).items()}
        # Header names are case-insensitive: keep them lower-cased.
        self._headers = {str(key).lower(): str(value) for key, value in (headers or {}).items()}

    @staticmethod
    def _match_dict(obj: Mapping[str, Any], pattern: Mapping[str, Any]) -> bool:
        """Return True when every key/value pair of ``pattern`` is also present in ``obj``."""
        return all(key in obj and obj[key] == value for key, value in pattern.items())

    def __call__(self, request: Any) -> bool:
        """
        :param request: A requests.Request or requests.PreparedRequest instance.
        :return: True if the request matches all provided criteria; False otherwise.
        """
        if isinstance(request, requests.Request):
            prepared = request.prepare()
        elif isinstance(request, requests.PreparedRequest):
            prepared = request
        else:
            # Anything that is not a requests object can never match.
            return False

        if self._method is not None and prepared.method != self._method:
            return False

        split = parse.urlsplit(prepared.url)
        base_of_request = f"{str(split.scheme)}://{str(split.netloc)}"
        path_of_request = str(split.path).rstrip("/")

        if self._url_base is not None and base_of_request != self._url_base:
            return False

        if self._url_path_pattern is not None and not self._url_path_pattern.search(
            path_of_request
        ):
            return False

        if self._params:
            query_values = dict(parse.parse_qsl(str(split.query)))
            if not self._match_dict(query_values, self._params):
                return False

        if self._headers:
            lowered = {name.lower(): value for name, value in prepared.headers.items()}
            if not self._match_dict(lowered, self._headers):
                return False

        return True

    @property
    def weight(self) -> Optional[int]:
        """Cost of a request matched by this matcher, or None when no weight was configured."""
        return self._weight

    def __str__(self) -> str:
        path_regex = None if self._url_path_pattern is None else self._url_path_pattern.pattern
        return (
            f"HttpRequestRegexMatcher(method={self._method}, url_base={self._url_base}, "
            f"url_path_pattern={path_regex}, params={self._params}, headers={self._headers}, weight={self._weight})"
        )
Extended RequestMatcher for HTTP requests that supports matching on:
- HTTP method (case-insensitive)
- URL base (scheme + netloc) optionally
- URL path pattern (a regex applied to the path portion of the URL)
- Query parameters (must be present)
- Headers (header names compared case-insensitively)
def __init__(
    self,
    method: Optional[str] = None,
    url_base: Optional[str] = None,
    url_path_pattern: Optional[str] = None,
    params: Optional[Mapping[str, Any]] = None,
    headers: Optional[Mapping[str, Any]] = None,
    weight: Optional[int] = None,
):
    """
    :param method: HTTP method (e.g. "GET", "POST"); compared case-insensitively.
    :param url_base: Base URL (scheme://host) that must match; a trailing slash is ignored.
    :param url_path_pattern: A regex pattern searched for in the path portion of the URL.
    :param params: Dictionary of query parameters that must be present in the request.
    :param headers: Dictionary of headers that must be present (header keys are compared case-insensitively).
    :param weight: The weight of a request matching this matcher. If set, this value is used
                   when acquiring a call from the rate limiter, enabling cost-based rate limiting
                   where different endpoints consume different amounts from a shared budget.
                   If not set, each request counts as 1.
    :raises ValueError: if weight is set and is lower than 1.
    """
    if weight is not None and weight < 1:
        raise ValueError(f"weight must be >= 1, got {weight}")
    self._weight = weight
    self._method = None if not method else method.upper()
    # Drop trailing slashes so "https://host/" and "https://host" compare equal.
    self._url_base = None if not url_base else url_base.rstrip("/")
    self._url_path_pattern = None if not url_path_pattern else re.compile(url_path_pattern)
    # Parsed query values are strings, so expected values are stored as strings too.
    self._params = {str(key): str(value) for key, value in (params or {}).items()}
    # Header names are case-insensitive: keep them lower-cased.
    self._headers = {str(key).lower(): str(value) for key, value in (headers or {}).items()}
Parameters
- method: HTTP method (e.g. "GET", "POST"); compared case-insensitively.
- url_base: Base URL (scheme://host) that must match.
- url_path_pattern: A regex pattern that will be applied to the path portion of the URL.
- params: Dictionary of query parameters that must be present in the request.
- headers: Dictionary of headers that must be present (header keys are compared case-insensitively).
- weight: The weight of a request matching this matcher. If set, this value is used when acquiring a call from the rate limiter, enabling cost-based rate limiting where different endpoints consume different amounts from a shared budget. If not set, each request counts as 1.
class BaseCallRatePolicy(AbstractCallRatePolicy, abc.ABC):
    """Shared base for call rate policies that select their requests through a list of RequestMatchers."""

    def __init__(self, matchers: list[RequestMatcher]):
        # An empty list makes the policy apply to every request (see matches()).
        self._matchers = matchers

    def matches(self, request: Any) -> bool:
        """Tell whether this policy applies to the given request.

        :param request: the request object to test
        :return: True when no matchers are configured or at least one matcher accepts the request
        """
        return not self._matchers or any(matcher(request) for matcher in self._matchers)

    def get_weight(self, request: Any) -> int:
        """Return the cost of a request, taken from the first matcher that accepts it.

        Only HttpRequestRegexMatcher instances carry a weight; when the first accepting matcher
        has no configured weight, or no matcher accepts the request, the cost is 1.

        :param request: a request object
        :return: the weight for this request
        """
        first_match = next((matcher for matcher in self._matchers if matcher(request)), None)
        if first_match is None:
            return 1
        configured = (
            first_match.weight if isinstance(first_match, HttpRequestRegexMatcher) else None
        )
        return 1 if configured is None else configured
Call rate policy interface. Should be configurable with different rules, like N per M for endpoint X. Endpoint X is matched with APIBudget.
def matches(self, request: Any) -> bool:
    """Tell whether this policy applies to the given request.

    :param request: the request object to test
    :return: True when no matchers are configured or at least one matcher accepts the request
    """
    return not self._matchers or any(matcher(request) for matcher in self._matchers)
Tell if this policy matches specific request and should apply to it
Parameters
- request:
Returns
True if policy should apply to this request, False - otherwise
def get_weight(self, request: Any) -> int:
    """Return the cost of a request, taken from the first matcher that accepts it.

    Only HttpRequestRegexMatcher instances carry a weight; when the first accepting matcher
    has no configured weight, or no matcher accepts the request, the cost is 1.

    :param request: a request object
    :return: the weight for this request
    """
    first_match = next((matcher for matcher in self._matchers if matcher(request)), None)
    if first_match is None:
        return 1
    configured = first_match.weight if isinstance(first_match, HttpRequestRegexMatcher) else None
    return 1 if configured is None else configured
Get the weight for a request based on the first matching matcher.
If a matcher has a weight configured, that weight is used. Otherwise, defaults to 1.
Parameters
- request: a request object
Returns
the weight for this request
Inherited Members
class UnlimitedCallRatePolicy(BaseCallRatePolicy):
    """
    This policy is for explicit unlimited call rates.
    It can be used when we want to match a specific group of requests and not apply any limits to them.

    Example:

    APIBudget(
        [
            UnlimitedCallRatePolicy(
                matchers=[
                    HttpRequestMatcher(
                        url="https://api.example.com/some/method", headers={"sandbox": True}
                    )
                ],
            ),
            FixedWindowCallRatePolicy(
                matchers=[HttpRequestMatcher(url="https://api.example.com/some/method")],
                next_reset_ts=datetime.now(),
                period=timedelta(hours=1),
                call_limit=1000,
            ),
        ]
    )

    The code above will limit all calls to /some/method except calls that have header sandbox=True.
    APIBudget uses the first policy that matches a request, so the unlimited policy must be listed first.
    HttpRequestMatcher expects a full URL (scheme and host), not just a path.
    """

    def try_acquire(self, request: Any, weight: int) -> None:
        """Do nothing: requests matched by this policy are never limited."""

    def update(
        self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]
    ) -> None:
        """Do nothing: this policy keeps no counters to synchronize with the API."""
This policy is for explicit unlimited call rates. It can be used when we want to match a specific group of requests and not apply any limits to them.
Example:
APICallBudget( [ UnlimitedCallRatePolicy( matchers=[HttpRequestMatcher(url="/some/method", headers={"sandbox": true})], ), FixedWindowCallRatePolicy( matchers=[HttpRequestMatcher(url="/some/method")], next_reset_ts=datetime.now(), period=timedelta(hours=1) call_limit=1000, ), ] )
The code above will limit all calls to /some/method except calls that have header sandbox=True
def update(
    self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]
) -> None:
    """Ignore rate limit information from the API; there are no counters to adjust."""
    return None
Do nothing
Inherited Members
class FixedWindowCallRatePolicy(BaseCallRatePolicy):
    """Policy allowing a fixed number of calls per window; the counter resets when the window ends."""

    def __init__(
        self,
        next_reset_ts: datetime.datetime,
        period: timedelta,
        call_limit: int,
        matchers: list[RequestMatcher],
    ):
        """A policy that allows {call_limit} calls within a {period} time interval

        :param next_reset_ts: next call rate reset time point
        :param period: call rate reset period
        :param call_limit: maximum total weight of calls allowed within one window
        :param matchers: request matchers selecting the requests this policy applies to
        """

        self._next_reset_ts = next_reset_ts
        self._offset = period
        self._call_limit = call_limit
        self._calls_num = 0
        self._lock = RLock()
        super().__init__(matchers=matchers)

    def try_acquire(self, request: Any, weight: int) -> None:
        """Deduct ``weight`` calls from the current window.

        :param request: a request object representing a single call to API
        :param weight: number of requests to deduct from credit
        :raises ValueError: if weight exceeds the call limit or the request does not match the policy
        :raises CallRateLimitHit: if the current window does not have enough calls left
        """
        if weight > self._call_limit:
            raise ValueError("Weight can not exceed the call limit")
        if not self.matches(request):
            raise ValueError("Request does not match the policy")

        with self._lock:
            self._update_current_window()

            if self._calls_num + weight > self._call_limit:
                reset_in = self._next_reset_ts - datetime.datetime.now()
                error_message = (
                    f"reached maximum number of allowed calls {self._call_limit} "
                    f"per {self._offset} interval, next reset in {reset_in}."
                )
                raise CallRateLimitHit(
                    error=error_message,
                    item=request,
                    weight=weight,
                    rate=f"{self._call_limit} per {self._offset}",
                    time_to_wait=reset_in,
                )

            self._calls_num += weight

    def __str__(self) -> str:
        matcher_str = ", ".join(f"{matcher}" for matcher in self._matchers)
        return (
            f"FixedWindowCallRatePolicy(call_limit={self._call_limit}, period={self._offset}, "
            f"calls_used={self._calls_num}, next_reset={self._next_reset_ts}, "
            f"matchers=[{matcher_str}])"
        )

    def update(
        self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]
    ) -> None:
        """Update call rate counters, by default, only reacts to decreasing updates of available_calls and changes to call_reset_ts.
        We ignore updates with available_calls > current_available_calls to support call rate limits that are lower than API limits.

        :param available_calls: number of calls the API reports as still available, or None
        :param call_reset_ts: reset time reported by the API, or None
        """
        with self._lock:
            self._update_current_window()
            current_available_calls = self._call_limit - self._calls_num

            if available_calls is not None and current_available_calls > available_calls:
                logger.debug(
                    "got rate limit update from api, adjusting available calls from %s to %s",
                    current_available_calls,
                    available_calls,
                )
                self._calls_num = self._call_limit - available_calls

            if call_reset_ts is not None and call_reset_ts != self._next_reset_ts:
                logger.debug(
                    "got rate limit update from api, adjusting reset time from %s to %s",
                    self._next_reset_ts,
                    call_reset_ts,
                )
                self._next_reset_ts = call_reset_ts

    def _update_current_window(self) -> None:
        """Start a new window if the current one has already ended.

        The reset time is moved forward by as many whole periods as needed to get past "now".
        Advancing by a single period would leave ``_next_reset_ts`` in the past after an idle gap
        of several periods. Each of the following calls would then reset the counter again, so the
        limit would not be enforced until the reset time caught up.
        """
        now = datetime.datetime.now()
        if now > self._next_reset_ts:
            logger.debug("started new window, %s calls available now", self._call_limit)
            if self._offset > timedelta(0):
                elapsed_periods = (now - self._next_reset_ts) // self._offset
                self._next_reset_ts = self._next_reset_ts + self._offset * (elapsed_periods + 1)
            else:
                # A non-positive period cannot move the window forward; keep the previous behaviour.
                self._next_reset_ts = self._next_reset_ts + self._offset
            self._calls_num = 0
Call rate policy interface. Should be configurable with different rules, like N per M for endpoint X. Endpoint X is matched with APIBudget.
def __init__(
    self,
    next_reset_ts: datetime.datetime,
    period: timedelta,
    call_limit: int,
    matchers: list[RequestMatcher],
):
    """Create a fixed-window policy allowing ``call_limit`` calls per ``period``.

    :param next_reset_ts: moment at which the current window ends and the counter resets
    :param period: length of each window
    :param call_limit: maximum total weight of calls allowed within one window
    :param matchers: request matchers selecting the requests this policy applies to
    """
    self._lock = RLock()
    self._calls_num = 0
    self._call_limit = call_limit
    self._offset = period
    self._next_reset_ts = next_reset_ts
    super().__init__(matchers=matchers)
A policy that allows {call_limit} calls within a {period} time interval
Parameters
- next_reset_ts: next call rate reset time point
- period: call rate reset period
- call_limit:
- matchers:
def try_acquire(self, request: Any, weight: int) -> None:
    """Deduct ``weight`` calls from the current window or raise CallRateLimitHit.

    :param request: request object the policy must match
    :param weight: number of calls to deduct
    :raises ValueError: if weight exceeds the call limit or the request does not match
    :raises CallRateLimitHit: if the current window does not have enough calls left
    """
    if weight > self._call_limit:
        raise ValueError("Weight can not exceed the call limit")
    if not self.matches(request):
        raise ValueError("Request does not match the policy")

    with self._lock:
        self._update_current_window()
        projected = self._calls_num + weight
        if projected <= self._call_limit:
            self._calls_num = projected
            return

        reset_in = self._next_reset_ts - datetime.datetime.now()
        raise CallRateLimitHit(
            error=(
                f"reached maximum number of allowed calls {self._call_limit} "
                f"per {self._offset} interval, next reset in {reset_in}."
            ),
            item=request,
            weight=weight,
            rate=f"{self._call_limit} per {self._offset}",
            time_to_wait=reset_in,
        )
Try to acquire request
Parameters
- request: a request object representing a single call to API
- weight: number of requests to deduct from credit
Returns
def update(
    self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]
) -> None:
    """Synchronize the counters with rate limit information reported by the API.

    Only reductions of the remaining call budget and changes of the reset time are applied.
    Larger ``available_calls`` values are ignored, so a configured limit lower than the API
    limit stays in effect.

    :param available_calls: number of calls the API reports as still available, or None
    :param call_reset_ts: reset time reported by the API, or None
    """
    with self._lock:
        self._update_current_window()
        remaining = self._call_limit - self._calls_num

        if available_calls is not None and available_calls < remaining:
            logger.debug(
                "got rate limit update from api, adjusting available calls from %s to %s",
                remaining,
                available_calls,
            )
            self._calls_num = self._call_limit - available_calls

        if call_reset_ts is not None and call_reset_ts != self._next_reset_ts:
            logger.debug(
                "got rate limit update from api, adjusting reset time from %s to %s",
                self._next_reset_ts,
                call_reset_ts,
            )
            self._next_reset_ts = call_reset_ts
Update call rate counters, by default, only reacts to decreasing updates of available_calls and changes to call_reset_ts. We ignore updates with available_calls > current_available_calls to support call rate limits that are lower than API limits.
Parameters
- available_calls:
- call_reset_ts:
Inherited Members
class MovingWindowCallRatePolicy(BaseCallRatePolicy):
    """
    Policy to control requests rate implemented on top of PyRateLimiter lib.
    The main difference between this policy and FixedWindowCallRatePolicy is that the rate-limiting window
    is moving along requests that we made, and there is no moment when we reset an available number of calls.
    This strategy requires storing the timestamps of all requests within a window.
    """

    def __init__(self, rates: list[Rate], matchers: list[RequestMatcher]):
        """Constructor

        :param rates: list of rates, the order is important and must be ascending
        :param matchers: request matchers selecting the requests this policy applies to
        :raises ValueError: if rates is empty
        """
        if not rates:
            raise ValueError("The list of rates can not be empty")
        # Intervals are converted from timedelta to milliseconds for pyrate_limiter.
        pyrate_rates = [
            PyRateRate(limit=rate.limit, interval=int(rate.interval.total_seconds() * 1000))
            for rate in rates
        ]
        self._bucket = InMemoryBucket(pyrate_rates)
        # Limiter will create the background task that clears old requests in the bucket
        self._limiter = Limiter(self._bucket)
        super().__init__(matchers=matchers)

    def try_acquire(self, request: Any, weight: int) -> None:
        """Try to acquire ``weight`` calls for the request.

        :param request: a request object representing a single call to API
        :param weight: number of requests to deduct from credit
        :raises ValueError: if the request does not match or weight exceeds the lowest rate limit
        :raises CallRateLimitHit: if the bucket is full
        """
        if not self.matches(request):
            raise ValueError("Request does not match the policy")
        lowest_limit = min(rate.limit for rate in self._bucket.rates)
        if weight > lowest_limit:
            raise ValueError(
                f"Weight can not exceed the lowest configured rate limit ({lowest_limit})"
            )

        try:
            self._limiter.try_acquire(request, weight=weight)
        except BucketFullException as exc:
            item = self._limiter.bucket_factory.wrap_item(request, weight)
            assert isinstance(item, RateItem)

            with self._limiter.lock:
                time_to_wait = self._bucket.waiting(item)
                assert isinstance(time_to_wait, int)

                raise CallRateLimitHit(
                    error=str(exc.meta_info["error"]),
                    item=request,
                    weight=int(exc.meta_info["weight"]),
                    rate=str(exc.meta_info["rate"]),
                    time_to_wait=timedelta(milliseconds=time_to_wait),
                )

    def update(
        self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]
    ) -> None:
        """Adjust call bucket to reflect the state of the API server

        Only handled case: the API reports zero available calls and no reset time. The bucket is
        then topped up with a dummy item so that its count reaches the first rate's limit.

        :param available_calls: number of calls the API reports as still available, or None
        :param call_reset_ts: reset time reported by the API, or None
        """
        if (
            available_calls is not None and call_reset_ts is None
        ):  # we do our best to sync buckets with API
            if available_calls == 0:
                with self._limiter.lock:
                    # Previously this was `count() < limit`, a bool. That added a dummy item of
                    # weight 1 instead of filling the bucket up to the limit.
                    # NOTE(review): assumes InMemoryBucket.count() counts weight units, as
                    # pyrate_limiter stores weighted items as repeated entries. Confirm for the pinned version.
                    items_to_add = self._bucket.rates[0].limit - self._bucket.count()
                    if items_to_add > 0:
                        now: int = TimeClock().now()  # type: ignore[no-untyped-call]
                        self._bucket.put(RateItem(name="dummy", timestamp=now, weight=items_to_add))
        # TODO: add support if needed, it might be that it is not possible to make a good solution for this case
        # if available_calls is not None and call_reset_ts is not None:
        #     ts = call_reset_ts.timestamp()

    def __str__(self) -> str:
        """Return a human-friendly description of the moving window rate policy for logging purposes."""
        rates_info = ", ".join(
            f"{rate.limit} per {timedelta(milliseconds=rate.interval)}"
            for rate in self._bucket.rates
        )
        current_bucket_count = self._bucket.count()
        matcher_str = ", ".join(f"{matcher}" for matcher in self._matchers)
        return (
            f"MovingWindowCallRatePolicy(rates=[{rates_info}], current_bucket_count={current_bucket_count}, "
            f"matchers=[{matcher_str}])"
        )
Policy to control requests rate implemented on top of PyRateLimiter lib. The main difference between this policy and FixedWindowCallRatePolicy is that the rate-limiting window is moving along requests that we made, and there is no moment when we reset an available number of calls. This strategy requires storing the timestamps of all requests within a window.
def __init__(self, rates: list[Rate], matchers: list[RequestMatcher]):
    """Build the policy on top of a pyrate_limiter InMemoryBucket.

    :param rates: list of rates, the order is important and must be ascending
    :param matchers: request matchers selecting the requests this policy applies to
    :raises ValueError: if rates is empty
    """
    if not rates:
        raise ValueError("The list of rates can not be empty")
    # pyrate_limiter rates get their interval as integer milliseconds.
    converted_rates = [
        PyRateRate(limit=r.limit, interval=int(r.interval.total_seconds() * 1000)) for r in rates
    ]
    self._bucket = InMemoryBucket(converted_rates)
    # Limiter will create the background task that clears old requests in the bucket
    self._limiter = Limiter(self._bucket)
    super().__init__(matchers=matchers)
Constructor
Parameters
- rates: list of rates, the order is important and must be ascending
- matchers:
def try_acquire(self, request: Any, weight: int) -> None:
    """Acquire ``weight`` calls from the moving-window bucket.

    :param request: a request object representing a single call to API
    :param weight: number of requests to deduct from credit
    :raises ValueError: if the request does not match or weight exceeds the lowest rate limit
    :raises CallRateLimitHit: if the bucket is full; carries the time until a slot frees up
    """
    if not self.matches(request):
        raise ValueError("Request does not match the policy")
    smallest_limit = min(r.limit for r in self._bucket.rates)
    if weight > smallest_limit:
        raise ValueError(
            f"Weight can not exceed the lowest configured rate limit ({smallest_limit})"
        )

    try:
        self._limiter.try_acquire(request, weight=weight)
    except BucketFullException as exc:
        wrapped = self._limiter.bucket_factory.wrap_item(request, weight)
        assert isinstance(wrapped, RateItem)

        with self._limiter.lock:
            wait_ms = self._bucket.waiting(wrapped)
            assert isinstance(wait_ms, int)

            meta = exc.meta_info
            raise CallRateLimitHit(
                error=str(meta["error"]),
                item=request,
                weight=int(meta["weight"]),
                rate=str(meta["rate"]),
                time_to_wait=timedelta(milliseconds=wait_ms),
            )
Try to acquire request
Parameters
- request: a request object representing a single call to API
- weight: number of requests to deduct from credit
Returns
def update(
    self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]
) -> None:
    """Adjust call bucket to reflect the state of the API server

    Only handled case: the API reports zero available calls and no reset time. The bucket is
    then topped up with a dummy item so that its count reaches the first rate's limit.

    :param available_calls: number of calls the API reports as still available, or None
    :param call_reset_ts: reset time reported by the API, or None
    """
    if (
        available_calls is not None and call_reset_ts is None
    ):  # we do our best to sync buckets with API
        if available_calls == 0:
            with self._limiter.lock:
                # Previously this was `count() < limit`, a bool. That added a dummy item of
                # weight 1 instead of filling the bucket up to the limit.
                # NOTE(review): assumes InMemoryBucket.count() counts weight units, as
                # pyrate_limiter stores weighted items as repeated entries. Confirm for the pinned version.
                items_to_add = self._bucket.rates[0].limit - self._bucket.count()
                if items_to_add > 0:
                    now: int = TimeClock().now()  # type: ignore[no-untyped-call]
                    self._bucket.put(RateItem(name="dummy", timestamp=now, weight=items_to_add))
    # TODO: add support if needed, it might be that it is not possible to make a good solution for this case
    # if available_calls is not None and call_reset_ts is not None:
    #     ts = call_reset_ts.timestamp()
Adjust call bucket to reflect the state of the API server
Parameters
- available_calls:
- call_reset_ts:
Returns
Inherited Members
class AbstractAPIBudget(abc.ABC):
    """Client-side budget for an API that allows a client N calls per T interval.

    Important: an APIBudget does not perform any API calls itself; the calling code is
    responsible for consulting this interface so that the API's call rate limits are respected.

    Different groups of requests can be governed by different policies; the groups are told
    apart with RequestMatchers, and each policy is an AbstractCallRatePolicy implementation.
    """

    @abc.abstractmethod
    def acquire_call(
        self, request: Any, block: bool = True, timeout: Optional[float] = None
    ) -> None:
        """Take a call credit from the budget, blocking by default.

        :param request: the request that is about to be sent
        :param block: when True (default) the current thread waits until a call credit is available
        :param timeout: if set, limits the time spent waiting; otherwise waits until a credit is available
        :raises: CallRateLimitHit - when no credits are left and, if a timeout was set, the waiting time exceeds it
        """

    @abc.abstractmethod
    def get_matching_policy(self, request: Any) -> Optional[AbstractCallRatePolicy]:
        """Return the call rate policy that applies to the request, or None if none does."""

    @abc.abstractmethod
    def update_from_response(self, request: Any, response: Any) -> None:
        """Feed rate limit information from an API response back into the budget.

        :param request: the initial request that triggered this response
        :param response: response from the API
        """
Interface to some API where a client is allowed to have N calls per T interval.
Important: APIBudget does not make any API calls; the end user code is responsible for calling this interface to respect the call rate limitation of the API.
It supports multiple policies applied to different groups of requests. To distinguish these groups we use RequestMatchers. Each individual policy is an AbstractCallRatePolicy, for example MovingWindowCallRatePolicy, FixedWindowCallRatePolicy or UnlimitedCallRatePolicy.
@abc.abstractmethod
def acquire_call(
    self, request: Any, block: bool = True, timeout: Optional[float] = None
) -> None:
    """Take a call credit from the budget, blocking by default.

    :param request: the request that is about to be sent
    :param block: when True (default) the current thread waits until a call credit is available
    :param timeout: if set, limits the time spent waiting; otherwise waits until a credit is available
    :raises: CallRateLimitHit - when no credits are left and, if a timeout was set, the waiting time exceeds it
    """
Try to get a call from budget, will block by default
Parameters
- request:
- block: when true (default) will block the current thread until call credit is available
- timeout: if set will limit maximum time in block, otherwise will wait until credit is available
Raises
- CallRateLimitHit - when no credits are left and, if a timeout was set, the waiting time exceeds the timeout
@abc.abstractmethod
def get_matching_policy(self, request: Any) -> Optional[AbstractCallRatePolicy]:
    """Return the call rate policy that applies to the request, or None if none does."""
Find matching call rate policy for specific request
@abc.abstractmethod
def update_from_response(self, request: Any, response: Any) -> None:
    """Feed rate limit information from an API response back into the budget.

    :param request: the initial request that triggered this response
    :param response: response from the API
    """
Update budget information based on response from API
Parameters
- request: the initial request that triggered this response
- response: response from the API
class APIBudget(AbstractAPIBudget):
    """Default APIBudget implementation"""

    def __init__(
        self, policies: list[AbstractCallRatePolicy], maximum_attempts_to_acquire: int = 100000
    ) -> None:
        """Constructor

        :param policies: list of policies in this budget; for each request the first matching policy is used
        :param maximum_attempts_to_acquire: number of attempts before throwing hit ratelimit exception, we put some big number here
            to avoid situations when many threads compete with each other for a few lots over a significant amount of time.
            At least one attempt is always made.
        """

        self._policies = policies
        self._maximum_attempts_to_acquire = maximum_attempts_to_acquire

    def _extract_endpoint(self, request: Any) -> str:
        """Extract the endpoint URL from the request if available (used only for log messages)."""
        endpoint = None
        try:
            # If the request is already a PreparedRequest, it should have a URL.
            if isinstance(request, requests.PreparedRequest):
                endpoint = request.url
            # If it's a requests.Request, we call prepare() to extract the URL.
            elif isinstance(request, requests.Request):
                prepared = request.prepare()
                endpoint = prepared.url
        except Exception as e:
            # Best effort only: a failure here must never prevent the call itself.
            logger.debug("Error extracting endpoint: %s", e)
        if endpoint:
            return endpoint
        return "unknown endpoint"

    def get_matching_policy(self, request: Any) -> Optional[AbstractCallRatePolicy]:
        """Return the first policy (in constructor order) whose matchers accept the request, or None."""
        for policy in self._policies:
            if policy.matches(request):
                return policy
        return None

    def acquire_call(
        self, request: Any, block: bool = True, timeout: Optional[float] = None
    ) -> None:
        """Try to get a call from budget, will block by default.
        Policies are checked sequentially in the same order they were added.
        The first policy that matches the request is used to acquire the call credit;
        if no policy matches, the call is allowed without acquiring anything.

        :param request: the API request
        :param block: when True (default) will block until a call credit is available
        :param timeout: if provided, caps the length of each wait between attempts; otherwise waits as long as the policy requires
        :raises: CallRateLimitHit if the call credit cannot be acquired
        """
        policy = self.get_matching_policy(request)
        # The endpoint is only needed for debug logs, and extracting it may call
        # request.prepare(), so only compute it when debug logging is enabled.
        debug_enabled = logger.isEnabledFor(logging.DEBUG)
        if policy is not None:
            if debug_enabled:
                logger.debug(
                    "Acquiring call for endpoint %s using policy: %s",
                    self._extract_endpoint(request),
                    policy,
                )
            self._do_acquire(request=request, policy=policy, block=block, timeout=timeout)
        elif self._policies and debug_enabled:
            logger.debug(
                "No policies matched for endpoint %s (request: %s). Allowing call by default.",
                self._extract_endpoint(request),
                request,
            )

    def update_from_response(self, request: Any, response: Any) -> None:
        """Update budget information based on the API response (no-op in this implementation).

        :param request: the initial request that triggered this response
        :param response: response from the API
        """
        pass

    def _do_acquire(
        self, request: Any, policy: AbstractCallRatePolicy, block: bool, timeout: Optional[float]
    ) -> None:
        """Internal method to try to acquire a call credit.

        :param request: the API request
        :param policy: the matching rate-limiting policy
        :param block: indicates whether to block until a call credit is available
        :param timeout: caps the length of each sleep between attempts
        :raises: CallRateLimitHit if unable to acquire a call credit
        """
        # The loop used to be range(1, N), which made only N - 1 attempts. With N == 1 it made
        # none at all and let the call through without acquiring. Always make at least one attempt.
        max_attempts = max(1, self._maximum_attempts_to_acquire)
        # The weight depends only on the request and the policy's matchers: compute it once.
        weight = policy.get_weight(request) if isinstance(policy, BaseCallRatePolicy) else 1
        last_exception: Optional[CallRateLimitHit] = None
        endpoint: Optional[str] = None
        # sometimes we spend all budget before a second attempt, so we have a few more attempts
        for attempt in range(1, max_attempts + 1):
            try:
                policy.try_acquire(request, weight=weight)
                return
            except CallRateLimitHit as exc:
                last_exception = exc
                if endpoint is None:
                    endpoint = self._extract_endpoint(request)
                if not block:
                    logger.debug(
                        "Policy %s reached call limit for endpoint %s (%s) and blocking is disabled.",
                        policy,
                        endpoint,
                        exc.rate,
                    )
                    raise
                if attempt == max_attempts:
                    # No attempts left: do not sleep only to give up afterwards.
                    break
                if timeout is not None:
                    time_to_wait = min(timedelta(seconds=timeout), exc.time_to_wait)
                else:
                    time_to_wait = exc.time_to_wait
                # Ensure we never sleep for a negative duration.
                time_to_wait = max(timedelta(0), time_to_wait)
                logger.debug(
                    "Policy %s reached call limit for endpoint %s (%s). Sleeping for %s on attempt %s.",
                    policy,
                    endpoint,
                    exc.rate,
                    time_to_wait,
                    attempt,
                )
                time.sleep(time_to_wait.total_seconds())

        if last_exception is not None:
            logger.debug(
                "Exhausted all %s attempts to acquire a call for endpoint %s using policy: %s",
                max_attempts,
                endpoint,
                policy,
            )
            raise last_exception
Default APIBudget implementation
def __init__(
    self, policies: list[AbstractCallRatePolicy], maximum_attempts_to_acquire: int = 100000
) -> None:
    """Create a budget from an ordered list of policies.

    :param policies: call rate policies managed by this budget, consulted in order
    :param maximum_attempts_to_acquire: upper bound on attempts to acquire a call before the
        rate-limit-hit exception is raised; the default is deliberately large so that many threads
        competing for a few free slots over a long period do not give up prematurely
    """
    self._maximum_attempts_to_acquire = maximum_attempts_to_acquire
    self._policies = policies
Constructor
Parameters
- policies: list of policies in this budget
- maximum_attempts_to_acquire: number of attempts before raising a rate-limit-hit exception. A large default is used to avoid giving up when many threads compete with each other for a few slots over a significant amount of time.
def get_matching_policy(self, request: Any) -> Optional[AbstractCallRatePolicy]:
    """Return the first policy whose matcher accepts *request*.

    Policies are consulted in the order they were given to the budget.

    :param request: the API request to match
    :return: the first matching policy, or None when no policy applies
    """
    return next(
        (candidate for candidate in self._policies if candidate.matches(request)),
        None,
    )
Find matching call rate policy for specific request
def acquire_call(
    self, request: Any, block: bool = True, timeout: Optional[float] = None
) -> None:
    """Acquire a call credit from the budget for *request*; blocks by default.

    Matchers are called sequentially in the order they were added, and the first policy whose
    matcher returns True is used to acquire the call. When no policy matches, the call is allowed.

    :param request: the API request
    :param block: when True (default) wait until a call credit is available
    :param timeout: if provided, limits maximum waiting time; otherwise, waits indefinitely
    :raises: CallRateLimitHit if the call credit cannot be acquired within the timeout
    """
    matched_policy = self.get_matching_policy(request)
    endpoint = self._extract_endpoint(request)
    if not matched_policy:
        # Nothing to enforce; only log when some policies exist but none applied.
        if self._policies:
            logger.debug(
                f"No policies matched for endpoint {endpoint} (request: {request}). Allowing call by default."
            )
        return
    logger.debug(f"Acquiring call for endpoint {endpoint} using policy: {matched_policy}")
    self._do_acquire(request=request, policy=matched_policy, block=block, timeout=timeout)
Try to get a call from the budget; blocks by default. Matchers are called sequentially in the same order they were added, and the first policy whose matcher returns True is used to acquire the call.
Parameters
- request: the API request
- block: when True (default) will block until a call credit is available
- timeout: if provided, limits maximum waiting time; otherwise, waits indefinitely
Raises
- CallRateLimitHit if the call credit cannot be acquired within the timeout
def update_from_response(self, request: Any, response: Any) -> None:
    """Hook for refreshing budget state from an API response.

    The default budget keeps no response-derived state, so nothing happens here;
    HTTP-aware budgets override this method.

    :param request: the request that produced *response*
    :param response: the API response
    """
    return None
Update budget information based on the API response.
Parameters
- request: the initial request that triggered this response
- response: response from the API
class HttpAPIBudget(APIBudget):
    """Implementation of AbstractAPIBudget for HTTP"""

    def __init__(
        self,
        ratelimit_reset_header: str = "ratelimit-reset",
        ratelimit_remaining_header: str = "ratelimit-remaining",
        status_codes_for_ratelimit_hit: Optional[list[int]] = None,
        **kwargs: Any,
    ):
        """Constructor

        :param ratelimit_reset_header: name of the header that contains the Unix timestamp of the next call-budget reset
        :param ratelimit_remaining_header: name of the header that contains the number of calls left
        :param status_codes_for_ratelimit_hit: HTTP status codes that signal the rate limit has been hit; defaults to [429]
        :param kwargs: forwarded to APIBudget (e.g. policies, maximum_attempts_to_acquire)
        """
        self._ratelimit_reset_header = ratelimit_reset_header
        self._ratelimit_remaining_header = ratelimit_remaining_header
        # Build a fresh list per instance: a shared mutable default would leak mutations across budgets.
        self._status_codes_for_ratelimit_hit = (
            [429] if status_codes_for_ratelimit_hit is None else list(status_codes_for_ratelimit_hit)
        )
        super().__init__(**kwargs)

    def update_from_response(self, request: Any, response: Any) -> None:
        """Push rate-limit information from an HTTP response into the matching policy.

        Does nothing when no policy matches the request or the response is not a requests.Response.

        :param request: the initial request that triggered this response
        :param response: response from the API
        """
        policy = self.get_matching_policy(request)
        if not policy:
            return

        if isinstance(response, requests.Response):
            available_calls = self.get_calls_left_from_response(response)
            reset_ts = self.get_reset_ts_from_response(response)
            policy.update(available_calls=available_calls, call_reset_ts=reset_ts)

    def get_reset_ts_from_response(
        self, response: requests.Response
    ) -> Optional[datetime.datetime]:
        """Read the next budget reset time from the reset header.

        :param response: the HTTP response
        :return: the reset time as a naive local datetime, or None when the header is missing or empty
        """
        reset_value = response.headers.get(self._ratelimit_reset_header)
        if reset_value:
            return datetime.datetime.fromtimestamp(int(reset_value))
        return None

    def get_calls_left_from_response(self, response: requests.Response) -> Optional[int]:
        """Determine how many calls remain according to the response.

        :param response: the HTTP response
        :return: the remaining-calls header value if present; 0 if the status code signals a
            rate-limit hit; otherwise None
        """
        remaining = response.headers.get(self._ratelimit_remaining_header)
        if remaining:
            return int(remaining)

        if response.status_code in self._status_codes_for_ratelimit_hit:
            return 0

        return None
Implementation of AbstractAPIBudget for HTTP
def __init__(
    self,
    ratelimit_reset_header: str = "ratelimit-reset",
    ratelimit_remaining_header: str = "ratelimit-remaining",
    status_codes_for_ratelimit_hit: Optional[list[int]] = None,
    **kwargs: Any,
):
    """Constructor

    :param ratelimit_reset_header: name of the header that contains the timestamp of the next call-budget reset
    :param ratelimit_remaining_header: name of the header that contains the number of calls left
    :param status_codes_for_ratelimit_hit: HTTP status codes that signal the rate limit has been hit; defaults to [429]
    :param kwargs: forwarded unchanged to the parent constructor
    """
    self._ratelimit_reset_header = ratelimit_reset_header
    self._ratelimit_remaining_header = ratelimit_remaining_header
    # Build a fresh list per instance: a shared mutable default would leak mutations across budgets.
    self._status_codes_for_ratelimit_hit = (
        [429] if status_codes_for_ratelimit_hit is None else list(status_codes_for_ratelimit_hit)
    )
    super().__init__(**kwargs)
Constructor
Parameters
- ratelimit_reset_header: name of the header that contains the timestamp of the next call-budget reset
- ratelimit_remaining_header: name of the header that has the number of calls left
- status_codes_for_ratelimit_hit: list of HTTP status codes that signal the rate limit has been hit
def update_from_response(self, request: Any, response: Any) -> None:
    """Feed rate-limit information from an HTTP response back into the matching policy.

    Nothing happens when no policy matches *request* or when *response* is not a
    ``requests.Response``.

    :param request: the request that produced *response*
    :param response: the API response
    """
    matched_policy = self.get_matching_policy(request)
    if not matched_policy or not isinstance(response, requests.Response):
        return
    # Keyword arguments are evaluated left to right: calls left first, then the reset time.
    matched_policy.update(
        available_calls=self.get_calls_left_from_response(response),
        call_reset_ts=self.get_reset_ts_from_response(response),
    )
Update budget information based on the API response.
Parameters
- request: the initial request that triggered this response
- response: response from the API
def get_calls_left_from_response(self, response: requests.Response) -> Optional[int]:
    """Return how many calls remain according to *response*.

    A non-empty remaining-calls header takes precedence. Otherwise a status code listed as a
    rate-limit hit means zero calls are left, and anything else is unknown.

    :param response: the HTTP response
    :return: number of calls left, or None if it cannot be determined
    """
    remaining = response.headers.get(self._ratelimit_remaining_header)
    if remaining:
        return int(remaining)
    return 0 if response.status_code in self._status_codes_for_ratelimit_hit else None
Inherited Members
class LimiterMixin(MIXIN_BASE):
    """Mixin that wraps ``send`` with API-budget accounting.

    A call credit is acquired from the budget before each request is sent, and the budget is
    updated from the response once it arrives.
    """

    def __init__(
        self,
        api_budget: AbstractAPIBudget,
        **kwargs: Any,
    ):
        """:param api_budget: budget consulted before, and updated after, every request
        :param kwargs: passed through to the next class in the MRO
        """
        self._api_budget = api_budget
        super().__init__(**kwargs)  # type: ignore # Base Session doesn't take any kwargs

    def send(self, request: requests.PreparedRequest, **kwargs: Any) -> requests.Response:
        """Send a request with rate-limiting."""
        self._api_budget.acquire_call(request)
        result = super().send(request, **kwargs)
        self._api_budget.update_from_response(request, result)
        return result
Mixin class that adds rate-limiting behavior to requests.
def send(self, request: requests.PreparedRequest, **kwargs: Any) -> requests.Response:
    """Send *request* through the session while honouring the API budget.

    A call credit is acquired (blocking by default) before the request goes out, and the
    budget is updated from the response before it is returned.
    """
    self._api_budget.acquire_call(request)
    result = super().send(request, **kwargs)
    self._api_budget.update_from_response(request, result)
    return result
Send a request with rate-limiting.
class LimiterSession(LimiterMixin, requests.Session):
    """Session that adds rate-limiting behavior to requests.

    ``LimiterMixin.send`` runs before ``requests.Session.send``, so every request first
    acquires a call credit from the API budget.
    """
Session that adds rate-limiting behavior to requests.
Inherited Members
class CachedLimiterSession(requests_cache.CacheMixin, LimiterMixin, requests.Session):
    """Session class with caching and rate-limiting behavior.

    ``requests_cache.CacheMixin`` precedes ``LimiterMixin`` in the MRO, so caching wraps
    rate limiting. NOTE(review): presumably cache hits never reach the rate limiter;
    confirm against requests_cache's ``CacheMixin.send``.
    """
Session class with caching and rate-limiting behavior.