airbyte_cdk.sources.streams.http.requests_native_auth.abstract_oauth

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4
  5import logging
  6import threading
  7from abc import abstractmethod
  8from datetime import timedelta
  9from json import JSONDecodeError
 10from typing import Any, List, Mapping, MutableMapping, Optional, Tuple, Union
 11
 12import backoff
 13import requests
 14from requests.auth import AuthBase
 15
 16from airbyte_cdk.models import FailureType, Level
 17from airbyte_cdk.sources.http_logger import format_http_message
 18from airbyte_cdk.sources.message import MessageRepository, NoopMessageRepository
 19from airbyte_cdk.utils import AirbyteTracedException
 20from airbyte_cdk.utils.airbyte_secrets_utils import add_to_secrets
 21from airbyte_cdk.utils.datetime_helpers import AirbyteDateTime, ab_datetime_now, ab_datetime_parse
 22
 23from ..exceptions import DefaultBackoffException
 24
 25logger = logging.getLogger("airbyte")
 26_NOOP_MESSAGE_REPOSITORY = NoopMessageRepository()
 27
 28
 29class ResponseKeysMaxRecurtionReached(AirbyteTracedException):
 30    """
 31    Raised when the max level of recursion is reached, when trying to
 32    find-and-get the target key, during the `_make_handled_request`
 33    """
 34
 35
 36class AbstractOauth2Authenticator(AuthBase):
 37    """
 38    Abstract class for an OAuth authenticators that implements the OAuth token refresh flow. The authenticator
 39    is designed to generically perform the refresh flow without regard to how config fields are get/set by
 40    delegating that behavior to the classes implementing the interface.
 41    """
 42
 43    _NO_STREAM_NAME = None
 44
 45    # Class-level lock to prevent concurrent token refresh across multiple authenticator instances.
 46    # This is necessary because multiple streams may share the same OAuth credentials (refresh token)
 47    # through the connector config. Without this lock, concurrent refresh attempts can cause race
 48    # conditions where one stream successfully refreshes the token while others fail because the
 49    # refresh token has been invalidated (especially for single-use refresh tokens).
 50    _token_refresh_lock: threading.Lock = threading.Lock()
 51
 52    def __init__(
 53        self,
 54        refresh_token_error_status_codes: Tuple[int, ...] = (),
 55        refresh_token_error_key: str = "",
 56        refresh_token_error_values: Tuple[str, ...] = (),
 57    ) -> None:
 58        """
 59        If all of refresh_token_error_status_codes, refresh_token_error_key, and refresh_token_error_values are set,
 60        then http errors with such params will be wrapped in AirbyteTracedException.
 61        """
 62        self._refresh_token_error_status_codes = refresh_token_error_status_codes
 63        self._refresh_token_error_key = refresh_token_error_key
 64        self._refresh_token_error_values = refresh_token_error_values
 65
 66    def __call__(self, request: requests.PreparedRequest) -> requests.PreparedRequest:
 67        """Attach the HTTP headers required to authenticate on the HTTP request"""
 68        request.headers.update(self.get_auth_header())
 69        return request
 70
 71    @property
 72    def _is_access_token_flow(self) -> bool:
 73        return self.get_token_refresh_endpoint() is None and self.access_token is not None
 74
 75    @property
 76    def token_expiry_is_time_of_expiration(self) -> bool:
 77        """
 78        Indicates that the Token Expiry returns the date until which the token will be valid, not the amount of time it will be valid.
 79        """
 80
 81        return False
 82
 83    @property
 84    def token_expiry_date_format(self) -> Optional[str]:
 85        """
 86        Format of the datetime; exists it if expires_in is returned as the expiration datetime instead of seconds until it expires
 87        """
 88
 89        return None
 90
 91    def get_auth_header(self) -> Mapping[str, Any]:
 92        """HTTP header to set on the requests"""
 93        token = self.access_token if self._is_access_token_flow else self.get_access_token()
 94        return {"Authorization": f"Bearer {token}"}
 95
 96    def get_access_token(self) -> str:
 97        """
 98        Returns the access token.
 99
100        This method uses double-checked locking to ensure thread-safe token refresh.
101        When multiple threads (streams) detect an expired token simultaneously, only one
102        will perform the refresh while others wait. After acquiring the lock, the token
103        expiry is re-checked to avoid redundant refresh attempts.
104        """
105        if self.token_has_expired():
106            with self._token_refresh_lock:
107                # Double-check after acquiring lock - another thread may have already refreshed
108                if self.token_has_expired():
109                    self.refresh_and_set_access_token()
110
111        return self.access_token
112
113    def refresh_and_set_access_token(self) -> None:
114        """Force refresh the access token and update internal state.
115
116        This method refreshes the access token regardless of whether it has expired,
117        and updates the internal token and expiry date. Subclasses may override this
118        to handle additional state updates (e.g., persisting new refresh tokens).
119        """
120        token, expires_in = self.refresh_access_token()
121        self.access_token = token
122        self.set_token_expiry_date(expires_in)
123
124    def token_has_expired(self) -> bool:
125        """Returns True if the token is expired"""
126        return ab_datetime_now() > self.get_token_expiry_date()
127
128    def _build_standard_refresh_args(self) -> MutableMapping[str, Any]:
129        """Build the standard OAuth refresh args (grant_type, refresh_token, client
130        credentials, scopes, plus any user-configured `refresh_request_body` extras).
131
132        Used by both `build_refresh_request_body()` and
133        `build_refresh_request_query_params()` so the same set of args can be emitted
134        in either the body or the URL query string depending on
135        `should_send_refresh_request_as_query_params()`.
136
137        Client credentials (client_id and client_secret) are excluded when
138        `refresh_request_headers` contains an `Authorization` header (e.g. Basic
139        auth). This is required by OAuth providers like Gong that expect credentials
140        ONLY in the Authorization header and reject requests that include them in
141        both places.
142        """
143        headers = self.get_refresh_request_headers()
144        credentials_in_header = headers and "Authorization" in headers
145        include_client_credentials = not credentials_in_header
146
147        payload: MutableMapping[str, Any] = {
148            self.get_grant_type_name(): self.get_grant_type(),
149        }
150
151        if include_client_credentials:
152            payload[self.get_client_id_name()] = self.get_client_id()
153            payload[self.get_client_secret_name()] = self.get_client_secret()
154
155        payload[self.get_refresh_token_name()] = self.get_refresh_token()
156
157        if self.get_scopes():
158            payload["scopes"] = self.get_scopes()
159
160        if self.get_refresh_request_body():
161            for key, val in self.get_refresh_request_body().items():
162                # Existing oauth args take precedence over custom configured fields.
163                if key not in payload:
164                    payload[key] = val
165
166        return payload
167
168    def build_refresh_request_body(self) -> Mapping[str, Any]:
169        """Returns the request body to set on the refresh request.
170
171        When `should_send_refresh_request_as_query_params()` is `True`, the standard
172        refresh args are emitted on the URL query string instead and this method
173        returns an empty body. This supports OAuth providers like Gong that document
174        their refresh endpoint as a `POST` with parameters on the URL query string
175        and an empty body.
176        """
177        if self.should_send_refresh_request_as_query_params():
178            return {}
179        return self._build_standard_refresh_args()
180
181    def build_refresh_request_headers(self) -> Mapping[str, Any] | None:
182        """
183        Returns the request headers to set on the refresh request
184
185        """
186        headers = self.get_refresh_request_headers()
187        return headers if headers else None
188
189    def build_refresh_request_query_params(self) -> Mapping[str, Any] | None:
190        """Returns the URL query string parameters to set on the refresh request.
191
192        When `should_send_refresh_request_as_query_params()` is `True`, the standard
193        refresh args (grant_type, refresh_token, client credentials, scopes, plus
194        any user-configured `refresh_request_body` extras) are returned here and
195        `build_refresh_request_body()` returns an empty body.
196
197        Returns `None` otherwise so existing authenticators retain their previous
198        behavior (no query params on the refresh URL).
199        """
200        if not self.should_send_refresh_request_as_query_params():
201            return None
202        return self._build_standard_refresh_args()
203
204    def refresh_access_token(self) -> Tuple[str, AirbyteDateTime]:
205        """
206        Returns the refresh token and its expiration datetime
207
208        :return: a tuple of (access_token, token_lifespan)
209        """
210        try:
211            response_json = self._make_handled_request()
212        except (
213            requests.exceptions.ConnectionError,
214            requests.exceptions.ConnectTimeout,
215            requests.exceptions.ReadTimeout,
216        ) as e:
217            raise AirbyteTracedException(
218                message="OAuth access token refresh request failed due to a network error.",
219                internal_message=f"Network error during OAuth token refresh after retries were exhausted: {e}",
220                failure_type=FailureType.transient_error,
221            ) from e
222        self._ensure_access_token_in_response(response_json)
223
224        return (
225            self._extract_access_token(response_json),
226            self._extract_token_expiry_date(response_json),
227        )
228
229    # ----------------
230    # PRIVATE METHODS
231    # ----------------
232
233    def _default_token_expiry_date(self) -> AirbyteDateTime:
234        """
235        Returns the default token expiry date
236        """
237        # 1 hour was chosen as a middle ground to avoid unnecessary frequent refreshes and token expiration
238        default_token_expiry_duration_hours = 1  # 1 hour
239        return ab_datetime_now() + timedelta(hours=default_token_expiry_duration_hours)
240
241    def _wrap_refresh_token_exception(
242        self, exception: requests.exceptions.RequestException
243    ) -> bool:
244        """
245        Wraps and handles exceptions that occur during the refresh token process.
246
247        This method checks if the provided exception is related to a refresh token error
248        by examining the response status code and specific error content.
249
250        Args:
251            exception (requests.exceptions.RequestException): The exception raised during the request.
252
253        Returns:
254            bool: True if the exception is related to a refresh token error, False otherwise.
255        """
256        try:
257            if exception.response is not None:
258                exception_content = exception.response.json()
259            else:
260                return False
261        except JSONDecodeError:
262            return False
263        return (
264            exception.response.status_code in self._refresh_token_error_status_codes
265            and exception_content.get(self._refresh_token_error_key)
266            in self._refresh_token_error_values
267        )
268
269    @backoff.on_exception(
270        backoff.expo,
271        (
272            DefaultBackoffException,
273            requests.exceptions.ConnectionError,
274            requests.exceptions.ConnectTimeout,
275            requests.exceptions.ReadTimeout,
276        ),
277        on_backoff=lambda details: logger.info(
278            f"Caught retryable error after {details['tries']} tries. Waiting {details['wait']} seconds then retrying..."
279        ),
280        max_time=300,
281    )
282    def _make_handled_request(self) -> Any:
283        """
284        Makes a handled HTTP request to refresh an OAuth token.
285
286        This method sends a POST request to the token refresh endpoint with the necessary
287        headers and body to obtain a new access token. It handles various exceptions that
288        may occur during the request and logs the response for troubleshooting purposes.
289
290        Returns:
291            Mapping[str, Any]: The JSON response from the token refresh endpoint.
292
293        Raises:
294            DefaultBackoffException: If the response status code is 429 (Too Many Requests)
295                                     or any 5xx server error.
296            AirbyteTracedException: If the refresh token is invalid or expired, prompting
297                                    re-authentication.
298            Exception: For any other exceptions that occur during the request.
299        """
300        try:
301            response = requests.request(
302                method="POST",
303                url=self.get_token_refresh_endpoint(),  # type: ignore # returns None, if not provided, but str | bytes is expected.
304                data=self.build_refresh_request_body(),
305                headers=self.build_refresh_request_headers(),
306                params=self.build_refresh_request_query_params(),
307            )
308
309            if not response.ok:
310                # log the response even if the request failed for troubleshooting purposes
311                self._log_response(response)
312                response.raise_for_status()
313
314            response_json = response.json()
315
316            try:
317                # extract the access token and add to secrets to avoid logging the raw value
318                access_key = self._extract_access_token(response_json)
319                if access_key:
320                    add_to_secrets(access_key)
321            except ResponseKeysMaxRecurtionReached as e:
322                # could not find the access token in the response, so do nothing
323                pass
324
325            self._log_response(response)
326
327            return response_json
328        except requests.exceptions.RequestException as e:
329            if e.response is not None:
330                if e.response.status_code == 429 or e.response.status_code >= 500:
331                    raise DefaultBackoffException(
332                        request=e.response.request,
333                        response=e.response,
334                        failure_type=FailureType.transient_error,
335                    )
336            if self._wrap_refresh_token_exception(e):
337                message = "Refresh token is invalid or expired. Please re-authenticate from Sources/<your source>/Settings."
338                raise AirbyteTracedException(
339                    internal_message=message, message=message, failure_type=FailureType.config_error
340                )
341            raise
342        except Exception as e:
343            raise AirbyteTracedException(
344                message="OAuth access token refresh request failed.",
345                internal_message=f"Unexpected error during OAuth token refresh: {e}",
346                failure_type=FailureType.system_error,
347            ) from e
348
349    def _ensure_access_token_in_response(self, response_data: Mapping[str, Any]) -> None:
350        """
351        Ensures that the access token is present in the response data.
352
353        This method attempts to extract the access token from the provided response data.
354        If the access token is not found, it raises an exception indicating that the token
355        refresh API response was missing the access token.
356
357        Args:
358            response_data (Mapping[str, Any]): The response data from which to extract the access token.
359
360        Raises:
361            Exception: If the access token is not found in the response data.
362            ResponseKeysMaxRecurtionReached: If the maximum recursion depth is reached while extracting the access token.
363        """
364        try:
365            access_key = self._extract_access_token(response_data)
366            if not access_key:
367                raise Exception(
368                    f"Token refresh API response was missing access token {self.get_access_token_name()}"
369                )
370        except ResponseKeysMaxRecurtionReached as e:
371            raise e
372
373    def _parse_token_expiration_date(self, value: Union[str, int]) -> AirbyteDateTime:
374        """
375        Parse a string or integer token expiration date into a datetime object
376
377        :return: expiration datetime
378        """
379        if self.token_expiry_is_time_of_expiration:
380            if not self.token_expiry_date_format:
381                raise ValueError(
382                    f"Invalid token expiry date format {self.token_expiry_date_format}; a string representing the format is required."
383                )
384            try:
385                return ab_datetime_parse(str(value))
386            except ValueError as e:
387                raise ValueError(f"Invalid token expiry date format: {e}")
388        else:
389            try:
390                # Only accept numeric values (as int/float/string) when no format specified
391                seconds = int(float(str(value)))
392                return ab_datetime_now() + timedelta(seconds=seconds)
393            except (ValueError, TypeError):
394                raise ValueError(
395                    f"Invalid expires_in value: {value}. Expected number of seconds when no format specified."
396                )
397
398    def _extract_access_token(self, response_data: Mapping[str, Any]) -> Any:
399        """
400        Extracts the access token from the given response data.
401
402        Args:
403            response_data (Mapping[str, Any]): The response data from which to extract the access token.
404
405        Returns:
406            str: The extracted access token.
407        """
408        return self._find_and_get_value_from_response(response_data, self.get_access_token_name())
409
410    def _extract_refresh_token(self, response_data: Mapping[str, Any]) -> Any:
411        """
412        Extracts the refresh token from the given response data.
413
414        Args:
415            response_data (Mapping[str, Any]): The response data from which to extract the refresh token.
416
417        Returns:
418            str: The extracted refresh token.
419        """
420        return self._find_and_get_value_from_response(response_data, self.get_refresh_token_name())
421
422    def _extract_token_expiry_date(self, response_data: Mapping[str, Any]) -> AirbyteDateTime:
423        """
424        Extracts the token_expiry_date, like `expires_in` or `expires_at`, etc from the given response data.
425
426        If the token_expiry_date is not found, it will return an existing token expiry date if set, or a default token expiry date.
427
428        Args:
429            response_data (Mapping[str, Any]): The response data from which to extract the token_expiry_date.
430
431        Returns:
432            The extracted token_expiry_date or None if not found.
433        """
434        expires_in = self._find_and_get_value_from_response(
435            response_data, self.get_expires_in_name()
436        )
437        if expires_in is not None:
438            return self._parse_token_expiration_date(expires_in)
439
440        # expires_in is None
441        existing_expiry_date = self.get_token_expiry_date()
442        if existing_expiry_date and not self.token_has_expired():
443            return existing_expiry_date
444
445        return self._default_token_expiry_date()
446
447    def _find_and_get_value_from_response(
448        self,
449        response_data: Mapping[str, Any],
450        key_name: str,
451        max_depth: int = 5,
452        current_depth: int = 0,
453    ) -> Any:
454        """
455        Recursively searches for a specified key in a nested dictionary or list and returns its value if found.
456
457        Args:
458            response_data (Mapping[str, Any]): The response data to search through, which can be a dictionary or a list.
459            key_name (str): The key to search for in the response data.
460            max_depth (int, optional): The maximum depth to search for the key to avoid infinite recursion. Defaults to 5.
461            current_depth (int, optional): The current depth of the recursion. Defaults to 0.
462
463        Returns:
464            Any: The value associated with the specified key if found, otherwise None.
465
466        Raises:
467            AirbyteTracedException: If the maximum recursion depth is reached without finding the key.
468        """
469        if current_depth > max_depth:
470            # this is needed to avoid an inf loop, possible with a very deep nesting observed.
471            message = f"The maximum level of recursion is reached. Couldn't find the specified `{key_name}` in the response."
472            raise ResponseKeysMaxRecurtionReached(
473                internal_message=message, message=message, failure_type=FailureType.config_error
474            )
475
476        if isinstance(response_data, dict):
477            # get from the root level
478            if key_name in response_data:
479                return response_data[key_name]
480
481            # get from the nested object
482            for _, value in response_data.items():
483                result = self._find_and_get_value_from_response(
484                    value, key_name, max_depth, current_depth + 1
485                )
486                if result is not None:
487                    return result
488
489        # get from the nested array object
490        elif isinstance(response_data, list):
491            for item in response_data:
492                result = self._find_and_get_value_from_response(
493                    item, key_name, max_depth, current_depth + 1
494                )
495                if result is not None:
496                    return result
497
498        return None
499
500    @property
501    def _message_repository(self) -> Optional[MessageRepository]:
502        """
503        The implementation can define a message_repository if it wants debugging logs for HTTP requests
504        """
505        return _NOOP_MESSAGE_REPOSITORY
506
507    def _log_response(self, response: requests.Response) -> None:
508        """
509        Logs the HTTP response using the message repository if it is available.
510
511        Args:
512            response (requests.Response): The HTTP response to log.
513        """
514        if self._message_repository:
515            self._message_repository.log_message(
516                Level.DEBUG,
517                lambda: format_http_message(
518                    response,
519                    "Refresh token",
520                    "Obtains access token",
521                    self._NO_STREAM_NAME,
522                    is_auxiliary=True,
523                    type="AUTH",
524                ),
525            )
526
527    # ----------------
528    # ABSTR METHODS
529    # ----------------
530
531    @abstractmethod
532    def get_token_refresh_endpoint(self) -> Optional[str]:
533        """Returns the endpoint to refresh the access token"""
534
535    @abstractmethod
536    def get_client_id_name(self) -> str:
537        """The client id name to authenticate"""
538
539    @abstractmethod
540    def get_client_id(self) -> str:
541        """The client id to authenticate"""
542
543    @abstractmethod
544    def get_client_secret_name(self) -> str:
545        """The client secret name to authenticate"""
546
547    @abstractmethod
548    def get_client_secret(self) -> str:
549        """The client secret to authenticate"""
550
551    @abstractmethod
552    def get_refresh_token_name(self) -> str:
553        """The refresh token name to authenticate"""
554
555    @abstractmethod
556    def get_refresh_token(self) -> Optional[str]:
557        """The token used to refresh the access token when it expires"""
558
559    @abstractmethod
560    def get_scopes(self) -> List[str]:
561        """List of requested scopes"""
562
563    @abstractmethod
564    def get_token_expiry_date(self) -> AirbyteDateTime:
565        """Expiration date of the access token"""
566
567    @abstractmethod
568    def set_token_expiry_date(self, value: AirbyteDateTime) -> None:
569        """Setter for access token expiration date"""
570
571    @abstractmethod
572    def get_access_token_name(self) -> str:
573        """Field to extract access token from in the response"""
574
575    @abstractmethod
576    def get_expires_in_name(self) -> str:
577        """Returns the expires_in field name"""
578
579    @abstractmethod
580    def get_refresh_request_body(self) -> Mapping[str, Any]:
581        """Returns the request body to set on the refresh request"""
582
583    @abstractmethod
584    def get_refresh_request_headers(self) -> Mapping[str, Any]:
585        """Returns the request headers to set on the refresh request"""
586
587    def should_send_refresh_request_as_query_params(self) -> bool:
588        """Returns `True` if the standard refresh args should be sent on the URL
589        query string instead of in the request body.
590
591        Defaults to `False` so existing authenticators retain their previous
592        behavior (params in body, no query params on the refresh URL). Subclasses
593        can override this to opt into the URL-query-string shape required by OAuth
594        providers like Gong.
595        """
596        return False
597
598    @abstractmethod
599    def get_grant_type(self) -> str:
600        """Returns grant_type specified for requesting access_token"""
601
602    @abstractmethod
603    def get_grant_type_name(self) -> str:
604        """Returns grant_type specified name for requesting access_token"""
605
606    @property
607    @abstractmethod
608    def access_token(self) -> str:
609        """Returns the access token"""
610
611    @access_token.setter
612    @abstractmethod
613    def access_token(self, value: str) -> str:
614        """Setter for the access token"""
logger = <Logger airbyte (INFO)>
class ResponseKeysMaxRecurtionReached(airbyte_cdk.utils.traced_exception.AirbyteTracedException):
30class ResponseKeysMaxRecurtionReached(AirbyteTracedException):
31    """
32    Raised when the max level of recursion is reached, when trying to
33    find-and-get the target key, during the `_make_handled_request`
34    """

