airbyte_cdk.sources.streams.http.requests_native_auth

 1#
 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 3#
 4
 5from .oauth import Oauth2Authenticator, SingleUseRefreshTokenOauth2Authenticator
 6from .token import BasicHttpAuthenticator, MultipleTokenAuthenticator, TokenAuthenticator
 7
 8__all__ = [
 9    "Oauth2Authenticator",
10    "SingleUseRefreshTokenOauth2Authenticator",
11    "TokenAuthenticator",
12    "MultipleTokenAuthenticator",
13    "BasicHttpAuthenticator",
14]
 26class Oauth2Authenticator(AbstractOauth2Authenticator):
 27    """
 28    Generates OAuth2.0 access tokens from an OAuth2.0 refresh token and client credentials.
 29    The generated access token is attached to each request via the Authorization header.
 30    If a connector_config is provided any mutation of it's value in the scope of this class will emit AirbyteControlConnectorConfigMessage.
 31    """
 32
 33    def __init__(
 34        self,
 35        token_refresh_endpoint: str,
 36        client_id: str,
 37        client_secret: str,
 38        refresh_token: str,
 39        client_id_name: str = "client_id",
 40        client_secret_name: str = "client_secret",
 41        refresh_token_name: str = "refresh_token",
 42        scopes: List[str] | None = None,
 43        token_expiry_date: AirbyteDateTime | None = None,
 44        token_expiry_date_format: str | None = None,
 45        access_token_name: str = "access_token",
 46        expires_in_name: str = "expires_in",
 47        refresh_request_body: Mapping[str, Any] | None = None,
 48        refresh_request_headers: Mapping[str, Any] | None = None,
 49        send_refresh_request_as_query_params: bool = False,
 50        grant_type_name: str = "grant_type",
 51        grant_type: str = "refresh_token",
 52        token_expiry_is_time_of_expiration: bool = False,
 53        refresh_token_error_status_codes: Tuple[int, ...] = (),
 54        refresh_token_error_key: str = "",
 55        refresh_token_error_values: Tuple[str, ...] = (),
 56    ) -> None:
 57        self._token_refresh_endpoint = token_refresh_endpoint
 58        self._client_secret_name = client_secret_name
 59        self._client_secret = client_secret
 60        self._client_id_name = client_id_name
 61        self._client_id = client_id
 62        self._refresh_token_name = refresh_token_name
 63        self._refresh_token = refresh_token
 64        self._scopes = scopes
 65        self._access_token_name = access_token_name
 66        self._expires_in_name = expires_in_name
 67        self._refresh_request_body = refresh_request_body
 68        self._refresh_request_headers = refresh_request_headers
 69        self._send_refresh_request_as_query_params = send_refresh_request_as_query_params
 70        self._grant_type_name = grant_type_name
 71        self._grant_type = grant_type
 72
 73        self._token_expiry_date = token_expiry_date or (ab_datetime_now() - timedelta(days=1))
 74        self._token_expiry_date_format = token_expiry_date_format
 75        self._token_expiry_is_time_of_expiration = token_expiry_is_time_of_expiration
 76        self._access_token = None
 77        super().__init__(
 78            refresh_token_error_status_codes, refresh_token_error_key, refresh_token_error_values
 79        )
 80
 81    def get_token_refresh_endpoint(self) -> str:
 82        return self._token_refresh_endpoint
 83
 84    def get_client_id_name(self) -> str:
 85        return self._client_id_name
 86
 87    def get_client_id(self) -> str:
 88        return self._client_id
 89
 90    def get_client_secret_name(self) -> str:
 91        return self._client_secret_name
 92
 93    def get_client_secret(self) -> str:
 94        return self._client_secret
 95
 96    def get_refresh_token_name(self) -> str:
 97        return self._refresh_token_name
 98
 99    def get_refresh_token(self) -> str:
100        return self._refresh_token
101
102    def get_access_token_name(self) -> str:
103        return self._access_token_name
104
105    def get_scopes(self) -> list[str]:
106        return self._scopes  # type: ignore[return-value]
107
108    def get_expires_in_name(self) -> str:
109        return self._expires_in_name
110
111    def get_refresh_request_body(self) -> Mapping[str, Any]:
112        return self._refresh_request_body  # type: ignore[return-value]
113
114    def get_refresh_request_headers(self) -> Mapping[str, Any]:
115        return self._refresh_request_headers  # type: ignore[return-value]
116
117    def should_send_refresh_request_as_query_params(self) -> bool:
118        return self._send_refresh_request_as_query_params
119
120    def get_grant_type_name(self) -> str:
121        return self._grant_type_name
122
123    def get_grant_type(self) -> str:
124        return self._grant_type
125
126    def get_token_expiry_date(self) -> AirbyteDateTime:
127        return self._token_expiry_date
128
129    def set_token_expiry_date(self, value: AirbyteDateTime) -> None:
130        self._token_expiry_date = value
131
132    @property
133    def token_expiry_is_time_of_expiration(self) -> bool:
134        return self._token_expiry_is_time_of_expiration
135
136    @property
137    def token_expiry_date_format(self) -> Optional[str]:
138        return self._token_expiry_date_format
139
140    @property
141    def access_token(self) -> str:
142        return self._access_token  # type: ignore[return-value]
143
144    @access_token.setter
145    def access_token(self, value: str) -> None:
146        self._access_token = value  # type: ignore[assignment]  # Incorrect type for assignment

Generates OAuth2.0 access tokens from an OAuth2.0 refresh token and client credentials. The generated access token is attached to each request via the Authorization header. If a connector_config is provided, any mutation of its value in the scope of this class will emit an AirbyteControlConnectorConfigMessage.

