airbyte_cdk.sources.streams.http.requests_native_auth.abstract_oauth
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import logging
import threading
from abc import abstractmethod
from datetime import timedelta
from json import JSONDecodeError
from typing import Any, List, Mapping, MutableMapping, Optional, Tuple, Union

import backoff
import requests
from requests.auth import AuthBase

from airbyte_cdk.models import FailureType, Level
from airbyte_cdk.sources.http_logger import format_http_message
from airbyte_cdk.sources.message import MessageRepository, NoopMessageRepository
from airbyte_cdk.utils import AirbyteTracedException
from airbyte_cdk.utils.airbyte_secrets_utils import add_to_secrets
from airbyte_cdk.utils.datetime_helpers import AirbyteDateTime, ab_datetime_now, ab_datetime_parse

from ..exceptions import DefaultBackoffException

logger = logging.getLogger("airbyte")
_NOOP_MESSAGE_REPOSITORY = NoopMessageRepository()


class ResponseKeysMaxRecurtionReached(AirbyteTracedException):
    """
    Raised when the recursive search for a key in the token refresh response
    (see `_find_and_get_value_from_response`) goes deeper than the allowed
    maximum depth without finding the key.
    """


class AbstractOauth2Authenticator(AuthBase):
    """
    Abstract base class for OAuth authenticators that implement the OAuth token refresh flow. The authenticator
    is designed to generically perform the refresh flow without regard to how config fields are get/set by
    delegating that behavior to the classes implementing the interface.
    """

    _NO_STREAM_NAME = None

    # Class-level lock to prevent concurrent token refresh across multiple authenticator instances.
    # This is necessary because multiple streams may share the same OAuth credentials (refresh token)
    # through the connector config. Without this lock, concurrent refresh attempts can cause race
    # conditions where one stream successfully refreshes the token while others fail because the
    # refresh token has been invalidated (especially for single-use refresh tokens).
    _token_refresh_lock: threading.Lock = threading.Lock()

    def __init__(
        self,
        refresh_token_error_status_codes: Tuple[int, ...] = (),
        refresh_token_error_key: str = "",
        refresh_token_error_values: Tuple[str, ...] = (),
    ) -> None:
        """
        If all of refresh_token_error_status_codes, refresh_token_error_key, and refresh_token_error_values are set,
        then http errors with such params will be wrapped in AirbyteTracedException.
        """
        self._refresh_token_error_status_codes = refresh_token_error_status_codes
        self._refresh_token_error_key = refresh_token_error_key
        self._refresh_token_error_values = refresh_token_error_values

    def __call__(self, request: requests.PreparedRequest) -> requests.PreparedRequest:
        """Attach the HTTP headers required to authenticate on the HTTP request"""
        request.headers.update(self.get_auth_header())
        return request

    @property
    def _is_access_token_flow(self) -> bool:
        """True when no refresh endpoint is configured and a static access token is available."""
        return self.get_token_refresh_endpoint() is None and self.access_token is not None

    @property
    def token_expiry_is_time_of_expiration(self) -> bool:
        """
        Indicates that the Token Expiry returns the date until which the token will be valid, not the amount of time it will be valid.
        """

        return False

    @property
    def token_expiry_date_format(self) -> Optional[str]:
        """
        Format of the datetime; set it if expires_in is returned as the expiration datetime instead of seconds until it expires
        """

        return None

    def get_auth_header(self) -> Mapping[str, Any]:
        """HTTP header to set on the requests"""
        token = self.access_token if self._is_access_token_flow else self.get_access_token()
        return {"Authorization": f"Bearer {token}"}

    def get_access_token(self) -> str:
        """
        Returns the access token.

        This method uses double-checked locking to ensure thread-safe token refresh.
        When multiple threads (streams) detect an expired token simultaneously, only one
        will perform the refresh while others wait. After acquiring the lock, the token
        expiry is re-checked to avoid redundant refresh attempts.
        """
        if self.token_has_expired():
            with self._token_refresh_lock:
                # Double-check after acquiring lock - another thread may have already refreshed
                if self.token_has_expired():
                    self.refresh_and_set_access_token()

        return self.access_token

    def refresh_and_set_access_token(self) -> None:
        """Force refresh the access token and update internal state.

        This method refreshes the access token regardless of whether it has expired,
        and updates the internal token and expiry date. Subclasses may override this
        to handle additional state updates (e.g., persisting new refresh tokens).
        """
        token, expires_in = self.refresh_access_token()
        self.access_token = token
        self.set_token_expiry_date(expires_in)

    def token_has_expired(self) -> bool:
        """Returns True if the token is expired"""
        return ab_datetime_now() > self.get_token_expiry_date()

    def _build_standard_refresh_args(self) -> MutableMapping[str, Any]:
        """Build the standard OAuth refresh args (grant_type, refresh_token, client
        credentials, scopes, plus any user-configured `refresh_request_body` extras).

        Used by both `build_refresh_request_body()` and
        `build_refresh_request_query_params()` so the same set of args can be emitted
        in either the body or the URL query string depending on
        `should_send_refresh_request_as_query_params()`.

        Client credentials (client_id and client_secret) are excluded when
        `refresh_request_headers` contains an `Authorization` header (e.g. Basic
        auth). This is required by OAuth providers like Gong that expect credentials
        ONLY in the Authorization header and reject requests that include them in
        both places.
        """
        headers = self.get_refresh_request_headers()
        credentials_in_header = bool(headers) and "Authorization" in headers

        payload: MutableMapping[str, Any] = {
            self.get_grant_type_name(): self.get_grant_type(),
        }

        if not credentials_in_header:
            payload[self.get_client_id_name()] = self.get_client_id()
            payload[self.get_client_secret_name()] = self.get_client_secret()

        payload[self.get_refresh_token_name()] = self.get_refresh_token()

        # Each getter is evaluated once so the emptiness check and the value used
        # always come from the same call.
        scopes = self.get_scopes()
        if scopes:
            payload["scopes"] = scopes

        extra_body = self.get_refresh_request_body()
        if extra_body:
            for key, val in extra_body.items():
                # Existing oauth args take precedence over custom configured fields.
                payload.setdefault(key, val)

        return payload

    def build_refresh_request_body(self) -> Mapping[str, Any]:
        """Returns the request body to set on the refresh request.

        When `should_send_refresh_request_as_query_params()` is `True`, the standard
        refresh args are emitted on the URL query string instead and this method
        returns an empty body. This supports OAuth providers like Gong that document
        their refresh endpoint as a `POST` with parameters on the URL query string
        and an empty body.
        """
        if self.should_send_refresh_request_as_query_params():
            return {}
        return self._build_standard_refresh_args()

    def build_refresh_request_headers(self) -> Mapping[str, Any] | None:
        """
        Returns the request headers to set on the refresh request, or `None` when
        `get_refresh_request_headers()` returns nothing (empty or `None`).
        """
        headers = self.get_refresh_request_headers()
        return headers if headers else None

    def build_refresh_request_query_params(self) -> Mapping[str, Any] | None:
        """Returns the URL query string parameters to set on the refresh request.

        When `should_send_refresh_request_as_query_params()` is `True`, the standard
        refresh args (grant_type, refresh_token, client credentials, scopes, plus
        any user-configured `refresh_request_body` extras) are returned here and
        `build_refresh_request_body()` returns an empty body.

        Returns `None` otherwise so existing authenticators retain their previous
        behavior (no query params on the refresh URL).
        """
        if not self.should_send_refresh_request_as_query_params():
            return None
        return self._build_standard_refresh_args()

    def refresh_access_token(self) -> Tuple[str, AirbyteDateTime]:
        """
        Requests a new access token from the refresh endpoint.

        :return: a tuple of (access_token, token_expiry_datetime)
        :raises AirbyteTracedException: with a transient failure type when the request
            still fails with a network error after the retries in `_make_handled_request`.
        """
        try:
            response_json = self._make_handled_request()
        except (
            requests.exceptions.ConnectionError,
            requests.exceptions.ConnectTimeout,
            requests.exceptions.ReadTimeout,
        ) as e:
            raise AirbyteTracedException(
                message="OAuth access token refresh request failed due to a network error.",
                internal_message=f"Network error during OAuth token refresh after retries were exhausted: {e}",
                failure_type=FailureType.transient_error,
            ) from e
        self._ensure_access_token_in_response(response_json)

        return (
            self._extract_access_token(response_json),
            self._extract_token_expiry_date(response_json),
        )

    # ----------------
    # PRIVATE METHODS
    # ----------------

    def _default_token_expiry_date(self) -> AirbyteDateTime:
        """
        Returns the default token expiry date
        """
        # 1 hour was chosen as a middle ground to avoid unnecessary frequent refreshes and token expiration
        default_token_expiry_duration_hours = 1  # 1 hour
        return ab_datetime_now() + timedelta(hours=default_token_expiry_duration_hours)

    def _wrap_refresh_token_exception(
        self, exception: requests.exceptions.RequestException
    ) -> bool:
        """
        Tells whether a failed refresh request matches the configured refresh token error.

        The exception matches when its response status code is one of
        `refresh_token_error_status_codes` and the value found under
        `refresh_token_error_key` in the JSON body is one of `refresh_token_error_values`.

        Args:
            exception (requests.exceptions.RequestException): The exception raised during the request.

        Returns:
            bool: True if the exception is related to a refresh token error, False otherwise
            (including when there is no response or the body is not a JSON object).
        """
        response = exception.response
        if response is None:
            return False
        try:
            exception_content = response.json()
        except (JSONDecodeError, ValueError):
            # requests raises a simplejson-based error when simplejson is installed; it is
            # not a stdlib JSONDecodeError, but both are ValueError subclasses.
            return False
        if not isinstance(exception_content, dict):
            # A JSON array/scalar error body has no keys to inspect. Without this guard
            # `.get` would raise AttributeError from inside the caller's except block.
            return False
        return (
            response.status_code in self._refresh_token_error_status_codes
            and exception_content.get(self._refresh_token_error_key)
            in self._refresh_token_error_values
        )

    @backoff.on_exception(
        backoff.expo,
        (
            DefaultBackoffException,
            requests.exceptions.ConnectionError,
            requests.exceptions.ConnectTimeout,
            requests.exceptions.ReadTimeout,
        ),
        on_backoff=lambda details: logger.info(
            f"Caught retryable error after {details['tries']} tries. Waiting {details['wait']} seconds then retrying..."
        ),
        max_time=300,
    )
    def _make_handled_request(self) -> Any:
        """
        Makes a handled HTTP request to refresh an OAuth token.

        This method sends a POST request to the token refresh endpoint with the necessary
        headers and body to obtain a new access token. It handles various exceptions that
        may occur during the request and logs the response for troubleshooting purposes.

        Returns:
            Mapping[str, Any]: The JSON response from the token refresh endpoint.

        Raises:
            DefaultBackoffException: If the response status code is 429 (Too Many Requests)
                or any 5xx server error.
            AirbyteTracedException: If the refresh token is invalid or expired, prompting
                re-authentication, or if a non-`requests` error occurs.
            requests.exceptions.RequestException: For any other request failure (re-raised as is).
        """
        try:
            # NOTE(review): no `timeout` is passed, so a stalled endpoint can block
            # indefinitely - confirm whether a timeout should be configured here.
            response = requests.request(
                method="POST",
                url=self.get_token_refresh_endpoint(),  # type: ignore # returns None, if not provided, but str | bytes is expected.
                data=self.build_refresh_request_body(),
                headers=self.build_refresh_request_headers(),
                params=self.build_refresh_request_query_params(),
            )

            if not response.ok:
                # log the response even if the request failed for troubleshooting purposes
                self._log_response(response)
                response.raise_for_status()

            response_json = response.json()

            try:
                # extract the access token and add to secrets to avoid logging the raw value
                access_key = self._extract_access_token(response_json)
                if access_key:
                    add_to_secrets(access_key)
            except ResponseKeysMaxRecurtionReached:
                # could not find the access token in the response, so do nothing
                pass

            self._log_response(response)

            return response_json
        except requests.exceptions.RequestException as e:
            if e.response is not None:
                if e.response.status_code == 429 or e.response.status_code >= 500:
                    raise DefaultBackoffException(
                        request=e.response.request,
                        response=e.response,
                        failure_type=FailureType.transient_error,
                    ) from e
            if self._wrap_refresh_token_exception(e):
                message = "Refresh token is invalid or expired. Please re-authenticate from Sources/<your source>/Settings."
                raise AirbyteTracedException(
                    internal_message=message, message=message, failure_type=FailureType.config_error
                ) from e
            raise
        except Exception as e:
            raise AirbyteTracedException(
                message="OAuth access token refresh request failed.",
                internal_message=f"Unexpected error during OAuth token refresh: {e}",
                failure_type=FailureType.system_error,
            ) from e

    def _ensure_access_token_in_response(self, response_data: Mapping[str, Any]) -> None:
        """
        Ensures that the access token is present in the response data.

        This method attempts to extract the access token from the provided response data.
        If the access token is not found, it raises an exception indicating that the token
        refresh API response was missing the access token.

        Args:
            response_data (Mapping[str, Any]): The response data from which to extract the access token.

        Raises:
            Exception: If the access token is not found in the response data.
            ResponseKeysMaxRecurtionReached: If the maximum recursion depth is reached while extracting the access token.
        """
        # ResponseKeysMaxRecurtionReached propagates unchanged from the extraction call.
        access_key = self._extract_access_token(response_data)
        if not access_key:
            raise Exception(
                f"Token refresh API response was missing access token {self.get_access_token_name()}"
            )

    def _parse_token_expiration_date(self, value: Union[str, int]) -> AirbyteDateTime:
        """
        Parse a string or integer token expiration date into a datetime object

        When `token_expiry_is_time_of_expiration` is True the value is parsed as a
        datetime; otherwise it is read as a number of seconds from now.

        :return: expiration datetime
        :raises ValueError: if the value cannot be parsed, or if a datetime is expected
            but `token_expiry_date_format` is not set.
        """
        if self.token_expiry_is_time_of_expiration:
            if not self.token_expiry_date_format:
                raise ValueError(
                    f"Invalid token expiry date format {self.token_expiry_date_format}; a string representing the format is required."
                )
            try:
                return ab_datetime_parse(str(value))
            except ValueError as e:
                raise ValueError(f"Invalid token expiry date format: {e}") from e
        else:
            try:
                # Only accept numeric values (as int/float/string) when no format specified
                seconds = int(float(str(value)))
                return ab_datetime_now() + timedelta(seconds=seconds)
            except (ValueError, TypeError, OverflowError) as e:
                # OverflowError covers "inf" and values too large for timedelta/datetime.
                raise ValueError(
                    f"Invalid expires_in value: {value}. Expected number of seconds when no format specified."
                ) from e

    def _extract_access_token(self, response_data: Mapping[str, Any]) -> Any:
        """
        Extracts the access token from the given response data.

        Args:
            response_data (Mapping[str, Any]): The response data from which to extract the access token.

        Returns:
            The extracted access token, or None if the key is not present.
        """
        return self._find_and_get_value_from_response(response_data, self.get_access_token_name())

    def _extract_refresh_token(self, response_data: Mapping[str, Any]) -> Any:
        """
        Extracts the refresh token from the given response data.

        Args:
            response_data (Mapping[str, Any]): The response data from which to extract the refresh token.

        Returns:
            The extracted refresh token, or None if the key is not present.
        """
        return self._find_and_get_value_from_response(response_data, self.get_refresh_token_name())

    def _extract_token_expiry_date(self, response_data: Mapping[str, Any]) -> AirbyteDateTime:
        """
        Extracts the token_expiry_date, like `expires_in` or `expires_at`, etc from the given response data.

        If the token_expiry_date is not found, it will return an existing token expiry date if set
        and not yet expired, or a default token expiry date.

        Args:
            response_data (Mapping[str, Any]): The response data from which to extract the token_expiry_date.

        Returns:
            The extracted token_expiry_date, the existing unexpired expiry date, or the default expiry date.
        """
        expires_in = self._find_and_get_value_from_response(
            response_data, self.get_expires_in_name()
        )
        if expires_in is not None:
            return self._parse_token_expiration_date(expires_in)

        # expires_in is None
        existing_expiry_date = self.get_token_expiry_date()
        if existing_expiry_date and not self.token_has_expired():
            return existing_expiry_date

        return self._default_token_expiry_date()

    def _find_and_get_value_from_response(
        self,
        response_data: Mapping[str, Any],
        key_name: str,
        max_depth: int = 5,
        current_depth: int = 0,
    ) -> Any:
        """
        Recursively searches for a specified key in a nested dictionary or list and returns its value if found.

        Args:
            response_data (Mapping[str, Any]): The response data to search through, which can be a dictionary or a list.
            key_name (str): The key to search for in the response data.
            max_depth (int, optional): The maximum depth to search for the key to avoid infinite recursion. Defaults to 5.
            current_depth (int, optional): The current depth of the recursion. Defaults to 0.

        Returns:
            Any: The value associated with the specified key if found, otherwise None.

        Raises:
            ResponseKeysMaxRecurtionReached: If the maximum recursion depth is exceeded without finding the key.
        """
        if current_depth > max_depth:
            # this is needed to avoid an inf loop, possible with a very deep nesting observed.
            message = f"The maximum level of recursion is reached. Couldn't find the specified `{key_name}` in the response."
            raise ResponseKeysMaxRecurtionReached(
                internal_message=message, message=message, failure_type=FailureType.config_error
            )

        if isinstance(response_data, dict):
            # get from the root level
            if key_name in response_data:
                return response_data[key_name]

            # get from the nested object
            for value in response_data.values():
                result = self._find_and_get_value_from_response(
                    value, key_name, max_depth, current_depth + 1
                )
                if result is not None:
                    return result

        # get from the nested array object
        elif isinstance(response_data, list):
            for item in response_data:
                result = self._find_and_get_value_from_response(
                    item, key_name, max_depth, current_depth + 1
                )
                if result is not None:
                    return result

        return None

    @property
    def _message_repository(self) -> Optional[MessageRepository]:
        """
        The implementation can define a message_repository if it wants debugging logs for HTTP requests
        """
        return _NOOP_MESSAGE_REPOSITORY

    def _log_response(self, response: requests.Response) -> None:
        """
        Logs the HTTP response using the message repository if it is available.

        Args:
            response (requests.Response): The HTTP response to log.
        """
        if self._message_repository:
            self._message_repository.log_message(
                Level.DEBUG,
                lambda: format_http_message(
                    response,
                    "Refresh token",
                    "Obtains access token",
                    self._NO_STREAM_NAME,
                    is_auxiliary=True,
                    type="AUTH",
                ),
            )

    # ----------------
    # ABSTR METHODS
    # ----------------

    @abstractmethod
    def get_token_refresh_endpoint(self) -> Optional[str]:
        """Returns the endpoint to refresh the access token"""

    @abstractmethod
    def get_client_id_name(self) -> str:
        """The client id name to authenticate"""

    @abstractmethod
    def get_client_id(self) -> str:
        """The client id to authenticate"""

    @abstractmethod
    def get_client_secret_name(self) -> str:
        """The client secret name to authenticate"""

    @abstractmethod
    def get_client_secret(self) -> str:
        """The client secret to authenticate"""

    @abstractmethod
    def get_refresh_token_name(self) -> str:
        """The refresh token name to authenticate"""

    @abstractmethod
    def get_refresh_token(self) -> Optional[str]:
        """The token used to refresh the access token when it expires"""

    @abstractmethod
    def get_scopes(self) -> List[str]:
        """List of requested scopes"""

    @abstractmethod
    def get_token_expiry_date(self) -> AirbyteDateTime:
        """Expiration date of the access token"""

    @abstractmethod
    def set_token_expiry_date(self, value: AirbyteDateTime) -> None:
        """Setter for access token expiration date"""

    @abstractmethod
    def get_access_token_name(self) -> str:
        """Field to extract access token from in the response"""

    @abstractmethod
    def get_expires_in_name(self) -> str:
        """Returns the expires_in field name"""

    @abstractmethod
    def get_refresh_request_body(self) -> Mapping[str, Any]:
        """Returns the request body to set on the refresh request"""

    @abstractmethod
    def get_refresh_request_headers(self) -> Mapping[str, Any]:
        """Returns the request headers to set on the refresh request"""

    def should_send_refresh_request_as_query_params(self) -> bool:
        """Returns `True` if the standard refresh args should be sent on the URL
        query string instead of in the request body.

        Defaults to `False` so existing authenticators retain their previous
        behavior (params in body, no query params on the refresh URL). Subclasses
        can override this to opt into the URL-query-string shape required by OAuth
        providers like Gong.
        """
        return False

    @abstractmethod
    def get_grant_type(self) -> str:
        """Returns grant_type specified for requesting access_token"""

    @abstractmethod
    def get_grant_type_name(self) -> str:
        """Returns grant_type specified name for requesting access_token"""

    @property
    @abstractmethod
    def access_token(self) -> str:
        """Returns the access token"""

    @access_token.setter
    @abstractmethod
    def access_token(self, value: str) -> None:
        """Setter for the access token"""