Raised when the max level of recursion is reached, when trying to find-and-get the target key, during the _make_handled_request

class AbstractOauth2Authenticator(requests.auth.AuthBase):
 37class AbstractOauth2Authenticator(AuthBase):
 38    """
 39    Abstract class for an OAuth authenticators that implements the OAuth token refresh flow. The authenticator
 40    is designed to generically perform the refresh flow without regard to how config fields are get/set by
 41    delegating that behavior to the classes implementing the interface.
 42    """
 43
 44    _NO_STREAM_NAME = None
 45
 46    # Class-level lock to prevent concurrent token refresh across multiple authenticator instances.
 47    # This is necessary because multiple streams may share the same OAuth credentials (refresh token)
 48    # through the connector config. Without this lock, concurrent refresh attempts can cause race
 49    # conditions where one stream successfully refreshes the token while others fail because the
 50    # refresh token has been invalidated (especially for single-use refresh tokens).
 51    _token_refresh_lock: threading.Lock = threading.Lock()
 52
 53    def __init__(
 54        self,
 55        refresh_token_error_status_codes: Tuple[int, ...] = (),
 56        refresh_token_error_key: str = "",
 57        refresh_token_error_values: Tuple[str, ...] = (),
 58    ) -> None:
 59        """
 60        If all of refresh_token_error_status_codes, refresh_token_error_key, and refresh_token_error_values are set,
 61        then http errors with such params will be wrapped in AirbyteTracedException.
 62        """
 63        self._refresh_token_error_status_codes = refresh_token_error_status_codes
 64        self._refresh_token_error_key = refresh_token_error_key
 65        self._refresh_token_error_values = refresh_token_error_values
 66
 67    def __call__(self, request: requests.PreparedRequest) -> requests.PreparedRequest:
 68        """Attach the HTTP headers required to authenticate on the HTTP request"""
 69        request.headers.update(self.get_auth_header())
 70        return request
 71
 72    @property
 73    def _is_access_token_flow(self) -> bool:
 74        return self.get_token_refresh_endpoint() is None and self.access_token is not None
 75
 76    @property
 77    def token_expiry_is_time_of_expiration(self) -> bool:
 78        """
 79        Indicates that the Token Expiry returns the date until which the token will be valid, not the amount of time it will be valid.
 80        """
 81
 82        return False
 83
 84    @property
 85    def token_expiry_date_format(self) -> Optional[str]:
 86        """
 87        Format of the datetime; exists it if expires_in is returned as the expiration datetime instead of seconds until it expires
 88        """
 89
 90        return None
 91
 92    def get_auth_header(self) -> Mapping[str, Any]:
 93        """HTTP header to set on the requests"""
 94        token = self.access_token if self._is_access_token_flow else self.get_access_token()
 95        return {"Authorization": f"Bearer {token}"}
 96
 97    def get_access_token(self) -> str:
 98        """
 99        Returns the access token.
100
101        This method uses double-checked locking to ensure thread-safe token refresh.
102        When multiple threads (streams) detect an expired token simultaneously, only one
103        will perform the refresh while others wait. After acquiring the lock, the token
104        expiry is re-checked to avoid redundant refresh attempts.
105        """
106        if self.token_has_expired():
107            with self._token_refresh_lock:
108                # Double-check after acquiring lock - another thread may have already refreshed
109                if self.token_has_expired():
110                    self.refresh_and_set_access_token()
111
112        return self.access_token
113
114    def refresh_and_set_access_token(self) -> None:
115        """Force refresh the access token and update internal state.
116
117        This method refreshes the access token regardless of whether it has expired,
118        and updates the internal token and expiry date. Subclasses may override this
119        to handle additional state updates (e.g., persisting new refresh tokens).
120        """
121        token, expires_in = self.refresh_access_token()
122        self.access_token = token
123        self.set_token_expiry_date(expires_in)
124
125    def token_has_expired(self) -> bool:
126        """Returns True if the token is expired"""
127        return ab_datetime_now() > self.get_token_expiry_date()
128
129    def _build_standard_refresh_args(self) -> MutableMapping[str, Any]:
130        """Build the standard OAuth refresh args (grant_type, refresh_token, client
131        credentials, scopes, plus any user-configured `refresh_request_body` extras).
132
133        Used by both `build_refresh_request_body()` and
134        `build_refresh_request_query_params()` so the same set of args can be emitted
135        in either the body or the URL query string depending on
136        `should_send_refresh_request_as_query_params()`.
137
138        Client credentials (client_id and client_secret) are excluded when
139        `refresh_request_headers` contains an `Authorization` header (e.g. Basic
140        auth). This is required by OAuth providers like Gong that expect credentials
141        ONLY in the Authorization header and reject requests that include them in
142        both places.
143        """
144        headers = self.get_refresh_request_headers()
145        credentials_in_header = headers and "Authorization" in headers
146        include_client_credentials = not credentials_in_header
147
148        payload: MutableMapping[str, Any] = {
149            self.get_grant_type_name(): self.get_grant_type(),
150        }
151
152        if include_client_credentials:
153            payload[self.get_client_id_name()] = self.get_client_id()
154            payload[self.get_client_secret_name()] = self.get_client_secret()
155
156        payload[self.get_refresh_token_name()] = self.get_refresh_token()
157
158        if self.get_scopes():
159            payload["scopes"] = self.get_scopes()
160
161        if self.get_refresh_request_body():
162            for key, val in self.get_refresh_request_body().items():
163                # Existing oauth args take precedence over custom configured fields.
164                if key not in payload:
165                    payload[key] = val
166
167        return payload
168
169    def build_refresh_request_body(self) -> Mapping[str, Any]:
170        """Returns the request body to set on the refresh request.
171
172        When `should_send_refresh_request_as_query_params()` is `True`, the standard
173        refresh args are emitted on the URL query string instead and this method
174        returns an empty body. This supports OAuth providers like Gong that document
175        their refresh endpoint as a `POST` with parameters on the URL query string
176        and an empty body.
177        """
178        if self.should_send_refresh_request_as_query_params():
179            return {}
180        return self._build_standard_refresh_args()
181
182    def build_refresh_request_headers(self) -> Mapping[str, Any] | None:
183        """
184        Returns the request headers to set on the refresh request
185
186        """
187        headers = self.get_refresh_request_headers()
188        return headers if headers else None
189
190    def build_refresh_request_query_params(self) -> Mapping[str, Any] | None:
191        """Returns the URL query string parameters to set on the refresh request.
192
193        When `should_send_refresh_request_as_query_params()` is `True`, the standard
194        refresh args (grant_type, refresh_token, client credentials, scopes, plus
195        any user-configured `refresh_request_body` extras) are returned here and
196        `build_refresh_request_body()` returns an empty body.
197
198        Returns `None` otherwise so existing authenticators retain their previous
199        behavior (no query params on the refresh URL).
200        """
201        if not self.should_send_refresh_request_as_query_params():
202            return None
203        return self._build_standard_refresh_args()
204
205    def refresh_access_token(self) -> Tuple[str, AirbyteDateTime]:
206        """
207        Returns the refresh token and its expiration datetime
208
209        :return: a tuple of (access_token, token_lifespan)
210        """
211        try:
212            response_json = self._make_handled_request()
213        except (
214            requests.exceptions.ConnectionError,
215            requests.exceptions.ConnectTimeout,
216            requests.exceptions.ReadTimeout,
217        ) as e:
218            raise AirbyteTracedException(
219                message="OAuth access token refresh request failed due to a network error.",
220                internal_message=f"Network error during OAuth token refresh after retries were exhausted: {e}",
221                failure_type=FailureType.transient_error,
222            ) from e
223        self._ensure_access_token_in_response(response_json)
224
225        return (
226            self._extract_access_token(response_json),
227            self._extract_token_expiry_date(response_json),
228        )
229
230    # ----------------
231    # PRIVATE METHODS
232    # ----------------
233
234    def _default_token_expiry_date(self) -> AirbyteDateTime:
235        """
236        Returns the default token expiry date
237        """
238        # 1 hour was chosen as a middle ground to avoid unnecessary frequent refreshes and token expiration
239        default_token_expiry_duration_hours = 1  # 1 hour
240        return ab_datetime_now() + timedelta(hours=default_token_expiry_duration_hours)
241
242    def _wrap_refresh_token_exception(
243        self, exception: requests.exceptions.RequestException
244    ) -> bool:
245        """
246        Wraps and handles exceptions that occur during the refresh token process.
247
248        This method checks if the provided exception is related to a refresh token error
249        by examining the response status code and specific error content.
250
251        Args:
252            exception (requests.exceptions.RequestException): The exception raised during the request.
253
254        Returns:
255            bool: True if the exception is related to a refresh token error, False otherwise.
256        """
257        try:
258            if exception.response is not None:
259                exception_content = exception.response.json()
260            else:
261                return False
262        except JSONDecodeError:
263            return False
264        return (
265            exception.response.status_code in self._refresh_token_error_status_codes
266            and exception_content.get(self._refresh_token_error_key)
267            in self._refresh_token_error_values
268        )
269
270    @backoff.on_exception(
271        backoff.expo,
272        (
273            DefaultBackoffException,
274            requests.exceptions.ConnectionError,
275            requests.exceptions.ConnectTimeout,
276            requests.exceptions.ReadTimeout,
277        ),
278        on_backoff=lambda details: logger.info(
279            f"Caught retryable error after {details['tries']} tries. Waiting {details['wait']} seconds then retrying..."
280        ),
281        max_time=300,
282    )
283    def _make_handled_request(self) -> Any:
284        """
285        Makes a handled HTTP request to refresh an OAuth token.
286
287        This method sends a POST request to the token refresh endpoint with the necessary
288        headers and body to obtain a new access token. It handles various exceptions that
289        may occur during the request and logs the response for troubleshooting purposes.
290
291        Returns:
292            Mapping[str, Any]: The JSON response from the token refresh endpoint.
293
294        Raises:
295            DefaultBackoffException: If the response status code is 429 (Too Many Requests)
296                                     or any 5xx server error.
297            AirbyteTracedException: If the refresh token is invalid or expired, prompting
298                                    re-authentication.
299            Exception: For any other exceptions that occur during the request.
300        """
301        try:
302            response = requests.request(
303                method="POST",
304                url=self.get_token_refresh_endpoint(),  # type: ignore # returns None, if not provided, but str | bytes is expected.
305                data=self.build_refresh_request_body(),
306                headers=self.build_refresh_request_headers(),
307                params=self.build_refresh_request_query_params(),
308            )
309
310            if not response.ok:
311                # log the response even if the request failed for troubleshooting purposes
312                self._log_response(response)
313                response.raise_for_status()
314
315            response_json = response.json()
316
317            try:
318                # extract the access token and add to secrets to avoid logging the raw value
319                access_key = self._extract_access_token(response_json)
320                if access_key:
321                    add_to_secrets(access_key)
322            except ResponseKeysMaxRecurtionReached as e:
323                # could not find the access token in the response, so do nothing
324                pass
325
326            self._log_response(response)
327
328            return response_json
329        except requests.exceptions.RequestException as e:
330            if e.response is not None:
331                if e.response.status_code == 429 or e.response.status_code >= 500:
332                    raise DefaultBackoffException(
333                        request=e.response.request,
334                        response=e.response,
335                        failure_type=FailureType.transient_error,
336                    )
337            if self._wrap_refresh_token_exception(e):
338                message = "Refresh token is invalid or expired. Please re-authenticate from Sources/<your source>/Settings."
339                raise AirbyteTracedException(
340                    internal_message=message, message=message, failure_type=FailureType.config_error
341                )
342            raise
343        except Exception as e:
344            raise AirbyteTracedException(
345                message="OAuth access token refresh request failed.",
346                internal_message=f"Unexpected error during OAuth token refresh: {e}",
347                failure_type=FailureType.system_error,
348            ) from e
349
350    def _ensure_access_token_in_response(self, response_data: Mapping[str, Any]) -> None:
351        """
352        Ensures that the access token is present in the response data.
353
354        This method attempts to extract the access token from the provided response data.
355        If the access token is not found, it raises an exception indicating that the token
356        refresh API response was missing the access token.
357
358        Args:
359            response_data (Mapping[str, Any]): The response data from which to extract the access token.
360
361        Raises:
362            Exception: If the access token is not found in the response data.
363            ResponseKeysMaxRecurtionReached: If the maximum recursion depth is reached while extracting the access token.
364        """
365        try:
366            access_key = self._extract_access_token(response_data)
367            if not access_key:
368                raise Exception(
369                    f"Token refresh API response was missing access token {self.get_access_token_name()}"
370                )
371        except ResponseKeysMaxRecurtionReached as e:
372            raise e
373
374    def _parse_token_expiration_date(self, value: Union[str, int]) -> AirbyteDateTime:
375        """
376        Parse a string or integer token expiration date into a datetime object
377
378        :return: expiration datetime
379        """
380        if self.token_expiry_is_time_of_expiration:
381            if not self.token_expiry_date_format:
382                raise ValueError(
383                    f"Invalid token expiry date format {self.token_expiry_date_format}; a string representing the format is required."
384                )
385            try:
386                return ab_datetime_parse(str(value))
387            except ValueError as e:
388                raise ValueError(f"Invalid token expiry date format: {e}")
389        else:
390            try:
391                # Only accept numeric values (as int/float/string) when no format specified
392                seconds = int(float(str(value)))
393                return ab_datetime_now() + timedelta(seconds=seconds)
394            except (ValueError, TypeError):
395                raise ValueError(
396                    f"Invalid expires_in value: {value}. Expected number of seconds when no format specified."
397                )
398
399    def _extract_access_token(self, response_data: Mapping[str, Any]) -> Any:
400        """
401        Extracts the access token from the given response data.
402
403        Args:
404            response_data (Mapping[str, Any]): The response data from which to extract the access token.
405
406        Returns:
407            str: The extracted access token.
408        """
409        return self._find_and_get_value_from_response(response_data, self.get_access_token_name())
410
411    def _extract_refresh_token(self, response_data: Mapping[str, Any]) -> Any:
412        """
413        Extracts the refresh token from the given response data.
414
415        Args:
416            response_data (Mapping[str, Any]): The response data from which to extract the refresh token.
417
418        Returns:
419            str: The extracted refresh token.
420        """
421        return self._find_and_get_value_from_response(response_data, self.get_refresh_token_name())
422
423    def _extract_token_expiry_date(self, response_data: Mapping[str, Any]) -> AirbyteDateTime:
424        """
425        Extracts the token_expiry_date, like `expires_in` or `expires_at`, etc from the given response data.
426
427        If the token_expiry_date is not found, it will return an existing token expiry date if set, or a default token expiry date.
428
429        Args:
430            response_data (Mapping[str, Any]): The response data from which to extract the token_expiry_date.
431
432        Returns:
433            The extracted token_expiry_date or None if not found.
434        """
435        expires_in = self._find_and_get_value_from_response(
436            response_data, self.get_expires_in_name()
437        )
438        if expires_in is not None:
439            return self._parse_token_expiration_date(expires_in)
440
441        # expires_in is None
442        existing_expiry_date = self.get_token_expiry_date()
443        if existing_expiry_date and not self.token_has_expired():
444            return existing_expiry_date
445
446        return self._default_token_expiry_date()
447
448    def _find_and_get_value_from_response(
449        self,
450        response_data: Mapping[str, Any],
451        key_name: str,
452        max_depth: int = 5,
453        current_depth: int = 0,
454    ) -> Any:
455        """
456        Recursively searches for a specified key in a nested dictionary or list and returns its value if found.
457
458        Args:
459            response_data (Mapping[str, Any]): The response data to search through, which can be a dictionary or a list.
460            key_name (str): The key to search for in the response data.
461            max_depth (int, optional): The maximum depth to search for the key to avoid infinite recursion. Defaults to 5.
462            current_depth (int, optional): The current depth of the recursion. Defaults to 0.
463
464        Returns:
465            Any: The value associated with the specified key if found, otherwise None.
466
467        Raises:
468            AirbyteTracedException: If the maximum recursion depth is reached without finding the key.
469        """
470        if current_depth > max_depth:
471            # this is needed to avoid an inf loop, possible with a very deep nesting observed.
472            message = f"The maximum level of recursion is reached. Couldn't find the specified `{key_name}` in the response."
473            raise ResponseKeysMaxRecurtionReached(
474                internal_message=message, message=message, failure_type=FailureType.config_error
475            )
476
477        if isinstance(response_data, dict):
478            # get from the root level
479            if key_name in response_data:
480                return response_data[key_name]
481
482            # get from the nested object
483            for _, value in response_data.items():
484                result = self._find_and_get_value_from_response(
485                    value, key_name, max_depth, current_depth + 1
486                )
487                if result is not None:
488                    return result
489
490        # get from the nested array object
491        elif isinstance(response_data, list):
492            for item in response_data:
493                result = self._find_and_get_value_from_response(
494                    item, key_name, max_depth, current_depth + 1
495                )
496                if result is not None:
497                    return result
498
499        return None
500
501    @property
502    def _message_repository(self) -> Optional[MessageRepository]:
503        """
504        The implementation can define a message_repository if it wants debugging logs for HTTP requests
505        """
506        return _NOOP_MESSAGE_REPOSITORY
507
508    def _log_response(self, response: requests.Response) -> None:
509        """
510        Logs the HTTP response using the message repository if it is available.
511
512        Args:
513            response (requests.Response): The HTTP response to log.
514        """
515        if self._message_repository:
516            self._message_repository.log_message(
517                Level.DEBUG,
518                lambda: format_http_message(
519                    response,
520                    "Refresh token",
521                    "Obtains access token",
522                    self._NO_STREAM_NAME,
523                    is_auxiliary=True,
524                    type="AUTH",
525                ),
526            )
527
528    # ----------------
529    # ABSTR METHODS
530    # ----------------
531
532    @abstractmethod
533    def get_token_refresh_endpoint(self) -> Optional[str]:
534        """Returns the endpoint to refresh the access token"""
535
536    @abstractmethod
537    def get_client_id_name(self) -> str:
538        """The client id name to authenticate"""
539
540    @abstractmethod
541    def get_client_id(self) -> str:
542        """The client id to authenticate"""
543
544    @abstractmethod
545    def get_client_secret_name(self) -> str:
546        """The client secret name to authenticate"""
547
548    @abstractmethod
549    def get_client_secret(self) -> str:
550        """The client secret to authenticate"""
551
552    @abstractmethod
553    def get_refresh_token_name(self) -> str:
554        """The refresh token name to authenticate"""
555
556    @abstractmethod
557    def get_refresh_token(self) -> Optional[str]:
558        """The token used to refresh the access token when it expires"""
559
560    @abstractmethod
561    def get_scopes(self) -> List[str]:
562        """List of requested scopes"""
563
564    @abstractmethod
565    def get_token_expiry_date(self) -> AirbyteDateTime:
566        """Expiration date of the access token"""
567
568    @abstractmethod
569    def set_token_expiry_date(self, value: AirbyteDateTime) -> None:
570        """Setter for access token expiration date"""
571
572    @abstractmethod
573    def get_access_token_name(self) -> str:
574        """Field to extract access token from in the response"""
575
576    @abstractmethod
577    def get_expires_in_name(self) -> str:
578        """Returns the expires_in field name"""
579
580    @abstractmethod
581    def get_refresh_request_body(self) -> Mapping[str, Any]:
582        """Returns the request body to set on the refresh request"""
583
584    @abstractmethod
585    def get_refresh_request_headers(self) -> Mapping[str, Any]:
586        """Returns the request headers to set on the refresh request"""
587
588    def should_send_refresh_request_as_query_params(self) -> bool:
589        """Returns `True` if the standard refresh args should be sent on the URL
590        query string instead of in the request body.
591
592        Defaults to `False` so existing authenticators retain their previous
593        behavior (params in body, no query params on the refresh URL). Subclasses
594        can override this to opt into the URL-query-string shape required by OAuth
595        providers like Gong.
596        """
597        return False
598
599    @abstractmethod
600    def get_grant_type(self) -> str:
601        """Returns grant_type specified for requesting access_token"""
602
603    @abstractmethod
604    def get_grant_type_name(self) -> str:
605        """Returns grant_type specified name for requesting access_token"""
606
607    @property
608    @abstractmethod
609    def access_token(self) -> str:
610        """Returns the access token"""
611
612    @access_token.setter
613    @abstractmethod
614    def access_token(self, value: str) -> str:
615        """Setter for the access token"""