Oauth2Authenticator( token_refresh_endpoint: str, client_id: str, client_secret: str, refresh_token: str, client_id_name: str = 'client_id', client_secret_name: str = 'client_secret', refresh_token_name: str = 'refresh_token', scopes: Optional[List[str]] = None, token_expiry_date: airbyte_cdk.utils.datetime_helpers.AirbyteDateTime | None = None, token_expiry_date_format: str | None = None, access_token_name: str = 'access_token', expires_in_name: str = 'expires_in', refresh_request_body: Optional[Mapping[str, Any]] = None, refresh_request_headers: Optional[Mapping[str, Any]] = None, send_refresh_request_as_query_params: bool = False, grant_type_name: str = 'grant_type', grant_type: str = 'refresh_token', token_expiry_is_time_of_expiration: bool = False, refresh_token_error_status_codes: Tuple[int, ...] = (), refresh_token_error_key: str = '', refresh_token_error_values: Tuple[str, ...] = ())
33    def __init__(
34        self,
35        token_refresh_endpoint: str,
36        client_id: str,
37        client_secret: str,
38        refresh_token: str,
39        client_id_name: str = "client_id",
40        client_secret_name: str = "client_secret",
41        refresh_token_name: str = "refresh_token",
42        scopes: List[str] | None = None,
43        token_expiry_date: AirbyteDateTime | None = None,
44        token_expiry_date_format: str | None = None,
45        access_token_name: str = "access_token",
46        expires_in_name: str = "expires_in",
47        refresh_request_body: Mapping[str, Any] | None = None,
48        refresh_request_headers: Mapping[str, Any] | None = None,
49        send_refresh_request_as_query_params: bool = False,
50        grant_type_name: str = "grant_type",
51        grant_type: str = "refresh_token",
52        token_expiry_is_time_of_expiration: bool = False,
53        refresh_token_error_status_codes: Tuple[int, ...] = (),
54        refresh_token_error_key: str = "",
55        refresh_token_error_values: Tuple[str, ...] = (),
56    ) -> None:
57        self._token_refresh_endpoint = token_refresh_endpoint
58        self._client_secret_name = client_secret_name
59        self._client_secret = client_secret
60        self._client_id_name = client_id_name
61        self._client_id = client_id
62        self._refresh_token_name = refresh_token_name
63        self._refresh_token = refresh_token
64        self._scopes = scopes
65        self._access_token_name = access_token_name
66        self._expires_in_name = expires_in_name
67        self._refresh_request_body = refresh_request_body
68        self._refresh_request_headers = refresh_request_headers
69        self._send_refresh_request_as_query_params = send_refresh_request_as_query_params
70        self._grant_type_name = grant_type_name
71        self._grant_type = grant_type
72
73        self._token_expiry_date = token_expiry_date or (ab_datetime_now() - timedelta(days=1))
74        self._token_expiry_date_format = token_expiry_date_format
75        self._token_expiry_is_time_of_expiration = token_expiry_is_time_of_expiration
76        self._access_token = None
77        super().__init__(
78            refresh_token_error_status_codes, refresh_token_error_key, refresh_token_error_values
79        )

If refresh_token_error_status_codes, refresh_token_error_key, and refresh_token_error_values are all set, HTTP errors that match those parameters are wrapped in an AirbyteTracedException.

def get_token_refresh_endpoint(self) -> str:
81    def get_token_refresh_endpoint(self) -> str:
82        return self._token_refresh_endpoint

Returns the endpoint to refresh the access token

def get_client_id_name(self) -> str:
84    def get_client_id_name(self) -> str:
85        return self._client_id_name

The client id name to authenticate

def get_client_id(self) -> str:
87    def get_client_id(self) -> str:
88        return self._client_id

The client id to authenticate

def get_client_secret_name(self) -> str:
90    def get_client_secret_name(self) -> str:
91        return self._client_secret_name

The client secret name to authenticate

def get_client_secret(self) -> str:
93    def get_client_secret(self) -> str:
94        return self._client_secret

The client secret to authenticate

def get_refresh_token_name(self) -> str:
96    def get_refresh_token_name(self) -> str:
97        return self._refresh_token_name

The refresh token name to authenticate

def get_refresh_token(self) -> str:
 99    def get_refresh_token(self) -> str:
100        return self._refresh_token

The token used to refresh the access token when it expires

def get_access_token_name(self) -> str:
102    def get_access_token_name(self) -> str:
103        return self._access_token_name

Field to extract access token from in the response

def get_scopes(self) -> list[str]:
105    def get_scopes(self) -> list[str]:
106        return self._scopes  # type: ignore[return-value]

List of requested scopes

def get_expires_in_name(self) -> str:
108    def get_expires_in_name(self) -> str:
109        return self._expires_in_name

Returns the expires_in field name

def get_refresh_request_body(self) -> Mapping[str, Any]:
111    def get_refresh_request_body(self) -> Mapping[str, Any]:
112        return self._refresh_request_body  # type: ignore[return-value]

Returns the request body to set on the refresh request

def get_refresh_request_headers(self) -> Mapping[str, Any]:
114    def get_refresh_request_headers(self) -> Mapping[str, Any]:
115        return self._refresh_request_headers  # type: ignore[return-value]

Returns the request headers to set on the refresh request

def should_send_refresh_request_as_query_params(self) -> bool:
117    def should_send_refresh_request_as_query_params(self) -> bool:
118        return self._send_refresh_request_as_query_params

Returns True if the standard refresh args should be sent on the URL query string instead of in the request body.

Defaults to False so existing authenticators retain their previous behavior (params in body, no query params on the refresh URL). Subclasses can override this to opt into the URL-query-string shape required by OAuth providers like Gong.

def get_grant_type_name(self) -> str:
120    def get_grant_type_name(self) -> str:
121        return self._grant_type_name

Returns the name of the grant_type field used when requesting an access token

def get_grant_type(self) -> str:
123    def get_grant_type(self) -> str:
124        return self._grant_type

Returns grant_type specified for requesting access_token

def get_token_expiry_date(self) -> airbyte_cdk.utils.datetime_helpers.AirbyteDateTime:
126    def get_token_expiry_date(self) -> AirbyteDateTime:
127        return self._token_expiry_date

Expiration date of the access token

def set_token_expiry_date(self, value: airbyte_cdk.utils.datetime_helpers.AirbyteDateTime) -> None:
129    def set_token_expiry_date(self, value: AirbyteDateTime) -> None:
130        self._token_expiry_date = value