class ResponseKeysMaxRecurtionReached(AirbyteTracedException):
    """
    Raised when the recursive search for a target key in the token refresh
    response exceeds the maximum allowed depth without finding the key.
    """
Raised when the maximum recursion depth is reached while searching for
the target key in the response obtained by `_make_handled_request`.
37class AbstractOauth2Authenticator(AuthBase): 38 """ 39 Abstract class for an OAuth authenticators that implements the OAuth token refresh flow. The authenticator 40 is designed to generically perform the refresh flow without regard to how config fields are get/set by 41 delegating that behavior to the classes implementing the interface. 42 """ 43 44 _NO_STREAM_NAME = None 45 46 # Class-level lock to prevent concurrent token refresh across multiple authenticator instances. 47 # This is necessary because multiple streams may share the same OAuth credentials (refresh token) 48 # through the connector config. Without this lock, concurrent refresh attempts can cause race 49 # conditions where one stream successfully refreshes the token while others fail because the 50 # refresh token has been invalidated (especially for single-use refresh tokens). 51 _token_refresh_lock: threading.Lock = threading.Lock() 52 53 def __init__( 54 self, 55 refresh_token_error_status_codes: Tuple[int, ...] = (), 56 refresh_token_error_key: str = "", 57 refresh_token_error_values: Tuple[str, ...] = (), 58 ) -> None: 59 """ 60 If all of refresh_token_error_status_codes, refresh_token_error_key, and refresh_token_error_values are set, 61 then http errors with such params will be wrapped in AirbyteTracedException. 
62 """ 63 self._refresh_token_error_status_codes = refresh_token_error_status_codes 64 self._refresh_token_error_key = refresh_token_error_key 65 self._refresh_token_error_values = refresh_token_error_values 66 67 def __call__(self, request: requests.PreparedRequest) -> requests.PreparedRequest: 68 """Attach the HTTP headers required to authenticate on the HTTP request""" 69 request.headers.update(self.get_auth_header()) 70 return request 71 72 @property 73 def _is_access_token_flow(self) -> bool: 74 return self.get_token_refresh_endpoint() is None and self.access_token is not None 75 76 @property 77 def token_expiry_is_time_of_expiration(self) -> bool: 78 """ 79 Indicates that the Token Expiry returns the date until which the token will be valid, not the amount of time it will be valid. 80 """ 81 82 return False 83 84 @property 85 def token_expiry_date_format(self) -> Optional[str]: 86 """ 87 Format of the datetime; exists it if expires_in is returned as the expiration datetime instead of seconds until it expires 88 """ 89 90 return None 91 92 def get_auth_header(self) -> Mapping[str, Any]: 93 """HTTP header to set on the requests""" 94 token = self.access_token if self._is_access_token_flow else self.get_access_token() 95 return {"Authorization": f"Bearer {token}"} 96 97 def get_access_token(self) -> str: 98 """ 99 Returns the access token. 100 101 This method uses double-checked locking to ensure thread-safe token refresh. 102 When multiple threads (streams) detect an expired token simultaneously, only one 103 will perform the refresh while others wait. After acquiring the lock, the token 104 expiry is re-checked to avoid redundant refresh attempts. 
105 """ 106 if self.token_has_expired(): 107 with self._token_refresh_lock: 108 # Double-check after acquiring lock - another thread may have already refreshed 109 if self.token_has_expired(): 110 self.refresh_and_set_access_token() 111 112 return self.access_token 113 114 def refresh_and_set_access_token(self) -> None: 115 """Force refresh the access token and update internal state. 116 117 This method refreshes the access token regardless of whether it has expired, 118 and updates the internal token and expiry date. Subclasses may override this 119 to handle additional state updates (e.g., persisting new refresh tokens). 120 """ 121 token, expires_in = self.refresh_access_token() 122 self.access_token = token 123 self.set_token_expiry_date(expires_in) 124 125 def token_has_expired(self) -> bool: 126 """Returns True if the token is expired""" 127 return ab_datetime_now() > self.get_token_expiry_date() 128 129 def _build_standard_refresh_args(self) -> MutableMapping[str, Any]: 130 """Build the standard OAuth refresh args (grant_type, refresh_token, client 131 credentials, scopes, plus any user-configured `refresh_request_body` extras). 132 133 Used by both `build_refresh_request_body()` and 134 `build_refresh_request_query_params()` so the same set of args can be emitted 135 in either the body or the URL query string depending on 136 `should_send_refresh_request_as_query_params()`. 137 138 Client credentials (client_id and client_secret) are excluded when 139 `refresh_request_headers` contains an `Authorization` header (e.g. Basic 140 auth). This is required by OAuth providers like Gong that expect credentials 141 ONLY in the Authorization header and reject requests that include them in 142 both places. 
143 """ 144 headers = self.get_refresh_request_headers() 145 credentials_in_header = headers and "Authorization" in headers 146 include_client_credentials = not credentials_in_header 147 148 payload: MutableMapping[str, Any] = { 149 self.get_grant_type_name(): self.get_grant_type(), 150 } 151 152 if include_client_credentials: 153 payload[self.get_client_id_name()] = self.get_client_id() 154 payload[self.get_client_secret_name()] = self.get_client_secret() 155 156 payload[self.get_refresh_token_name()] = self.get_refresh_token() 157 158 if self.get_scopes(): 159 payload["scopes"] = self.get_scopes() 160 161 if self.get_refresh_request_body(): 162 for key, val in self.get_refresh_request_body().items(): 163 # Existing oauth args take precedence over custom configured fields. 164 if key not in payload: 165 payload[key] = val 166 167 return payload 168 169 def build_refresh_request_body(self) -> Mapping[str, Any]: 170 """Returns the request body to set on the refresh request. 171 172 When `should_send_refresh_request_as_query_params()` is `True`, the standard 173 refresh args are emitted on the URL query string instead and this method 174 returns an empty body. This supports OAuth providers like Gong that document 175 their refresh endpoint as a `POST` with parameters on the URL query string 176 and an empty body. 177 """ 178 if self.should_send_refresh_request_as_query_params(): 179 return {} 180 return self._build_standard_refresh_args() 181 182 def build_refresh_request_headers(self) -> Mapping[str, Any] | None: 183 """ 184 Returns the request headers to set on the refresh request 185 186 """ 187 headers = self.get_refresh_request_headers() 188 return headers if headers else None 189 190 def build_refresh_request_query_params(self) -> Mapping[str, Any] | None: 191 """Returns the URL query string parameters to set on the refresh request. 
192 193 When `should_send_refresh_request_as_query_params()` is `True`, the standard 194 refresh args (grant_type, refresh_token, client credentials, scopes, plus 195 any user-configured `refresh_request_body` extras) are returned here and 196 `build_refresh_request_body()` returns an empty body. 197 198 Returns `None` otherwise so existing authenticators retain their previous 199 behavior (no query params on the refresh URL). 200 """ 201 if not self.should_send_refresh_request_as_query_params(): 202 return None 203 return self._build_standard_refresh_args() 204 205 def refresh_access_token(self) -> Tuple[str, AirbyteDateTime]: 206 """ 207 Returns the refresh token and its expiration datetime 208 209 :return: a tuple of (access_token, token_lifespan) 210 """ 211 try: 212 response_json = self._make_handled_request() 213 except ( 214 requests.exceptions.ConnectionError, 215 requests.exceptions.ConnectTimeout, 216 requests.exceptions.ReadTimeout, 217 ) as e: 218 raise AirbyteTracedException( 219 message="OAuth access token refresh request failed due to a network error.", 220 internal_message=f"Network error during OAuth token refresh after retries were exhausted: {e}", 221 failure_type=FailureType.transient_error, 222 ) from e 223 self._ensure_access_token_in_response(response_json) 224 225 return ( 226 self._extract_access_token(response_json), 227 self._extract_token_expiry_date(response_json), 228 ) 229 230 # ---------------- 231 # PRIVATE METHODS 232 # ---------------- 233 234 def _default_token_expiry_date(self) -> AirbyteDateTime: 235 """ 236 Returns the default token expiry date 237 """ 238 # 1 hour was chosen as a middle ground to avoid unnecessary frequent refreshes and token expiration 239 default_token_expiry_duration_hours = 1 # 1 hour 240 return ab_datetime_now() + timedelta(hours=default_token_expiry_duration_hours) 241 242 def _wrap_refresh_token_exception( 243 self, exception: requests.exceptions.RequestException 244 ) -> bool: 245 """ 246 Wraps and 
handles exceptions that occur during the refresh token process. 247 248 This method checks if the provided exception is related to a refresh token error 249 by examining the response status code and specific error content. 250 251 Args: 252 exception (requests.exceptions.RequestException): The exception raised during the request. 253 254 Returns: 255 bool: True if the exception is related to a refresh token error, False otherwise. 256 """ 257 try: 258 if exception.response is not None: 259 exception_content = exception.response.json() 260 else: 261 return False 262 except JSONDecodeError: 263 return False 264 return ( 265 exception.response.status_code in self._refresh_token_error_status_codes 266 and exception_content.get(self._refresh_token_error_key) 267 in self._refresh_token_error_values 268 ) 269 270 @backoff.on_exception( 271 backoff.expo, 272 ( 273 DefaultBackoffException, 274 requests.exceptions.ConnectionError, 275 requests.exceptions.ConnectTimeout, 276 requests.exceptions.ReadTimeout, 277 ), 278 on_backoff=lambda details: logger.info( 279 f"Caught retryable error after {details['tries']} tries. Waiting {details['wait']} seconds then retrying..." 280 ), 281 max_time=300, 282 ) 283 def _make_handled_request(self) -> Any: 284 """ 285 Makes a handled HTTP request to refresh an OAuth token. 286 287 This method sends a POST request to the token refresh endpoint with the necessary 288 headers and body to obtain a new access token. It handles various exceptions that 289 may occur during the request and logs the response for troubleshooting purposes. 290 291 Returns: 292 Mapping[str, Any]: The JSON response from the token refresh endpoint. 293 294 Raises: 295 DefaultBackoffException: If the response status code is 429 (Too Many Requests) 296 or any 5xx server error. 297 AirbyteTracedException: If the refresh token is invalid or expired, prompting 298 re-authentication. 299 Exception: For any other exceptions that occur during the request. 
300 """ 301 try: 302 response = requests.request( 303 method="POST", 304 url=self.get_token_refresh_endpoint(), # type: ignore # returns None, if not provided, but str | bytes is expected. 305 data=self.build_refresh_request_body(), 306 headers=self.build_refresh_request_headers(), 307 params=self.build_refresh_request_query_params(), 308 ) 309 310 if not response.ok: 311 # log the response even if the request failed for troubleshooting purposes 312 self._log_response(response) 313 response.raise_for_status() 314 315 response_json = response.json() 316 317 try: 318 # extract the access token and add to secrets to avoid logging the raw value 319 access_key = self._extract_access_token(response_json) 320 if access_key: 321 add_to_secrets(access_key) 322 except ResponseKeysMaxRecurtionReached as e: 323 # could not find the access token in the response, so do nothing 324 pass 325 326 self._log_response(response) 327 328 return response_json 329 except requests.exceptions.RequestException as e: 330 if e.response is not None: 331 if e.response.status_code == 429 or e.response.status_code >= 500: 332 raise DefaultBackoffException( 333 request=e.response.request, 334 response=e.response, 335 failure_type=FailureType.transient_error, 336 ) 337 if self._wrap_refresh_token_exception(e): 338 message = "Refresh token is invalid or expired. Please re-authenticate from Sources/<your source>/Settings." 339 raise AirbyteTracedException( 340 internal_message=message, message=message, failure_type=FailureType.config_error 341 ) 342 raise 343 except Exception as e: 344 raise AirbyteTracedException( 345 message="OAuth access token refresh request failed.", 346 internal_message=f"Unexpected error during OAuth token refresh: {e}", 347 failure_type=FailureType.system_error, 348 ) from e 349 350 def _ensure_access_token_in_response(self, response_data: Mapping[str, Any]) -> None: 351 """ 352 Ensures that the access token is present in the response data. 
353 354 This method attempts to extract the access token from the provided response data. 355 If the access token is not found, it raises an exception indicating that the token 356 refresh API response was missing the access token. 357 358 Args: 359 response_data (Mapping[str, Any]): The response data from which to extract the access token. 360 361 Raises: 362 Exception: If the access token is not found in the response data. 363 ResponseKeysMaxRecurtionReached: If the maximum recursion depth is reached while extracting the access token. 364 """ 365 try: 366 access_key = self._extract_access_token(response_data) 367 if not access_key: 368 raise Exception( 369 f"Token refresh API response was missing access token {self.get_access_token_name()}" 370 ) 371 except ResponseKeysMaxRecurtionReached as e: 372 raise e 373 374 def _parse_token_expiration_date(self, value: Union[str, int]) -> AirbyteDateTime: 375 """ 376 Parse a string or integer token expiration date into a datetime object 377 378 :return: expiration datetime 379 """ 380 if self.token_expiry_is_time_of_expiration: 381 if not self.token_expiry_date_format: 382 raise ValueError( 383 f"Invalid token expiry date format {self.token_expiry_date_format}; a string representing the format is required." 384 ) 385 try: 386 return ab_datetime_parse(str(value)) 387 except ValueError as e: 388 raise ValueError(f"Invalid token expiry date format: {e}") 389 else: 390 try: 391 # Only accept numeric values (as int/float/string) when no format specified 392 seconds = int(float(str(value))) 393 return ab_datetime_now() + timedelta(seconds=seconds) 394 except (ValueError, TypeError): 395 raise ValueError( 396 f"Invalid expires_in value: {value}. Expected number of seconds when no format specified." 397 ) 398 399 def _extract_access_token(self, response_data: Mapping[str, Any]) -> Any: 400 """ 401 Extracts the access token from the given response data. 
402 403 Args: 404 response_data (Mapping[str, Any]): The response data from which to extract the access token. 405 406 Returns: 407 str: The extracted access token. 408 """ 409 return self._find_and_get_value_from_response(response_data, self.get_access_token_name()) 410 411 def _extract_refresh_token(self, response_data: Mapping[str, Any]) -> Any: 412 """ 413 Extracts the refresh token from the given response data. 414 415 Args: 416 response_data (Mapping[str, Any]): The response data from which to extract the refresh token. 417 418 Returns: 419 str: The extracted refresh token. 420 """ 421 return self._find_and_get_value_from_response(response_data, self.get_refresh_token_name()) 422 423 def _extract_token_expiry_date(self, response_data: Mapping[str, Any]) -> AirbyteDateTime: 424 """ 425 Extracts the token_expiry_date, like `expires_in` or `expires_at`, etc from the given response data. 426 427 If the token_expiry_date is not found, it will return an existing token expiry date if set, or a default token expiry date. 428 429 Args: 430 response_data (Mapping[str, Any]): The response data from which to extract the token_expiry_date. 431 432 Returns: 433 The extracted token_expiry_date or None if not found. 434 """ 435 expires_in = self._find_and_get_value_from_response( 436 response_data, self.get_expires_in_name() 437 ) 438 if expires_in is not None: 439 return self._parse_token_expiration_date(expires_in) 440 441 # expires_in is None 442 existing_expiry_date = self.get_token_expiry_date() 443 if existing_expiry_date and not self.token_has_expired(): 444 return existing_expiry_date 445 446 return self._default_token_expiry_date() 447 448 def _find_and_get_value_from_response( 449 self, 450 response_data: Mapping[str, Any], 451 key_name: str, 452 max_depth: int = 5, 453 current_depth: int = 0, 454 ) -> Any: 455 """ 456 Recursively searches for a specified key in a nested dictionary or list and returns its value if found. 
457 458 Args: 459 response_data (Mapping[str, Any]): The response data to search through, which can be a dictionary or a list. 460 key_name (str): The key to search for in the response data. 461 max_depth (int, optional): The maximum depth to search for the key to avoid infinite recursion. Defaults to 5. 462 current_depth (int, optional): The current depth of the recursion. Defaults to 0. 463 464 Returns: 465 Any: The value associated with the specified key if found, otherwise None. 466 467 Raises: 468 AirbyteTracedException: If the maximum recursion depth is reached without finding the key. 469 """ 470 if current_depth > max_depth: 471 # this is needed to avoid an inf loop, possible with a very deep nesting observed. 472 message = f"The maximum level of recursion is reached. Couldn't find the specified `{key_name}` in the response." 473 raise ResponseKeysMaxRecurtionReached( 474 internal_message=message, message=message, failure_type=FailureType.config_error 475 ) 476 477 if isinstance(response_data, dict): 478 # get from the root level 479 if key_name in response_data: 480 return response_data[key_name] 481 482 # get from the nested object 483 for _, value in response_data.items(): 484 result = self._find_and_get_value_from_response( 485 value, key_name, max_depth, current_depth + 1 486 ) 487 if result is not None: 488 return result 489 490 # get from the nested array object 491 elif isinstance(response_data, list): 492 for item in response_data: 493 result = self._find_and_get_value_from_response( 494 item, key_name, max_depth, current_depth + 1 495 ) 496 if result is not None: 497 return result 498 499 return None 500 501 @property 502 def _message_repository(self) -> Optional[MessageRepository]: 503 """ 504 The implementation can define a message_repository if it wants debugging logs for HTTP requests 505 """ 506 return _NOOP_MESSAGE_REPOSITORY 507 508 def _log_response(self, response: requests.Response) -> None: 509 """ 510 Logs the HTTP response using the 
message repository if it is available. 511 512 Args: 513 response (requests.Response): The HTTP response to log. 514 """ 515 if self._message_repository: 516 self._message_repository.log_message( 517 Level.DEBUG, 518 lambda: format_http_message( 519 response, 520 "Refresh token", 521 "Obtains access token", 522 self._NO_STREAM_NAME, 523 is_auxiliary=True, 524 type="AUTH", 525 ), 526 ) 527 528 # ---------------- 529 # ABSTR METHODS 530 # ---------------- 531 532 @abstractmethod 533 def get_token_refresh_endpoint(self) -> Optional[str]: 534 """Returns the endpoint to refresh the access token""" 535 536 @abstractmethod 537 def get_client_id_name(self) -> str: 538 """The client id name to authenticate""" 539 540 @abstractmethod 541 def get_client_id(self) -> str: 542 """The client id to authenticate""" 543 544 @abstractmethod 545 def get_client_secret_name(self) -> str: 546 """The client secret name to authenticate""" 547 548 @abstractmethod 549 def get_client_secret(self) -> str: 550 """The client secret to authenticate""" 551 552 @abstractmethod 553 def get_refresh_token_name(self) -> str: 554 """The refresh token name to authenticate""" 555 556 @abstractmethod 557 def get_refresh_token(self) -> Optional[str]: 558 """The token used to refresh the access token when it expires""" 559 560 @abstractmethod 561 def get_scopes(self) -> List[str]: 562 """List of requested scopes""" 563 564 @abstractmethod 565 def get_token_expiry_date(self) -> AirbyteDateTime: 566 """Expiration date of the access token""" 567 568 @abstractmethod 569 def set_token_expiry_date(self, value: AirbyteDateTime) -> None: 570 """Setter for access token expiration date""" 571 572 @abstractmethod 573 def get_access_token_name(self) -> str: 574 """Field to extract access token from in the response""" 575 576 @abstractmethod 577 def get_expires_in_name(self) -> str: 578 """Returns the expires_in field name""" 579 580 @abstractmethod 581 def get_refresh_request_body(self) -> Mapping[str, Any]: 582 
"""Returns the request body to set on the refresh request""" 583 584 @abstractmethod 585 def get_refresh_request_headers(self) -> Mapping[str, Any]: 586 """Returns the request headers to set on the refresh request""" 587 588 def should_send_refresh_request_as_query_params(self) -> bool: 589 """Returns `True` if the standard refresh args should be sent on the URL 590 query string instead of in the request body. 591 592 Defaults to `False` so existing authenticators retain their previous 593 behavior (params in body, no query params on the refresh URL). Subclasses 594 can override this to opt into the URL-query-string shape required by OAuth 595 providers like Gong. 596 """ 597 return False 598 599 @abstractmethod 600 def get_grant_type(self) -> str: 601 """Returns grant_type specified for requesting access_token""" 602 603 @abstractmethod 604 def get_grant_type_name(self) -> str: 605 """Returns grant_type specified name for requesting access_token""" 606 607 @property 608 @abstractmethod 609 def access_token(self) -> str: 610 """Returns the access token""" 611 612 @access_token.setter 613 @abstractmethod 614 def access_token(self, value: str) -> str: 615 """Setter for the access token"""
Abstract class for OAuth authenticators that implement the OAuth token refresh flow. The authenticator is designed to generically perform the refresh flow without regard to how config fields are read or written, delegating that behavior to the classes implementing the interface.
def __init__(
    self,
    refresh_token_error_status_codes: Tuple[int, ...] = (),
    refresh_token_error_key: str = "",
    refresh_token_error_values: Tuple[str, ...] = (),
) -> None:
    """
    Store the criteria used to recognise a refresh-token error.

    When the status codes, the error key and the error values are all provided, HTTP errors
    matching them are wrapped in an AirbyteTracedException.
    """
    (
        self._refresh_token_error_status_codes,
        self._refresh_token_error_key,
        self._refresh_token_error_values,
    ) = (
        refresh_token_error_status_codes,
        refresh_token_error_key,
        refresh_token_error_values,
    )
If all of refresh_token_error_status_codes, refresh_token_error_key, and refresh_token_error_values are set, then HTTP errors matching those parameters will be wrapped in AirbyteTracedException.
@property
def token_expiry_is_time_of_expiration(self) -> bool:
    """
    Whether the token expiry value is the date until which the token is valid,
    rather than the amount of time it stays valid.

    This implementation always returns False.
    """
    return False
Indicates that the Token Expiry returns the date until which the token will be valid, not the amount of time it will be valid.
@property
def token_expiry_date_format(self) -> Optional[str]:
    """
    Format of the expiry datetime; required when expires_in is returned as the
    expiration datetime instead of the number of seconds until expiry.

    This implementation always returns None.
    """
    return None
Format of the datetime; required if expires_in is returned as the expiration datetime instead of the number of seconds until it expires
def get_auth_header(self) -> Mapping[str, Any]:
    """
    Build the HTTP Authorization header to set on outgoing requests.

    In the access-token flow the stored access token is used directly; otherwise the
    token is obtained through get_access_token().
    """
    if self._is_access_token_flow:
        bearer = self.access_token
    else:
        bearer = self.get_access_token()
    return {"Authorization": f"Bearer {bearer}"}
HTTP header to set on the requests
def get_access_token(self) -> str:
    """
    Return the access token, refreshing it first when it has expired.

    Refreshing uses double-checked locking: the expiry is tested once without the lock
    and again after acquiring it, so that when several threads see an expired token
    at the same time only one of them performs the refresh.
    """
    if not self.token_has_expired():
        return self.access_token

    with self._token_refresh_lock:
        # Re-check under the lock: another thread may have refreshed while we waited.
        if self.token_has_expired():
            self.refresh_and_set_access_token()

    return self.access_token
Returns the access token.
This method uses double-checked locking to ensure thread-safe token refresh. When multiple threads (streams) detect an expired token simultaneously, only one will perform the refresh while others wait. After acquiring the lock, the token expiry is re-checked to avoid redundant refresh attempts.
def refresh_and_set_access_token(self) -> None:
    """Refresh the access token unconditionally and store the result.

    The token is refreshed whether or not it has expired; the new token and its
    expiry date are then written to the authenticator. Subclasses may override this
    to update additional state (e.g., persisting new refresh tokens).
    """
    new_token, expiry_date = self.refresh_access_token()
    self.access_token = new_token
    self.set_token_expiry_date(expiry_date)
Force refresh the access token and update internal state.
This method refreshes the access token regardless of whether it has expired, and updates the internal token and expiry date. Subclasses may override this to handle additional state updates (e.g., persisting new refresh tokens).
def token_has_expired(self) -> bool:
    """Return True when the current time is later than the token expiry date."""
    now = ab_datetime_now()
    expiry_date = self.get_token_expiry_date()
    return now > expiry_date
Returns True if the token is expired
def build_refresh_request_body(self) -> Mapping[str, Any]:
    """Return the request body to set on the refresh request.

    If `should_send_refresh_request_as_query_params()` is `True`, the standard refresh
    args travel on the URL query string and the body is empty. This supports OAuth
    providers like Gong that document their refresh endpoint as a `POST` with
    parameters on the URL query string and an empty body. Otherwise the standard
    refresh args are returned as the body.
    """
    return (
        {}
        if self.should_send_refresh_request_as_query_params()
        else self._build_standard_refresh_args()
    )
Returns the request body to set on the refresh request.
When should_send_refresh_request_as_query_params() is True, the standard
refresh args are emitted on the URL query string instead and this method
returns an empty body. This supports OAuth providers like Gong that document
their refresh endpoint as a POST with parameters on the URL query string
and an empty body.
182 def build_refresh_request_headers(self) -> Mapping[str, Any] | None: 183 """ 184 Returns the request headers to set on the refresh request 185 186 """ 187 headers = self.get_refresh_request_headers() 188 return headers if headers else None
Returns the request headers to set on the refresh request
190 def build_refresh_request_query_params(self) -> Mapping[str, Any] | None: 191 """Returns the URL query string parameters to set on the refresh request. 192 193 When `should_send_refresh_request_as_query_params()` is `True`, the standard 194 refresh args (grant_type, refresh_token, client credentials, scopes, plus 195 any user-configured `refresh_request_body` extras) are returned here and 196 `build_refresh_request_body()` returns an empty body. 197 198 Returns `None` otherwise so existing authenticators retain their previous 199 behavior (no query params on the refresh URL). 200 """ 201 if not self.should_send_refresh_request_as_query_params(): 202 return None 203 return self._build_standard_refresh_args()
Returns the URL query string parameters to set on the refresh request.
When should_send_refresh_request_as_query_params() is True, the standard
refresh args (grant_type, refresh_token, client credentials, scopes, plus
any user-configured refresh_request_body extras) are returned here and
build_refresh_request_body() returns an empty body.
Returns None otherwise so existing authenticators retain their previous
behavior (no query params on the refresh URL).
def refresh_access_token(self) -> Tuple[str, AirbyteDateTime]:
    """
    Request a new access token and return it with its expiry date.

    Connection errors and connect/read timeouts raised by the refresh request are
    re-raised as an AirbyteTracedException with a transient failure type. The response
    is then checked for an access token before the values are extracted.

    :return: a tuple of (access_token, token_expiry_date)
    """
    network_errors = (
        requests.exceptions.ConnectionError,
        requests.exceptions.ConnectTimeout,
        requests.exceptions.ReadTimeout,
    )
    try:
        payload = self._make_handled_request()
    except network_errors as error:
        raise AirbyteTracedException(
            message="OAuth access token refresh request failed due to a network error.",
            internal_message=f"Network error during OAuth token refresh after retries were exhausted: {error}",
            failure_type=FailureType.transient_error,
        ) from error

    self._ensure_access_token_in_response(payload)

    access_token = self._extract_access_token(payload)
    expiry_date = self._extract_token_expiry_date(payload)
    return access_token, expiry_date
Returns the access token and its expiration datetime
Returns
a tuple of (access_token, token_expiry_date)
@abstractmethod
def get_token_refresh_endpoint(self) -> Optional[str]:
    """Return the endpoint used to refresh the access token."""
Returns the endpoint to refresh the access token
@abstractmethod
def get_client_id_name(self) -> str:
    """Return the name of the client id field used to authenticate."""
The client id name to authenticate
@abstractmethod
def get_client_secret_name(self) -> str:
    """Return the name of the client secret field used to authenticate."""
The client secret name to authenticate
@abstractmethod
def get_client_secret(self) -> str:
    """Return the client secret used to authenticate."""
The client secret to authenticate
@abstractmethod
def get_refresh_token_name(self) -> str:
    """Return the name of the refresh token field used to authenticate."""
The refresh token name to authenticate
@abstractmethod
def get_refresh_token(self) -> Optional[str]:
    """Return the token used to refresh the access token once it expires."""
The token used to refresh the access token when it expires
@abstractmethod
def get_token_expiry_date(self) -> AirbyteDateTime:
    """Return the expiration date of the access token."""
Expiration date of the access token
@abstractmethod
def set_token_expiry_date(self, value: AirbyteDateTime) -> None:
    """Set the expiration date of the access token."""
Setter for access token expiration date
@abstractmethod
def get_access_token_name(self) -> str:
    """Return the name of the response field the access token is extracted from."""
Field to extract access token from in the response
@abstractmethod
def get_expires_in_name(self) -> str:
    """Return the name of the expires_in field."""
Returns the expires_in field name
@abstractmethod
def get_refresh_request_body(self) -> Mapping[str, Any]:
    """Return the request body to set on the refresh request."""
Returns the request body to set on the refresh request
@abstractmethod
def get_refresh_request_headers(self) -> Mapping[str, Any]:
    """Return the request headers to set on the refresh request."""
Returns the request headers to set on the refresh request
def should_send_refresh_request_as_query_params(self) -> bool:
    """Tell whether the standard refresh args go on the URL query string rather
    than in the request body.

    The default is `False`, so existing authenticators retain their previous
    behavior (params in body, no query params on the refresh URL). Subclasses
    can override this to opt into the URL-query-string shape required by OAuth
    providers like Gong.
    """
    return False
Returns True if the standard refresh args should be sent on the URL
query string instead of in the request body.
Defaults to False so existing authenticators retain their previous
behavior (params in body, no query params on the refresh URL). Subclasses
can override this to opt into the URL-query-string shape required by OAuth
providers like Gong.
@abstractmethod
def get_grant_type(self) -> str:
    """Return the grant_type specified for requesting the access_token."""
Returns grant_type specified for requesting access_token