Abstract class for an OAuth authenticators that implements the OAuth token refresh flow. The authenticator is designed to generically perform the refresh flow without regard to how config fields are get/set by delegating that behavior to the classes implementing the interface.

AbstractOauth2Authenticator( refresh_token_error_status_codes: Tuple[int, ...] = (), refresh_token_error_key: str = '', refresh_token_error_values: Tuple[str, ...] = ())
53    def __init__(
54        self,
55        refresh_token_error_status_codes: Tuple[int, ...] = (),
56        refresh_token_error_key: str = "",
57        refresh_token_error_values: Tuple[str, ...] = (),
58    ) -> None:
59        """
60        If all of refresh_token_error_status_codes, refresh_token_error_key, and refresh_token_error_values are set,
61        then http errors with such params will be wrapped in AirbyteTracedException.
62        """
63        self._refresh_token_error_status_codes = refresh_token_error_status_codes
64        self._refresh_token_error_key = refresh_token_error_key
65        self._refresh_token_error_values = refresh_token_error_values

If all of refresh_token_error_status_codes, refresh_token_error_key, and refresh_token_error_values are set, then http errors with such params will be wrapped in AirbyteTracedException.

token_expiry_is_time_of_expiration: bool
76    @property
77    def token_expiry_is_time_of_expiration(self) -> bool:
78        """
79        Indicates that the Token Expiry returns the date until which the token will be valid, not the amount of time it will be valid.
80        """
81
82        return False