Setter for access token expiration date

token_expiry_is_time_of_expiration: bool
132    @property
133    def token_expiry_is_time_of_expiration(self) -> bool:
134        return self._token_expiry_is_time_of_expiration

Indicates that the Token Expiry returns the date until which the token will be valid, not the amount of time it will be valid.

token_expiry_date_format: Optional[str]
136    @property
137    def token_expiry_date_format(self) -> Optional[str]:
138        return self._token_expiry_date_format

Format of the datetime; provide it if expires_in is returned as the expiration datetime instead of the number of seconds until it expires

access_token: str
140    @property
141    def access_token(self) -> str:
142        return self._access_token  # type: ignore[return-value]

Returns the access token

class SingleUseRefreshTokenOauth2Authenticator(airbyte_cdk.sources.streams.http.requests_native_auth.Oauth2Authenticator):
class SingleUseRefreshTokenOauth2Authenticator(Oauth2Authenticator):
    """
    Authenticator that should be used for APIs implementing single-use refresh tokens:
    when refreshing the access token, some APIs return a new refresh token that needs to be used in
    the next refresh flow.
    This authenticator writes the new values into the connector configuration and emits an Airbyte
    control message carrying the updated configuration.

    By default, this authenticator expects a connector config with a "credentials" field with the
    following nested fields: client_id, client_secret, refresh_token. The locations of the access
    token, refresh token and token expiry date can be changed by passing custom config paths (dpath
    paths) in the access_token_config_path, refresh_token_config_path and
    token_expiry_date_config_path constructor arguments. client_id and client_secret are always
    looked up at ("credentials", "client_id") and ("credentials", "client_secret").
    """

    def __init__(
        self,
        connector_config: Mapping[str, Any],
        token_refresh_endpoint: str,
        scopes: List[str] | None = None,
        access_token_name: str = "access_token",
        expires_in_name: str = "expires_in",
        refresh_token_name: str = "refresh_token",
        refresh_request_body: Mapping[str, Any] | None = None,
        refresh_request_headers: Mapping[str, Any] | None = None,
        send_refresh_request_as_query_params: bool = False,
        grant_type_name: str = "grant_type",
        grant_type: str = "refresh_token",
        client_id_name: str = "client_id",
        client_id: Optional[str] = None,
        client_secret_name: str = "client_secret",
        client_secret: Optional[str] = None,
        access_token_config_path: Sequence[str] = ("credentials", "access_token"),
        refresh_token_config_path: Sequence[str] = ("credentials", "refresh_token"),
        token_expiry_date_config_path: Sequence[str] = ("credentials", "token_expiry_date"),
        token_expiry_date_format: Optional[str] = None,
        message_repository: MessageRepository = NoopMessageRepository(),
        token_expiry_is_time_of_expiration: bool = False,
        refresh_token_error_status_codes: Tuple[int, ...] = (),
        refresh_token_error_key: str = "",
        refresh_token_error_values: Tuple[str, ...] = (),
    ) -> None:
        """
        Args:
            connector_config (Mapping[str, Any]): The full connector configuration
            token_refresh_endpoint (str): Full URL to the token refresh endpoint
            scopes (List[str], optional): List of OAuth scopes to pass in the refresh token request body. Defaults to None.
            access_token_name (str, optional): Name of the access token field, used to parse the refresh token response. Defaults to "access_token".
            expires_in_name (str, optional): Name of the field that characterizes when the current access token will expire, used to parse the refresh token response. Defaults to "expires_in".
            refresh_token_name (str, optional): Name of the refresh token field, used to parse the refresh token response. Defaults to "refresh_token".
            refresh_request_body (Mapping[str, Any], optional): Custom key value pair that will be added to the refresh token request body. Defaults to None.
            refresh_request_headers (Mapping[str, Any], optional): Custom key value pair that will be added to the refresh token request headers. Defaults to None.
            send_refresh_request_as_query_params (bool, optional): When True, the standard refresh args (`grant_type`, `refresh_token`, client credentials when not in an `Authorization` header, scopes, plus any `refresh_request_body` extras) are sent on the URL query string and the request body is emitted empty. Use this for OAuth providers like Gong that document their refresh endpoint with refresh args on the URL query string. Defaults to False.
            grant_type_name (str, optional): Name of the grant type field, forwarded to the parent constructor. Defaults to "grant_type".
            grant_type (str, optional): OAuth grant type. Defaults to "refresh_token".
            client_id_name (str, optional): Name of the client id field, forwarded to the parent constructor. Defaults to "client_id".
            client_id (Optional[str]): Fallback client id. The value at credentials.client_id in the config object is used when present; this argument is used only when the config has no such entry.
            client_secret_name (str, optional): Name of the client secret field, forwarded to the parent constructor. Defaults to "client_secret".
            client_secret (Optional[str]): Fallback client secret. The value at credentials.client_secret in the config object is used when present; this argument is used only when the config has no such entry.
            access_token_config_path (Sequence[str]): Dpath to the access_token field in the connector configuration. Defaults to ("credentials", "access_token").
            refresh_token_config_path (Sequence[str]): Dpath to the refresh_token field in the connector configuration. Defaults to ("credentials", "refresh_token").
            token_expiry_date_config_path (Sequence[str]): Dpath to the token_expiry_date field in the connector configuration. Defaults to ("credentials", "token_expiry_date").
            token_expiry_date_format (Optional[str]): Date format of the token expiry date field (set by expires_in_name). If not specified the token expiry date is interpreted as number of seconds until expiration.
            token_expiry_is_time_of_expiration (bool): Set to True if expires_in is returned as the time of expiration instead of the number of seconds until expiration.
            message_repository (MessageRepository): the message repository used to emit logs on HTTP requests and control message on config update
            refresh_token_error_status_codes (Tuple[int, ...]): Forwarded to the parent constructor. Defaults to ().
            refresh_token_error_key (str): Forwarded to the parent constructor. Defaults to "".
            refresh_token_error_values (Tuple[str, ...]): Forwarded to the parent constructor. Defaults to ().
        """
        # The config and the config paths must be in place before super().__init__ is called,
        # because get_refresh_token() and get_token_expiry_date() below read from them.
        self._connector_config = connector_config
        self._client_id: str = self._get_config_value_by_path(
            ("credentials", "client_id"), client_id
        )
        self._client_secret: str = self._get_config_value_by_path(
            ("credentials", "client_secret"), client_secret
        )
        self._client_id_name = client_id_name
        self._client_secret_name = client_secret_name
        self._access_token_config_path = access_token_config_path
        self._refresh_token_config_path = refresh_token_config_path
        self._token_expiry_date_config_path = token_expiry_date_config_path
        self._token_expiry_date_format = token_expiry_date_format
        self._refresh_token_name = refresh_token_name
        self._grant_type_name = grant_type_name
        self.__message_repository = message_repository
        super().__init__(
            token_refresh_endpoint=token_refresh_endpoint,
            client_id_name=self._client_id_name,
            client_id=self._client_id,
            client_secret_name=self._client_secret_name,
            client_secret=self._client_secret,
            refresh_token=self.get_refresh_token(),
            refresh_token_name=self._refresh_token_name,
            scopes=scopes,
            token_expiry_date=self.get_token_expiry_date(),
            access_token_name=access_token_name,
            expires_in_name=expires_in_name,
            refresh_request_body=refresh_request_body,
            refresh_request_headers=refresh_request_headers,
            send_refresh_request_as_query_params=send_refresh_request_as_query_params,
            grant_type_name=self._grant_type_name,
            grant_type=grant_type,
            token_expiry_date_format=token_expiry_date_format,
            token_expiry_is_time_of_expiration=token_expiry_is_time_of_expiration,
            refresh_token_error_status_codes=refresh_token_error_status_codes,
            refresh_token_error_key=refresh_token_error_key,
            refresh_token_error_values=refresh_token_error_values,
        )

    @property
    def access_token(self) -> str:
        """
        Retrieve the access token from the configuration.

        Returns:
            str: The access token.
        """
        return self._get_config_value_by_path(self._access_token_config_path)  # type: ignore[return-value]

    @access_token.setter
    def access_token(self, new_access_token: str) -> None:
        """
        Sets a new access token.

        Args:
            new_access_token (str): The new access token to be set.
        """
        self._set_config_value_by_path(self._access_token_config_path, new_access_token)

    def get_refresh_token(self) -> str:
        """
        Retrieve the refresh token from the configuration.

        This method fetches the refresh token using the configuration path specified
        by `_refresh_token_config_path`.

        Returns:
            str: The refresh token as a string.
        """
        return self._get_config_value_by_path(self._refresh_token_config_path)  # type: ignore[return-value]

    def set_refresh_token(self, new_refresh_token: str) -> None:
        """
        Updates the refresh token in the configuration.

        Args:
            new_refresh_token (str): The new refresh token to be set.
        """
        self._set_config_value_by_path(self._refresh_token_config_path, new_refresh_token)

    def get_token_expiry_date(self) -> AirbyteDateTime:
        """
        Retrieves the token expiry date from the configuration.

        This method fetches the token expiry date from the configuration using the specified path.
        If the expiry date is missing, an empty string, or None, it returns the current date and
        time minus one day, so the token is treated as already expired.
        Otherwise, it parses the expiry date string into an AirbyteDateTime object.

        Returns:
            AirbyteDateTime: The parsed or calculated token expiry date.

        Raises:
            TypeError: If the result is not an instance of AirbyteDateTime.
        """
        expiry_date = self._get_config_value_by_path(self._token_expiry_date_config_path)
        # A config entry that exists but holds null would otherwise be parsed as the string
        # "None"; treat it the same as a missing or empty value.
        result = (
            ab_datetime_now() - timedelta(days=1)
            if expiry_date is None or expiry_date == ""
            else ab_datetime_parse(str(expiry_date))
        )
        if isinstance(result, AirbyteDateTime):
            return result
        raise TypeError("Invalid datetime conversion")

    def set_token_expiry_date(self, new_token_expiry_date: AirbyteDateTime) -> None:  # type: ignore[override]
        """
        Sets the token expiry date in the configuration.

        The value is stored as its string representation.

        Args:
            new_token_expiry_date (AirbyteDateTime): The new expiry date for the token.
        """
        self._set_config_value_by_path(
            self._token_expiry_date_config_path, str(new_token_expiry_date)
        )

    def token_has_expired(self) -> bool:
        """Returns True if the token is expired"""
        return ab_datetime_now() > self.get_token_expiry_date()

    def get_access_token(self) -> str:
        """Retrieve new access and refresh token if the access token has expired.

        This method uses double-checked locking to ensure thread-safe token refresh.
        This is especially critical for single-use refresh tokens where concurrent
        refresh attempts would cause failures as the refresh token is invalidated
        after first use.

        The new refresh token is persisted with the set_refresh_token function.

        Returns:
            str: The current access_token, updated if it was previously expired.
        """
        if self.token_has_expired():
            with self._token_refresh_lock:
                # Double-check after acquiring lock - another thread may have already refreshed
                if self.token_has_expired():
                    self.refresh_and_set_access_token()
        return self.access_token

    def refresh_and_set_access_token(self) -> None:
        """Force refresh the access token and update internal state.

        For single-use refresh tokens, this also persists the new refresh token
        and emits a control message to update the connector config. If the
        response omits a refresh token, the existing one is preserved.
        """
        new_access_token, access_token_expires_in, new_refresh_token = self.refresh_access_token()
        self.access_token = new_access_token
        if new_refresh_token is not None:
            self.set_refresh_token(new_refresh_token)
        self.set_token_expiry_date(access_token_expires_in)
        self._emit_control_message()

    def refresh_access_token(self) -> Tuple[str, AirbyteDateTime, Optional[str]]:  # type: ignore[override]
        """
        Refreshes the access token by making a handled request and extracting the necessary token information.

        Returns:
            A tuple of (access_token, token_expiry_date, refresh_token). The refresh token
            is `None` when the OAuth provider omits it from the response.
        """
        response_json = self._make_handled_request()
        return (
            self._extract_access_token(response_json),
            self._extract_token_expiry_date(response_json),
            self._extract_refresh_token(response_json),
        )

    def _set_config_value_by_path(self, config_path: Union[str, Sequence[str]], value: Any) -> None:
        """
        Set a value in the connector configuration at the specified path.

        Args:
            config_path (Union[str, Sequence[str]]): The path within the configuration where the value should be set.
                This can be a string representing a single key or a sequence of strings representing a nested path.
            value (Any): The value to set at the specified path in the configuration.

        Returns:
            None
        """
        dpath.new(self._connector_config, config_path, value)  # type: ignore[arg-type]

    def _get_config_value_by_path(
        self, config_path: Union[str, Sequence[str]], default: Optional[str] = None
    ) -> str | Any:
        """
        Retrieve a value from the connector configuration using a specified path.

        Args:
            config_path (Union[str, Sequence[str]]): The path to the desired configuration value. This can be a string or a sequence of strings.
            default (Optional[str], optional): The value to return if the specified path does not exist in the configuration. When None, an empty string is used instead.

        Returns:
            Any: The value from the configuration at the specified path, or the default value (an empty string when no default is given) if the path does not exist.
        """
        return dpath.get(
            self._connector_config,  # type: ignore[arg-type]
            config_path,
            default=default if default is not None else "",
        )

    def _emit_control_message(self) -> None:
        """
        Emits a control message based on the connector configuration.

        Control messages for config updates (like refreshed tokens) must be printed directly
        to stdout so the platform can process them immediately. The message repository is
        also used to queue the message for any additional processing.

        Note:
            The function `emit_configuration_as_airbyte_control_message` prints directly to
            stdout, which is required for the platform to detect and persist config changes.
        """
        # Always emit to stdout so the platform can process the config update immediately.
        # This is critical for single-use refresh tokens where the new token must be persisted
        # before subsequent operations try to use the old (now invalid) token.
        emit_configuration_as_airbyte_control_message(self._connector_config)  # type: ignore[arg-type]

        # Also emit to the message repository for any additional processing (e.g., logging)
        if not isinstance(self._message_repository, NoopMessageRepository):
            self._message_repository.emit_message(
                create_connector_config_control_message(self._connector_config)  # type: ignore[arg-type]
            )

    @property
    def _message_repository(self) -> MessageRepository:
        """
        Overriding AbstractOauth2Authenticator._message_repository to allow for HTTP request logs
        """
        return self.__message_repository