Indicates that the Token Expiry returns the date until which the token will be valid, not the amount of time it will be valid.

token_expiry_date_format: Optional[str]
84    @property
85    def token_expiry_date_format(self) -> Optional[str]:
86        """
87        Format of the datetime; exists it if expires_in is returned as the expiration datetime instead of seconds until it expires
88        """
89
90        return None

Format of the datetime; exists it if expires_in is returned as the expiration datetime instead of seconds until it expires

def get_auth_header(self) -> Mapping[str, Any]:
92    def get_auth_header(self) -> Mapping[str, Any]:
93        """HTTP header to set on the requests"""
94        token = self.access_token if self._is_access_token_flow else self.get_access_token()
95        return {"Authorization": f"Bearer {token}"}

HTTP header to set on the requests

def get_access_token(self) -> str:
 97    def get_access_token(self) -> str:
 98        """
 99        Returns the access token.
100
101        This method uses double-checked locking to ensure thread-safe token refresh.
102        When multiple threads (streams) detect an expired token simultaneously, only one
103        will perform the refresh while others wait. After acquiring the lock, the token
104        expiry is re-checked to avoid redundant refresh attempts.
105        """
106        if self.token_has_expired():
107            with self._token_refresh_lock:
108                # Double-check after acquiring lock - another thread may have already refreshed
109                if self.token_has_expired():
110                    self.refresh_and_set_access_token()
111
112        return self.access_token

Returns the access token.

This method uses double-checked locking to ensure thread-safe token refresh. When multiple threads (streams) detect an expired token simultaneously, only one will perform the refresh while others wait. After acquiring the lock, the token expiry is re-checked to avoid redundant refresh attempts.

def refresh_and_set_access_token(self) -> None:
114    def refresh_and_set_access_token(self) -> None:
115        """Force refresh the access token and update internal state.
116
117        This method refreshes the access token regardless of whether it has expired,
118        and updates the internal token and expiry date. Subclasses may override this
119        to handle additional state updates (e.g., persisting new refresh tokens).
120        """
121        token, expires_in = self.refresh_access_token()
122        self.access_token = token
123        self.set_token_expiry_date(expires_in)

Force refresh the access token and update internal state.

This method refreshes the access token regardless of whether it has expired, and updates the internal token and expiry date. Subclasses may override this to handle additional state updates (e.g., persisting new refresh tokens).

def token_has_expired(self) -> bool:
125    def token_has_expired(self) -> bool:
126        """Returns True if the token is expired"""
127        return ab_datetime_now() > self.get_token_expiry_date()

Returns True if the token is expired

def build_refresh_request_body(self) -> Mapping[str, Any]:
169    def build_refresh_request_body(self) -> Mapping[str, Any]:
170        """Returns the request body to set on the refresh request.
171
172        When `should_send_refresh_request_as_query_params()` is `True`, the standard
173        refresh args are emitted on the URL query string instead and this method
174        returns an empty body. This supports OAuth providers like Gong that document
175        their refresh endpoint as a `POST` with parameters on the URL query string
176        and an empty body.
177        """
178        if self.should_send_refresh_request_as_query_params():
179            return {}
180        return self._build_standard_refresh_args()