Authenticator that should be used for APIs implementing single-use refresh tokens: when refreshing the access token, some APIs return a new refresh token that needs to be used in the next refresh flow. This authenticator updates the configuration with the new refresh token by emitting an Airbyte control message from an observed mutation. By default, this authenticator expects a connector config with a "credentials" field with the following nested fields: client_id, client_secret, refresh_token. The locations of the access token, refresh token and token expiry date can be changed by defining custom config paths (using dpath paths) in the access_token_config_path, refresh_token_config_path and token_expiry_date_config_path constructor arguments.

SingleUseRefreshTokenOauth2Authenticator( connector_config: Mapping[str, Any], token_refresh_endpoint: str, scopes: Optional[List[str]] = None, access_token_name: str = 'access_token', expires_in_name: str = 'expires_in', refresh_token_name: str = 'refresh_token', refresh_request_body: Optional[Mapping[str, Any]] = None, refresh_request_headers: Optional[Mapping[str, Any]] = None, send_refresh_request_as_query_params: bool = False, grant_type_name: str = 'grant_type', grant_type: str = 'refresh_token', client_id_name: str = 'client_id', client_id: Optional[str] = None, client_secret_name: str = 'client_secret', client_secret: Optional[str] = None, access_token_config_path: Sequence[str] = ('credentials', 'access_token'), refresh_token_config_path: Sequence[str] = ('credentials', 'refresh_token'), token_expiry_date_config_path: Sequence[str] = ('credentials', 'token_expiry_date'), token_expiry_date_format: Optional[str] = None, message_repository: airbyte_cdk.MessageRepository = <airbyte_cdk.sources.message.NoopMessageRepository object>, token_expiry_is_time_of_expiration: bool = False, refresh_token_error_status_codes: Tuple[int, ...] = (), refresh_token_error_key: str = '', refresh_token_error_values: Tuple[str, ...] = ())
159    def __init__(
160        self,
161        connector_config: Mapping[str, Any],
162        token_refresh_endpoint: str,
163        scopes: List[str] | None = None,
164        access_token_name: str = "access_token",
165        expires_in_name: str = "expires_in",
166        refresh_token_name: str = "refresh_token",
167        refresh_request_body: Mapping[str, Any] | None = None,
168        refresh_request_headers: Mapping[str, Any] | None = None,
169        send_refresh_request_as_query_params: bool = False,
170        grant_type_name: str = "grant_type",
171        grant_type: str = "refresh_token",
172        client_id_name: str = "client_id",
173        client_id: Optional[str] = None,
174        client_secret_name: str = "client_secret",
175        client_secret: Optional[str] = None,
176        access_token_config_path: Sequence[str] = ("credentials", "access_token"),
177        refresh_token_config_path: Sequence[str] = ("credentials", "refresh_token"),
178        token_expiry_date_config_path: Sequence[str] = ("credentials", "token_expiry_date"),
179        token_expiry_date_format: Optional[str] = None,
180        message_repository: MessageRepository = NoopMessageRepository(),
181        token_expiry_is_time_of_expiration: bool = False,
182        refresh_token_error_status_codes: Tuple[int, ...] = (),
183        refresh_token_error_key: str = "",
184        refresh_token_error_values: Tuple[str, ...] = (),
185    ) -> None:
186        """
187        Args:
188            connector_config (Mapping[str, Any]): The full connector configuration
189            token_refresh_endpoint (str): Full URL to the token refresh endpoint
190            scopes (List[str], optional): List of OAuth scopes to pass in the refresh token request body. Defaults to None.
191            access_token_name (str, optional): Name of the access token field, used to parse the refresh token response. Defaults to "access_token".
192            expires_in_name (str, optional): Name of the name of the field that characterizes when the current access token will expire, used to parse the refresh token response. Defaults to "expires_in".
193            refresh_token_name (str, optional): Name of the name of the refresh token field, used to parse the refresh token response. Defaults to "refresh_token".
194            refresh_request_body (Mapping[str, Any], optional): Custom key value pair that will be added to the refresh token request body. Defaults to None.
195            refresh_request_headers (Mapping[str, Any], optional): Custom key value pair that will be added to the refresh token request headers. Defaults to None.
196            send_refresh_request_as_query_params (bool, optional): When True, the standard refresh args (`grant_type`, `refresh_token`, client credentials when not in an `Authorization` header, scopes, plus any `refresh_request_body` extras) are sent on the URL query string and the request body is emitted empty. Use this for OAuth providers like Gong that document their refresh endpoint with refresh args on the URL query string. Defaults to False.
197            grant_type (str, optional): OAuth grant type. Defaults to "refresh_token".
198            client_id (Optional[str]): The client id to authenticate. If not specified, defaults to credentials.client_id in the config object.
199            client_secret (Optional[str]): The client secret to authenticate. If not specified, defaults to credentials.client_secret in the config object.
200            access_token_config_path (Sequence[str]): Dpath to the access_token field in the connector configuration. Defaults to ("credentials", "access_token").
201            refresh_token_config_path (Sequence[str]): Dpath to the refresh_token field in the connector configuration. Defaults to ("credentials", "refresh_token").
202            token_expiry_date_config_path (Sequence[str]): Dpath to the token_expiry_date field in the connector configuration. Defaults to ("credentials", "token_expiry_date").
203            token_expiry_date_format (Optional[str]): Date format of the token expiry date field (set by expires_in_name). If not specified the token expiry date is interpreted as number of seconds until expiration.
204            token_expiry_is_time_of_expiration bool: set True it if expires_in is returned as time of expiration instead of the number seconds until expiration
205            message_repository (MessageRepository): the message repository used to emit logs on HTTP requests and control message on config update
206        """
207        self._connector_config = connector_config
208        self._client_id: str = self._get_config_value_by_path(
209            ("credentials", "client_id"), client_id
210        )
211        self._client_secret: str = self._get_config_value_by_path(
212            ("credentials", "client_secret"), client_secret
213        )
214        self._client_id_name = client_id_name
215        self._client_secret_name = client_secret_name
216        self._access_token_config_path = access_token_config_path
217        self._refresh_token_config_path = refresh_token_config_path
218        self._token_expiry_date_config_path = token_expiry_date_config_path
219        self._token_expiry_date_format = token_expiry_date_format
220        self._refresh_token_name = refresh_token_name
221        self._grant_type_name = grant_type_name
222        self._connector_config = connector_config
223        self.__message_repository = message_repository
224        super().__init__(
225            token_refresh_endpoint=token_refresh_endpoint,
226            client_id_name=self._client_id_name,
227            client_id=self._client_id,
228            client_secret_name=self._client_secret_name,
229            client_secret=self._client_secret,
230            refresh_token=self.get_refresh_token(),
231            refresh_token_name=self._refresh_token_name,
232            scopes=scopes,
233            token_expiry_date=self.get_token_expiry_date(),
234            access_token_name=access_token_name,
235            expires_in_name=expires_in_name,
236            refresh_request_body=refresh_request_body,
237            refresh_request_headers=refresh_request_headers,
238            send_refresh_request_as_query_params=send_refresh_request_as_query_params,
239            grant_type_name=self._grant_type_name,
240            grant_type=grant_type,
241            token_expiry_date_format=token_expiry_date_format,
242            token_expiry_is_time_of_expiration=token_expiry_is_time_of_expiration,
243            refresh_token_error_status_codes=refresh_token_error_status_codes,
244            refresh_token_error_key=refresh_token_error_key,
245            refresh_token_error_values=refresh_token_error_values,
246        )
Arguments:
  • connector_config (Mapping[str, Any]): The full connector configuration
  • token_refresh_endpoint (str): Full URL to the token refresh endpoint
  • scopes (List[str], optional): List of OAuth scopes to pass in the refresh token request body. Defaults to None.
  • access_token_name (str, optional): Name of the access token field, used to parse the refresh token response. Defaults to "access_token".
  • expires_in_name (str, optional): Name of the field that characterizes when the current access token will expire, used to parse the refresh token response. Defaults to "expires_in".
  • refresh_token_name (str, optional): Name of the refresh token field, used to parse the refresh token response. Defaults to "refresh_token".
  • refresh_request_body (Mapping[str, Any], optional): Custom key value pair that will be added to the refresh token request body. Defaults to None.
  • refresh_request_headers (Mapping[str, Any], optional): Custom key value pair that will be added to the refresh token request headers. Defaults to None.
  • send_refresh_request_as_query_params (bool, optional): When True, the standard refresh args (grant_type, refresh_token, client credentials when not in an Authorization header, scopes, plus any refresh_request_body extras) are sent on the URL query string and the request body is emitted empty. Use this for OAuth providers like Gong that document their refresh endpoint with refresh args on the URL query string. Defaults to False.
  • grant_type (str, optional): OAuth grant type. Defaults to "refresh_token".
  • client_id (Optional[str]): The client id to authenticate. If not specified, defaults to credentials.client_id in the config object.
  • client_secret (Optional[str]): The client secret to authenticate. If not specified, defaults to credentials.client_secret in the config object.
  • access_token_config_path (Sequence[str]): Dpath to the access_token field in the connector configuration. Defaults to ("credentials", "access_token").
  • refresh_token_config_path (Sequence[str]): Dpath to the refresh_token field in the connector configuration. Defaults to ("credentials", "refresh_token").
  • token_expiry_date_config_path (Sequence[str]): Dpath to the token_expiry_date field in the connector configuration. Defaults to ("credentials", "token_expiry_date").
  • token_expiry_date_format (Optional[str]): Date format of the token expiry date field (set by expires_in_name). If not specified the token expiry date is interpreted as number of seconds until expiration.
  • token_expiry_is_time_of_expiration (bool): Set to True if expires_in is returned as the time of expiration instead of the number of seconds until expiration.
  • message_repository (MessageRepository): the message repository used to emit logs on HTTP requests and control message on config update