Returns the request body to set on the refresh request.

When should_send_refresh_request_as_query_params() is True, the standard refresh args are emitted on the URL query string instead and this method returns an empty body. This supports OAuth providers like Gong that document their refresh endpoint as a POST with parameters on the URL query string and an empty body.

def build_refresh_request_headers(self) -> Optional[Mapping[str, Any]]:
182    def build_refresh_request_headers(self) -> Mapping[str, Any] | None:
183        """
184        Returns the request headers to set on the refresh request
185
186        """
187        headers = self.get_refresh_request_headers()
188        return headers if headers else None

Returns the request headers to set on the refresh request

def build_refresh_request_query_params(self) -> Optional[Mapping[str, Any]]:
190    def build_refresh_request_query_params(self) -> Mapping[str, Any] | None:
191        """Returns the URL query string parameters to set on the refresh request.
192
193        When `should_send_refresh_request_as_query_params()` is `True`, the standard
194        refresh args (grant_type, refresh_token, client credentials, scopes, plus
195        any user-configured `refresh_request_body` extras) are returned here and
196        `build_refresh_request_body()` returns an empty body.
197
198        Returns `None` otherwise so existing authenticators retain their previous
199        behavior (no query params on the refresh URL).
200        """
201        if not self.should_send_refresh_request_as_query_params():
202            return None
203        return self._build_standard_refresh_args()