access_token: str
248    @property
249    def access_token(self) -> str:
250        """
251        Retrieve the access token from the configuration.
252
253        Returns:
254            str: The access token.
255        """
256        return self._get_config_value_by_path(self._access_token_config_path)  # type: ignore[return-value]

Retrieve the access token from the configuration.

Returns:

str: The access token.

def get_refresh_token(self) -> str:
268    def get_refresh_token(self) -> str:
269        """
270        Retrieve the refresh token from the configuration.
271
272        This method fetches the refresh token using the configuration path specified
273        by `_refresh_token_config_path`.
274
275        Returns:
276            str: The refresh token as a string.
277        """
278        return self._get_config_value_by_path(self._refresh_token_config_path)  # type: ignore[return-value]

Retrieve the refresh token from the configuration.

This method fetches the refresh token using the configuration path specified by _refresh_token_config_path.

Returns:

str: The refresh token as a string.

def set_refresh_token(self, new_refresh_token: str) -> None:
280    def set_refresh_token(self, new_refresh_token: str) -> None:
281        """
282        Updates the refresh token in the configuration.
283
284        Args:
285            new_refresh_token (str): The new refresh token to be set.
286        """
287        self._set_config_value_by_path(self._refresh_token_config_path, new_refresh_token)

Updates the refresh token in the configuration.