Returns the URL query string parameters to set on the refresh request.

When should_send_refresh_request_as_query_params() is True, the standard refresh args (grant_type, refresh_token, client credentials, scopes, plus any user-configured refresh_request_body extras) are returned here and build_refresh_request_body() returns an empty body.

Returns None otherwise so existing authenticators retain their previous behavior (no query params on the refresh URL).

def refresh_access_token(self) -> Tuple[str, airbyte_cdk.utils.datetime_helpers.AirbyteDateTime]:
205    def refresh_access_token(self) -> Tuple[str, AirbyteDateTime]:
206        """
207        Returns the refresh token and its expiration datetime
208
209        :return: a tuple of (access_token, token_lifespan)
210        """
211        try:
212            response_json = self._make_handled_request()
213        except (
214            requests.exceptions.ConnectionError,
215            requests.exceptions.ConnectTimeout,
216            requests.exceptions.ReadTimeout,
217        ) as e:
218            raise AirbyteTracedException(
219                message="OAuth access token refresh request failed due to a network error.",
220                internal_message=f"Network error during OAuth token refresh after retries were exhausted: {e}",
221                failure_type=FailureType.transient_error,
222            ) from e
223        self._ensure_access_token_in_response(response_json)
224
225        return (
226            self._extract_access_token(response_json),
227            self._extract_token_expiry_date(response_json),
228        )