Arguments:
  • new_refresh_token (str): The new refresh token to be set.
def get_token_expiry_date(self) -> airbyte_cdk.utils.datetime_helpers.AirbyteDateTime:
289    def get_token_expiry_date(self) -> AirbyteDateTime:
290        """
291        Retrieves the token expiry date from the configuration.
292
293        This method fetches the token expiry date from the configuration using the specified path.
294        If the expiry date is an empty string, it returns the current date and time minus one day.
295        Otherwise, it parses the expiry date string into an AirbyteDateTime object.
296
297        Returns:
298            AirbyteDateTime: The parsed or calculated token expiry date.
299
300        Raises:
301            TypeError: If the result is not an instance of AirbyteDateTime.
302        """
303        expiry_date = self._get_config_value_by_path(self._token_expiry_date_config_path)
304        result = (
305            ab_datetime_now() - timedelta(days=1)
306            if expiry_date == ""
307            else ab_datetime_parse(str(expiry_date))
308        )
309        if isinstance(result, AirbyteDateTime):
310            return result
311        raise TypeError("Invalid datetime conversion")

Retrieves the token expiry date from the configuration.

This method fetches the token expiry date from the configuration using the specified path. If the expiry date is an empty string, it returns the current date and time minus one day. Otherwise, it parses the expiry date string into an AirbyteDateTime object.

Returns:

AirbyteDateTime: The parsed or calculated token expiry date.

Raises:
  • TypeError: If the result is not an instance of AirbyteDateTime.
def set_token_expiry_date( self, new_token_expiry_date: airbyte_cdk.utils.datetime_helpers.AirbyteDateTime) -> None:
313    def set_token_expiry_date(self, new_token_expiry_date: AirbyteDateTime) -> None:  # type: ignore[override]
314        """
315        Sets the token expiry date in the configuration.
316
317        Args:
318            new_token_expiry_date (AirbyteDateTime): The new expiry date for the token.
319        """
320        self._set_config_value_by_path(
321            self._token_expiry_date_config_path, str(new_token_expiry_date)
322        )

Sets the token expiry date in the configuration.

Arguments:
  • new_token_expiry_date (AirbyteDateTime): The new expiry date for the token.
def token_has_expired(self) -> bool:
324    def token_has_expired(self) -> bool:
325        """Returns True if the token is expired"""
326        return ab_datetime_now() > self.get_token_expiry_date()

Returns True if the token is expired

def get_access_token(self) -> str:
328    def get_access_token(self) -> str:
329        """Retrieve new access and refresh token if the access token has expired.
330
331        This method uses double-checked locking to ensure thread-safe token refresh.
332        This is especially critical for single-use refresh tokens where concurrent
333        refresh attempts would cause failures as the refresh token is invalidated
334        after first use.
335
336        The new refresh token is persisted with the set_refresh_token function.
337
338        Returns:
339            str: The current access_token, updated if it was previously expired.
340        """
341        if self.token_has_expired():
342            with self._token_refresh_lock:
343                # Double-check after acquiring lock - another thread may have already refreshed
344                if self.token_has_expired():
345                    self.refresh_and_set_access_token()
346        return self.access_token

Retrieve new access and refresh token if the access token has expired.

This method uses double-checked locking to ensure thread-safe token refresh. This is especially critical for single-use refresh tokens where concurrent refresh attempts would cause failures as the refresh token is invalidated after first use.

The new refresh token is persisted with the set_refresh_token function.

Returns:

str: The current access_token, updated if it was previously expired.

def refresh_and_set_access_token(self) -> None:
348    def refresh_and_set_access_token(self) -> None:
349        """Force refresh the access token and update internal state.
350
351        For single-use refresh tokens, this also persists the new refresh token
352        and emits a control message to update the connector config. If the
353        response omits a refresh token, the existing one is preserved.
354        """
355        new_access_token, access_token_expires_in, new_refresh_token = self.refresh_access_token()
356        self.access_token = new_access_token
357        if new_refresh_token is not None:
358            self.set_refresh_token(new_refresh_token)
359        self.set_token_expiry_date(access_token_expires_in)
360        self._emit_control_message()

Force refresh the access token and update internal state.

For single-use refresh tokens, this also persists the new refresh token and emits a control message to update the connector config. If the response omits a refresh token, the existing one is preserved.