Returns the refresh token and its expiration datetime

Returns

a tuple of (access_token, token_lifespan)

@abstractmethod
def get_token_refresh_endpoint(self) -> Optional[str]:
532    @abstractmethod
533    def get_token_refresh_endpoint(self) -> Optional[str]:
534        """Returns the endpoint to refresh the access token"""

Returns the endpoint to refresh the access token

@abstractmethod
def get_client_id_name(self) -> str:
536    @abstractmethod
537    def get_client_id_name(self) -> str:
538        """The client id name to authenticate"""

The client id name to authenticate

@abstractmethod
def get_client_id(self) -> str:
540    @abstractmethod
541    def get_client_id(self) -> str:
542        """The client id to authenticate"""

The client id to authenticate

@abstractmethod
def get_client_secret_name(self) -> str:
544    @abstractmethod
545    def get_client_secret_name(self) -> str:
546        """The client secret name to authenticate"""

The client secret name to authenticate

@abstractmethod
def get_client_secret(self) -> str:
548    @abstractmethod
549    def get_client_secret(self) -> str:
550        """The client secret to authenticate"""

The client secret to authenticate

@abstractmethod
def get_refresh_token_name(self) -> str:
552    @abstractmethod
553    def get_refresh_token_name(self) -> str:
554        """The refresh token name to authenticate"""

The refresh token name to authenticate

@abstractmethod
def get_refresh_token(self) -> Optional[str]:
556    @abstractmethod
557    def get_refresh_token(self) -> Optional[str]:
558        """The token used to refresh the access token when it expires"""

The token used to refresh the access token when it expires

@abstractmethod
def get_scopes(self) -> List[str]:
560    @abstractmethod
561    def get_scopes(self) -> List[str]:
562        """List of requested scopes"""

List of requested scopes

@abstractmethod
def get_token_expiry_date(self) -> airbyte_cdk.utils.datetime_helpers.AirbyteDateTime:
564    @abstractmethod
565    def get_token_expiry_date(self) -> AirbyteDateTime:
566        """Expiration date of the access token"""

Expiration date of the access token

@abstractmethod
def set_token_expiry_date(self, value: airbyte_cdk.utils.datetime_helpers.AirbyteDateTime) -> None:
568    @abstractmethod
569    def set_token_expiry_date(self, value: AirbyteDateTime) -> None:
570        """Setter for access token expiration date"""

Setter for access token expiration date

@abstractmethod
def get_access_token_name(self) -> str:
572    @abstractmethod
573    def get_access_token_name(self) -> str:
574        """Field to extract access token from in the response"""

Field to extract access token from in the response

@abstractmethod
def get_expires_in_name(self) -> str:
576    @abstractmethod
577    def get_expires_in_name(self) -> str:
578        """Returns the expires_in field name"""

Returns the expires_in field name

@abstractmethod
def get_refresh_request_body(self) -> Mapping[str, Any]:
580    @abstractmethod
581    def get_refresh_request_body(self) -> Mapping[str, Any]:
582        """Returns the request body to set on the refresh request"""

Returns the request body to set on the refresh request

@abstractmethod
def get_refresh_request_headers(self) -> Mapping[str, Any]:
584    @abstractmethod
585    def get_refresh_request_headers(self) -> Mapping[str, Any]:
586        """Returns the request headers to set on the refresh request"""

Returns the request headers to set on the refresh request

def should_send_refresh_request_as_query_params(self) -> bool:
588    def should_send_refresh_request_as_query_params(self) -> bool:
589        """Returns `True` if the standard refresh args should be sent on the URL
590        query string instead of in the request body.
591
592        Defaults to `False` so existing authenticators retain their previous
593        behavior (params in body, no query params on the refresh URL). Subclasses
594        can override this to opt into the URL-query-string shape required by OAuth
595        providers like Gong.
596        """
597        return False

Returns True if the standard refresh args should be sent on the URL query string instead of in the request body.

Defaults to False so existing authenticators retain their previous behavior (params in body, no query params on the refresh URL). Subclasses can override this to opt into the URL-query-string shape required by OAuth providers like Gong.

@abstractmethod
def get_grant_type(self) -> str:
599    @abstractmethod
600    def get_grant_type(self) -> str:
601        """Returns grant_type specified for requesting access_token"""

Returns grant_type specified for requesting access_token

@abstractmethod
def get_grant_type_name(self) -> str:
603    @abstractmethod
604    def get_grant_type_name(self) -> str:
605        """Returns grant_type specified name for requesting access_token"""

Returns grant_type specified name for requesting access_token

access_token: str
607    @property
608    @abstractmethod
609    def access_token(self) -> str:
610        """Returns the access token"""

Returns the access token