def refresh_access_token( self) -> Tuple[str, airbyte_cdk.utils.datetime_helpers.AirbyteDateTime, Optional[str]]:
362    def refresh_access_token(self) -> Tuple[str, AirbyteDateTime, Optional[str]]:  # type: ignore[override]
363        """
364        Refreshes the access token by making a handled request and extracting the necessary token information.
365
366        Returns:
367            A tuple of (access_token, token_expiry_date, refresh_token). The refresh token
368            is `None` when the OAuth provider omits it from the response.
369        """
370        response_json = self._make_handled_request()
371        return (
372            self._extract_access_token(response_json),
373            self._extract_token_expiry_date(response_json),
374            self._extract_refresh_token(response_json),
375        )

Refreshes the access token by making a handled request and extracting the necessary token information.

Returns:

A tuple of (access_token, token_expiry_date, refresh_token). The refresh token is None when the OAuth provider omits it from the response.

class TokenAuthenticator(AbstractHeaderAuthenticator):
    """
    Header authenticator backed by a single, fixed token.

    The header value is "<auth_method> <token>"; it is attached to each request
    via the `auth_header` header.
    """

    def __init__(self, token: str, auth_method: str = "Bearer", auth_header: str = "Authorization"):
        """Store the token, the auth method prefix and the header name."""
        self._token = token
        self._auth_method = auth_method
        self._auth_header = auth_header

    @property
    def auth_header(self) -> str:
        """Name of the HTTP header given at construction time."""
        return self._auth_header

    @property
    def token(self) -> str:
        """Header value: the auth method and the token separated by one space."""
        header_value = f"{self._auth_method} {self._token}"
        return header_value

Builds auth header, based on the token provided. The token is attached to each request via the auth_header header.

TokenAuthenticator( token: str, auth_method: str = 'Bearer', auth_header: str = 'Authorization')
53    def __init__(self, token: str, auth_method: str = "Bearer", auth_header: str = "Authorization"):
54        self._auth_header = auth_header
55        self._auth_method = auth_method
56        self._token = token
auth_header: str
45    @property
46    def auth_header(self) -> str:
47        return self._auth_header

HTTP header to set on the requests

token: str
49    @property
50    def token(self) -> str:
51        return f"{self._auth_method} {self._token}"

The header value to set on outgoing HTTP requests

class MultipleTokenAuthenticator(AbstractHeaderAuthenticator):
    """
    Header authenticator that rotates through a list of tokens.

    Every read of `token` advances an endless cycle over the provided tokens,
    so consecutive header values use consecutive tokens and wrap around at the
    end of the list. The token is attached to each request via the
    `auth_header` header.

    NOTE: with an empty `tokens` list the cycle is empty and reading `token`
    raises StopIteration.
    """

    def __init__(
        self, tokens: List[str], auth_method: str = "Bearer", auth_header: str = "Authorization"
    ):
        """Store the tokens, the auth method prefix and the header name."""
        self._tokens = tokens
        self._tokens_iter = cycle(self._tokens)
        self._auth_header = auth_header
        self._auth_method = auth_method

    @property
    def auth_header(self) -> str:
        """Name of the HTTP header given at construction time."""
        return self._auth_header

    @property
    def token(self) -> str:
        """Header value built from the auth method and the next token in the cycle."""
        header_value = f"{self._auth_method} {next(self._tokens_iter)}"
        return header_value

Builds the auth header based on the list of tokens provided. The auth header changes on each get_auth_header call, cycling through the tokens in order. The token is attached to each request via the auth_header header.

MultipleTokenAuthenticator( tokens: List[str], auth_method: str = 'Bearer', auth_header: str = 'Authorization')
30    def __init__(
31        self, tokens: List[str], auth_method: str = "Bearer", auth_header: str = "Authorization"
32    ):
33        self._auth_method = auth_method
34        self._auth_header = auth_header
35        self._tokens = tokens
36        self._tokens_iter = cycle(self._tokens)
auth_header: str
22    @property
23    def auth_header(self) -> str:
24        return self._auth_header

HTTP header to set on the requests

token: str
26    @property
27    def token(self) -> str:
28        return f"{self._auth_method} {next(self._tokens_iter)}"

The header value to set on outgoing HTTP requests

class BasicHttpAuthenticator(AbstractHeaderAuthenticator):
    """
    Builds auth based on the basic authentication scheme as defined by RFC 7617, which transmits credentials as user ID/password pairs, encoded using base64
    https://developer.mozilla.org/en-US/docs/Web/HTTP/Authentication#basic_authentication_scheme
    """

    def __init__(
        self,
        username: str,
        password: str = "",
        auth_method: str = "Basic",
        auth_header: str = "Authorization",
    ):
        """Encode "username:password" as base64 and store it with the header settings."""
        credentials = f"{username}:{password}"
        self._token = base64.b64encode(credentials.encode("utf8")).decode("utf8")
        self._auth_method = auth_method
        self._auth_header = auth_header

    @property
    def auth_header(self) -> str:
        """Name of the HTTP header given at construction time."""
        return self._auth_header

    @property
    def token(self) -> str:
        """Header value: the auth method and the encoded credentials separated by one space."""
        header_value = f"{self._auth_method} {self._token}"
        return header_value

Builds auth based on the basic authentication scheme as defined by RFC 7617, which transmits credentials as user ID/password pairs, encoded using base64 https://developer.mozilla.org/en-US/docs/Web/HTTP/Authentication#basic_authentication_scheme

BasicHttpAuthenticator( username: str, password: str = '', auth_method: str = 'Basic', auth_header: str = 'Authorization')
73    def __init__(
74        self,
75        username: str,
76        password: str = "",
77        auth_method: str = "Basic",
78        auth_header: str = "Authorization",
79    ):
80        auth_string = f"{username}:{password}".encode("utf8")
81        b64_encoded = base64.b64encode(auth_string).decode("utf8")
82        self._auth_header = auth_header
83        self._auth_method = auth_method
84        self._token = b64_encoded
auth_header: str
65    @property
66    def auth_header(self) -> str:
67        return self._auth_header

HTTP header to set on the requests

token: str
69    @property
70    def token(self) -> str:
71        return f"{self._auth_method} {self._token}"

The header value to set on outgoing HTTP requests