airbyte_cdk.sources.streams.http.requests_native_auth

 1#
 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 3#
 4
 5from .oauth import Oauth2Authenticator, SingleUseRefreshTokenOauth2Authenticator
 6from .token import BasicHttpAuthenticator, MultipleTokenAuthenticator, TokenAuthenticator
 7
 8__all__ = [
 9    "Oauth2Authenticator",
10    "SingleUseRefreshTokenOauth2Authenticator",
11    "TokenAuthenticator",
12    "MultipleTokenAuthenticator",
13    "BasicHttpAuthenticator",
14]
 26class Oauth2Authenticator(AbstractOauth2Authenticator):
 27    """
 28    Generates OAuth2.0 access tokens from an OAuth2.0 refresh token and client credentials.
 29    The generated access token is attached to each request via the Authorization header.
 30    If a connector_config is provided any mutation of it's value in the scope of this class will emit AirbyteControlConnectorConfigMessage.
 31    """
 32
 33    def __init__(
 34        self,
 35        token_refresh_endpoint: str,
 36        client_id: str,
 37        client_secret: str,
 38        refresh_token: str,
 39        client_id_name: str = "client_id",
 40        client_secret_name: str = "client_secret",
 41        refresh_token_name: str = "refresh_token",
 42        scopes: List[str] | None = None,
 43        token_expiry_date: AirbyteDateTime | None = None,
 44        token_expiry_date_format: str | None = None,
 45        access_token_name: str = "access_token",
 46        expires_in_name: str = "expires_in",
 47        refresh_request_body: Mapping[str, Any] | None = None,
 48        refresh_request_headers: Mapping[str, Any] | None = None,
 49        grant_type_name: str = "grant_type",
 50        grant_type: str = "refresh_token",
 51        token_expiry_is_time_of_expiration: bool = False,
 52        refresh_token_error_status_codes: Tuple[int, ...] = (),
 53        refresh_token_error_key: str = "",
 54        refresh_token_error_values: Tuple[str, ...] = (),
 55    ) -> None:
 56        self._token_refresh_endpoint = token_refresh_endpoint
 57        self._client_secret_name = client_secret_name
 58        self._client_secret = client_secret
 59        self._client_id_name = client_id_name
 60        self._client_id = client_id
 61        self._refresh_token_name = refresh_token_name
 62        self._refresh_token = refresh_token
 63        self._scopes = scopes
 64        self._access_token_name = access_token_name
 65        self._expires_in_name = expires_in_name
 66        self._refresh_request_body = refresh_request_body
 67        self._refresh_request_headers = refresh_request_headers
 68        self._grant_type_name = grant_type_name
 69        self._grant_type = grant_type
 70
 71        self._token_expiry_date = token_expiry_date or (ab_datetime_now() - timedelta(days=1))
 72        self._token_expiry_date_format = token_expiry_date_format
 73        self._token_expiry_is_time_of_expiration = token_expiry_is_time_of_expiration
 74        self._access_token = None
 75        super().__init__(
 76            refresh_token_error_status_codes, refresh_token_error_key, refresh_token_error_values
 77        )
 78
 79    def get_token_refresh_endpoint(self) -> str:
 80        return self._token_refresh_endpoint
 81
 82    def get_client_id_name(self) -> str:
 83        return self._client_id_name
 84
 85    def get_client_id(self) -> str:
 86        return self._client_id
 87
 88    def get_client_secret_name(self) -> str:
 89        return self._client_secret_name
 90
 91    def get_client_secret(self) -> str:
 92        return self._client_secret
 93
 94    def get_refresh_token_name(self) -> str:
 95        return self._refresh_token_name
 96
 97    def get_refresh_token(self) -> str:
 98        return self._refresh_token
 99
100    def get_access_token_name(self) -> str:
101        return self._access_token_name
102
103    def get_scopes(self) -> list[str]:
104        return self._scopes  # type: ignore[return-value]
105
106    def get_expires_in_name(self) -> str:
107        return self._expires_in_name
108
109    def get_refresh_request_body(self) -> Mapping[str, Any]:
110        return self._refresh_request_body  # type: ignore[return-value]
111
112    def get_refresh_request_headers(self) -> Mapping[str, Any]:
113        return self._refresh_request_headers  # type: ignore[return-value]
114
115    def get_grant_type_name(self) -> str:
116        return self._grant_type_name
117
118    def get_grant_type(self) -> str:
119        return self._grant_type
120
121    def get_token_expiry_date(self) -> AirbyteDateTime:
122        return self._token_expiry_date
123
124    def set_token_expiry_date(self, value: Union[str, int]) -> None:
125        self._token_expiry_date = self._parse_token_expiration_date(value)
126
127    @property
128    def token_expiry_is_time_of_expiration(self) -> bool:
129        return self._token_expiry_is_time_of_expiration
130
131    @property
132    def token_expiry_date_format(self) -> Optional[str]:
133        return self._token_expiry_date_format
134
135    @property
136    def access_token(self) -> str:
137        return self._access_token  # type: ignore[return-value]
138
139    @access_token.setter
140    def access_token(self, value: str) -> None:
141        self._access_token = value  # type: ignore[assignment]  # Incorrect type for assignment

Generates OAuth2.0 access tokens from an OAuth2.0 refresh token and client credentials. The generated access token is attached to each request via the Authorization header. If a connector_config is provided, any mutation of its value in the scope of this class will emit AirbyteControlConnectorConfigMessage.

Oauth2Authenticator( token_refresh_endpoint: str, client_id: str, client_secret: str, refresh_token: str, client_id_name: str = 'client_id', client_secret_name: str = 'client_secret', refresh_token_name: str = 'refresh_token', scopes: Optional[List[str]] = None, token_expiry_date: airbyte_cdk.utils.datetime_helpers.AirbyteDateTime | None = None, token_expiry_date_format: str | None = None, access_token_name: str = 'access_token', expires_in_name: str = 'expires_in', refresh_request_body: Optional[Mapping[str, Any]] = None, refresh_request_headers: Optional[Mapping[str, Any]] = None, grant_type_name: str = 'grant_type', grant_type: str = 'refresh_token', token_expiry_is_time_of_expiration: bool = False, refresh_token_error_status_codes: Tuple[int, ...] = (), refresh_token_error_key: str = '', refresh_token_error_values: Tuple[str, ...] = ())
33    def __init__(
34        self,
35        token_refresh_endpoint: str,
36        client_id: str,
37        client_secret: str,
38        refresh_token: str,
39        client_id_name: str = "client_id",
40        client_secret_name: str = "client_secret",
41        refresh_token_name: str = "refresh_token",
42        scopes: List[str] | None = None,
43        token_expiry_date: AirbyteDateTime | None = None,
44        token_expiry_date_format: str | None = None,
45        access_token_name: str = "access_token",
46        expires_in_name: str = "expires_in",
47        refresh_request_body: Mapping[str, Any] | None = None,
48        refresh_request_headers: Mapping[str, Any] | None = None,
49        grant_type_name: str = "grant_type",
50        grant_type: str = "refresh_token",
51        token_expiry_is_time_of_expiration: bool = False,
52        refresh_token_error_status_codes: Tuple[int, ...] = (),
53        refresh_token_error_key: str = "",
54        refresh_token_error_values: Tuple[str, ...] = (),
55    ) -> None:
56        self._token_refresh_endpoint = token_refresh_endpoint
57        self._client_secret_name = client_secret_name
58        self._client_secret = client_secret
59        self._client_id_name = client_id_name
60        self._client_id = client_id
61        self._refresh_token_name = refresh_token_name
62        self._refresh_token = refresh_token
63        self._scopes = scopes
64        self._access_token_name = access_token_name
65        self._expires_in_name = expires_in_name
66        self._refresh_request_body = refresh_request_body
67        self._refresh_request_headers = refresh_request_headers
68        self._grant_type_name = grant_type_name
69        self._grant_type = grant_type
70
71        self._token_expiry_date = token_expiry_date or (ab_datetime_now() - timedelta(days=1))
72        self._token_expiry_date_format = token_expiry_date_format
73        self._token_expiry_is_time_of_expiration = token_expiry_is_time_of_expiration
74        self._access_token = None
75        super().__init__(
76            refresh_token_error_status_codes, refresh_token_error_key, refresh_token_error_values
77        )

If refresh_token_error_status_codes, refresh_token_error_key, and refresh_token_error_values are all set, then HTTP errors matching these parameters will be wrapped in AirbyteTracedException.

def get_token_refresh_endpoint(self) -> str:
79    def get_token_refresh_endpoint(self) -> str:
80        return self._token_refresh_endpoint

Returns the endpoint to refresh the access token

def get_client_id_name(self) -> str:
82    def get_client_id_name(self) -> str:
83        return self._client_id_name

The client id name to authenticate

def get_client_id(self) -> str:
85    def get_client_id(self) -> str:
86        return self._client_id

The client id to authenticate

def get_client_secret_name(self) -> str:
88    def get_client_secret_name(self) -> str:
89        return self._client_secret_name

The client secret name to authenticate

def get_client_secret(self) -> str:
91    def get_client_secret(self) -> str:
92        return self._client_secret

The client secret to authenticate

def get_refresh_token_name(self) -> str:
94    def get_refresh_token_name(self) -> str:
95        return self._refresh_token_name

The refresh token name to authenticate

def get_refresh_token(self) -> str:
97    def get_refresh_token(self) -> str:
98        return self._refresh_token

The token used to refresh the access token when it expires

def get_access_token_name(self) -> str:
100    def get_access_token_name(self) -> str:
101        return self._access_token_name

Field to extract access token from in the response

def get_scopes(self) -> list[str]:
103    def get_scopes(self) -> list[str]:
104        return self._scopes  # type: ignore[return-value]

List of requested scopes

def get_expires_in_name(self) -> str:
106    def get_expires_in_name(self) -> str:
107        return self._expires_in_name

Returns the expires_in field name

def get_refresh_request_body(self) -> Mapping[str, Any]:
109    def get_refresh_request_body(self) -> Mapping[str, Any]:
110        return self._refresh_request_body  # type: ignore[return-value]

Returns the request body to set on the refresh request

def get_refresh_request_headers(self) -> Mapping[str, Any]:
112    def get_refresh_request_headers(self) -> Mapping[str, Any]:
113        return self._refresh_request_headers  # type: ignore[return-value]

Returns the request headers to set on the refresh request

def get_grant_type_name(self) -> str:
115    def get_grant_type_name(self) -> str:
116        return self._grant_type_name

Returns grant_type specified name for requesting access_token

def get_grant_type(self) -> str:
118    def get_grant_type(self) -> str:
119        return self._grant_type

Returns grant_type specified for requesting access_token

def get_token_expiry_date(self) -> airbyte_cdk.utils.datetime_helpers.AirbyteDateTime:
121    def get_token_expiry_date(self) -> AirbyteDateTime:
122        return self._token_expiry_date

Expiration date of the access token

def set_token_expiry_date(self, value: Union[str, int]) -> None:
124    def set_token_expiry_date(self, value: Union[str, int]) -> None:
125        self._token_expiry_date = self._parse_token_expiration_date(value)

Setter for access token expiration date

token_expiry_is_time_of_expiration: bool
127    @property
128    def token_expiry_is_time_of_expiration(self) -> bool:
129        return self._token_expiry_is_time_of_expiration

Indicates that the Token Expiry returns the date until which the token will be valid, not the amount of time it will be valid.

token_expiry_date_format: Optional[str]
131    @property
132    def token_expiry_date_format(self) -> Optional[str]:
133        return self._token_expiry_date_format

Format of the datetime; set it if expires_in is returned as the expiration datetime instead of the number of seconds until it expires

access_token: str
135    @property
136    def access_token(self) -> str:
137        return self._access_token  # type: ignore[return-value]

Returns the access token

class SingleUseRefreshTokenOauth2Authenticator(airbyte_cdk.sources.streams.http.requests_native_auth.Oauth2Authenticator):
class SingleUseRefreshTokenOauth2Authenticator(Oauth2Authenticator):
    """
    Authenticator for APIs that implement single-use refresh tokens.

    When refreshing the access token, such APIs return a new refresh token that needs to be used in the
    next refresh flow. This authenticator writes the new access token, refresh token and token expiry date
    into the connector configuration and emits an Airbyte control message carrying the updated configuration.

    By default, this authenticator expects a connector config with a "credentials" field holding the nested
    fields client_id, client_secret and refresh_token. The location of the access token, refresh token and
    token expiry date can be changed with the access_token_config_path, refresh_token_config_path and
    token_expiry_date_config_path constructor arguments (dpath paths). The client id and client secret are
    always looked up under ("credentials", "client_id") / ("credentials", "client_secret"); the client_id and
    client_secret constructor arguments are used only when those paths are absent.
    """

    def __init__(
        self,
        connector_config: Mapping[str, Any],
        token_refresh_endpoint: str,
        scopes: List[str] | None = None,
        access_token_name: str = "access_token",
        expires_in_name: str = "expires_in",
        refresh_token_name: str = "refresh_token",
        refresh_request_body: Mapping[str, Any] | None = None,
        refresh_request_headers: Mapping[str, Any] | None = None,
        grant_type_name: str = "grant_type",
        grant_type: str = "refresh_token",
        client_id_name: str = "client_id",
        client_id: Optional[str] = None,
        client_secret_name: str = "client_secret",
        client_secret: Optional[str] = None,
        access_token_config_path: Sequence[str] = ("credentials", "access_token"),
        refresh_token_config_path: Sequence[str] = ("credentials", "refresh_token"),
        token_expiry_date_config_path: Sequence[str] = ("credentials", "token_expiry_date"),
        token_expiry_date_format: Optional[str] = None,
        message_repository: MessageRepository = NoopMessageRepository(),
        token_expiry_is_time_of_expiration: bool = False,
        refresh_token_error_status_codes: Tuple[int, ...] = (),
        refresh_token_error_key: str = "",
        refresh_token_error_values: Tuple[str, ...] = (),
    ) -> None:
        """
        Args:
            connector_config (Mapping[str, Any]): The full connector configuration
            token_refresh_endpoint (str): Full URL to the token refresh endpoint
            scopes (List[str], optional): List of OAuth scopes to pass in the refresh token request body. Defaults to None.
            access_token_name (str, optional): Name of the access token field, used to parse the refresh token response. Defaults to "access_token".
            expires_in_name (str, optional): Name of the field that characterizes when the current access token will expire, used to parse the refresh token response. Defaults to "expires_in".
            refresh_token_name (str, optional): Name of the refresh token field, used to parse the refresh token response. Defaults to "refresh_token".
            refresh_request_body (Mapping[str, Any], optional): Custom key value pair that will be added to the refresh token request body. Defaults to None.
            refresh_request_headers (Mapping[str, Any], optional): Custom key value pair that will be added to the refresh token request headers. Defaults to None.
            grant_type_name (str, optional): Name of the grant type field. Defaults to "grant_type".
            grant_type (str, optional): OAuth grant type. Defaults to "refresh_token".
            client_id_name (str, optional): Name of the client id field. Defaults to "client_id".
            client_id (Optional[str]): Fallback client id, used only when credentials.client_id is absent from the config object.
            client_secret_name (str, optional): Name of the client secret field. Defaults to "client_secret".
            client_secret (Optional[str]): Fallback client secret, used only when credentials.client_secret is absent from the config object.
            access_token_config_path (Sequence[str]): Dpath to the access_token field in the connector configuration. Defaults to ("credentials", "access_token").
            refresh_token_config_path (Sequence[str]): Dpath to the refresh_token field in the connector configuration. Defaults to ("credentials", "refresh_token").
            token_expiry_date_config_path (Sequence[str]): Dpath to the token_expiry_date field in the connector configuration. Defaults to ("credentials", "token_expiry_date").
            token_expiry_date_format (Optional[str]): Date format of the token expiry date field (set by expires_in_name). If not specified the token expiry date is interpreted as number of seconds until expiration.
            message_repository (MessageRepository): the message repository used to emit logs on HTTP requests and control message on config update
            token_expiry_is_time_of_expiration (bool): set it to True if expires_in is returned as time of expiration instead of the number of seconds until expiration
            refresh_token_error_status_codes (Tuple[int, ...]): forwarded to the parent constructor together with refresh_token_error_key and refresh_token_error_values.
            refresh_token_error_key (str): forwarded to the parent constructor.
            refresh_token_error_values (Tuple[str, ...]): forwarded to the parent constructor.
        """
        # The config must be bound first: every lookup below reads from it.
        self._connector_config = connector_config
        self._client_id: str = self._get_config_value_by_path(
            ("credentials", "client_id"), client_id
        )
        self._client_secret: str = self._get_config_value_by_path(
            ("credentials", "client_secret"), client_secret
        )
        self._client_id_name = client_id_name
        self._client_secret_name = client_secret_name
        # The config paths must be set before get_refresh_token() / get_token_expiry_date() are
        # evaluated for the parent constructor call below.
        self._access_token_config_path = access_token_config_path
        self._refresh_token_config_path = refresh_token_config_path
        self._token_expiry_date_config_path = token_expiry_date_config_path
        self._token_expiry_date_format = token_expiry_date_format
        self._refresh_token_name = refresh_token_name
        self._grant_type_name = grant_type_name
        self.__message_repository = message_repository
        super().__init__(
            token_refresh_endpoint=token_refresh_endpoint,
            client_id_name=self._client_id_name,
            client_id=self._client_id,
            client_secret_name=self._client_secret_name,
            client_secret=self._client_secret,
            refresh_token=self.get_refresh_token(),
            refresh_token_name=self._refresh_token_name,
            scopes=scopes,
            token_expiry_date=self.get_token_expiry_date(),
            access_token_name=access_token_name,
            expires_in_name=expires_in_name,
            refresh_request_body=refresh_request_body,
            refresh_request_headers=refresh_request_headers,
            grant_type_name=self._grant_type_name,
            grant_type=grant_type,
            token_expiry_date_format=token_expiry_date_format,
            token_expiry_is_time_of_expiration=token_expiry_is_time_of_expiration,
            refresh_token_error_status_codes=refresh_token_error_status_codes,
            refresh_token_error_key=refresh_token_error_key,
            refresh_token_error_values=refresh_token_error_values,
        )

    @property
    def access_token(self) -> str:
        """
        Retrieve the access token from the configuration.

        Returns:
            str: The value stored at `_access_token_config_path`, or "" when that path is absent.
        """
        return self._get_config_value_by_path(self._access_token_config_path)  # type: ignore[return-value]

    @access_token.setter
    def access_token(self, new_access_token: str) -> None:
        """
        Store a new access token in the configuration at `_access_token_config_path`.

        Args:
            new_access_token (str): The new access token to be set.
        """
        self._set_config_value_by_path(self._access_token_config_path, new_access_token)

    def get_refresh_token(self) -> str:
        """
        Retrieve the refresh token from the configuration.

        Returns:
            str: The value stored at `_refresh_token_config_path`, or "" when that path is absent.
        """
        return self._get_config_value_by_path(self._refresh_token_config_path)  # type: ignore[return-value]

    def set_refresh_token(self, new_refresh_token: str) -> None:
        """
        Store a new refresh token in the configuration at `_refresh_token_config_path`.

        Args:
            new_refresh_token (str): The new refresh token to be set.
        """
        self._set_config_value_by_path(self._refresh_token_config_path, new_refresh_token)

    def get_token_expiry_date(self) -> AirbyteDateTime:
        """
        Retrieve the token expiry date from the configuration.

        When the configuration holds no expiry date (missing path, empty string or None), the
        current date and time minus one day is returned, so that token_has_expired() reports True.
        Otherwise the stored value is converted to a string and parsed with ab_datetime_parse.

        Returns:
            AirbyteDateTime: The parsed or calculated token expiry date.

        Raises:
            TypeError: If the result is not an instance of AirbyteDateTime.
        """
        expiry_date = self._get_config_value_by_path(self._token_expiry_date_config_path)
        if expiry_date is None or expiry_date == "":
            # A null value would otherwise be stringified to "None" and handed to the parser.
            result = ab_datetime_now() - timedelta(days=1)
        else:
            result = ab_datetime_parse(str(expiry_date))
        if isinstance(result, AirbyteDateTime):
            return result
        raise TypeError("Invalid datetime conversion")

    def set_token_expiry_date(self, new_token_expiry_date: AirbyteDateTime) -> None:  # type: ignore[override]
        """
        Store the token expiry date, as a string, in the configuration at `_token_expiry_date_config_path`.

        Args:
            new_token_expiry_date (AirbyteDateTime): The new expiry date for the token.
        """
        self._set_config_value_by_path(
            self._token_expiry_date_config_path, str(new_token_expiry_date)
        )

    def token_has_expired(self) -> bool:
        """Returns True if the current time is later than the token expiry date."""
        return ab_datetime_now() > self.get_token_expiry_date()

    @staticmethod
    def get_new_token_expiry_date(
        access_token_expires_in: str,
        token_expiry_date_format: str | None = None,
    ) -> AirbyteDateTime:
        """
        Calculate the new token expiry date from the value returned by the refresh endpoint.

        Args:
            access_token_expires_in (str): The duration (in seconds) until the access token expires, or the expiry date itself.
            token_expiry_date_format (str | None, optional): When set (truthy), `access_token_expires_in` is parsed
                as a datetime with ab_datetime_parse; the format string itself is not passed to the parser.
                Defaults to None.

        Returns:
            AirbyteDateTime: The calculated expiry date of the access token.
        """
        if token_expiry_date_format:
            return ab_datetime_parse(access_token_expires_in)
        # No format configured: the value is a number of seconds counted from now.
        return ab_datetime_now() + timedelta(seconds=int(access_token_expires_in))

    def get_access_token(self) -> str:
        """Retrieve new access and refresh token if the access token has expired.

        The new access token, refresh token and expiry date are written to the configuration and a
        control message is emitted afterwards.

        Returns:
            str: The current access_token, updated if it was previously expired.
        """
        if self.token_has_expired():
            new_access_token, access_token_expires_in, new_refresh_token = (
                self.refresh_access_token()
            )
            new_token_expiry_date: AirbyteDateTime = self.get_new_token_expiry_date(
                access_token_expires_in, self._token_expiry_date_format
            )
            self.access_token = new_access_token
            self.set_refresh_token(new_refresh_token)
            self.set_token_expiry_date(new_token_expiry_date)
            self._emit_control_message()
        return self.access_token

    def refresh_access_token(self) -> Tuple[str, str, str]:  # type: ignore[override]
        """
        Refresh the access token by making a handled request and extracting the token information from the response.

        Returns:
            Tuple[str, str, str]: A tuple containing the new access token, token expiry date, and refresh token.
        """
        response_json = self._make_handled_request()
        return (
            self._extract_access_token(response_json),
            self._extract_token_expiry_date(response_json),
            self._extract_refresh_token(response_json),
        )

    def _set_config_value_by_path(self, config_path: Union[str, Sequence[str]], value: Any) -> None:
        """
        Set a value in the connector configuration at the specified path (via dpath.new).

        Args:
            config_path (Union[str, Sequence[str]]): The path within the configuration where the value should be set.
                This can be a string representing a single key or a sequence of strings representing a nested path.
            value (Any): The value to set at the specified path in the configuration.
        """
        dpath.new(self._connector_config, config_path, value)  # type: ignore[arg-type]

    def _get_config_value_by_path(
        self, config_path: Union[str, Sequence[str]], default: Optional[str] = None
    ) -> str | Any:
        """
        Retrieve a value from the connector configuration using a specified path (via dpath.get).

        Args:
            config_path (Union[str, Sequence[str]]): The path to the desired configuration value. This can be a string or a sequence of strings.
            default (Optional[str], optional): The value handed to dpath.get as its default. When None, the
                empty string "" is used instead. Defaults to None.

        Returns:
            Any: The value from the configuration at the specified path, or the default ("" when no default was given).
        """
        return dpath.get(
            self._connector_config,  # type: ignore[arg-type]
            config_path,
            default=default if default is not None else "",
        )

    def _emit_control_message(self) -> None:
        """
        Emit a control message carrying the connector configuration.

        When the message repository is not a NoopMessageRepository, the message is emitted through it.
        Otherwise the configuration is emitted with `emit_configuration_as_airbyte_control_message`,
        kept for backward compatibility.

        Note:
            The function `emit_configuration_as_airbyte_control_message` has been deprecated
            in favor of the package `airbyte_cdk.sources.message`.
        """
        # FIXME emit_configuration_as_airbyte_control_message as been deprecated in favor of package airbyte_cdk.sources.message
        # Usually, a class shouldn't care about the implementation details but to keep backward compatibility where we print the
        # message directly in the console, this is needed
        if not isinstance(self._message_repository, NoopMessageRepository):
            self._message_repository.emit_message(
                create_connector_config_control_message(self._connector_config)  # type: ignore[arg-type]
            )
        else:
            emit_configuration_as_airbyte_control_message(self._connector_config)  # type: ignore[arg-type]

    @property
    def _message_repository(self) -> MessageRepository:
        """
        Overriding AbstractOauth2Authenticator._message_repository to allow for HTTP request logs
        """
        return self.__message_repository

Authenticator that should be used for APIs implementing single-use refresh tokens: when refreshing the access token, some APIs return a new refresh token that needs to be used in the next refresh flow. This authenticator updates the configuration with the new refresh token by emitting an Airbyte control message from an observed mutation. By default, this authenticator expects a connector config with a "credentials" field with the following nested fields: client_id, client_secret, refresh_token. This behavior can be changed by defining custom config paths (using dpath paths) in the access_token_config_path, refresh_token_config_path and token_expiry_date_config_path constructor arguments.

SingleUseRefreshTokenOauth2Authenticator( connector_config: Mapping[str, Any], token_refresh_endpoint: str, scopes: Optional[List[str]] = None, access_token_name: str = 'access_token', expires_in_name: str = 'expires_in', refresh_token_name: str = 'refresh_token', refresh_request_body: Optional[Mapping[str, Any]] = None, refresh_request_headers: Optional[Mapping[str, Any]] = None, grant_type_name: str = 'grant_type', grant_type: str = 'refresh_token', client_id_name: str = 'client_id', client_id: Optional[str] = None, client_secret_name: str = 'client_secret', client_secret: Optional[str] = None, access_token_config_path: Sequence[str] = ('credentials', 'access_token'), refresh_token_config_path: Sequence[str] = ('credentials', 'refresh_token'), token_expiry_date_config_path: Sequence[str] = ('credentials', 'token_expiry_date'), token_expiry_date_format: Optional[str] = None, message_repository: airbyte_cdk.MessageRepository = <airbyte_cdk.sources.message.NoopMessageRepository object>, token_expiry_is_time_of_expiration: bool = False, refresh_token_error_status_codes: Tuple[int, ...] = (), refresh_token_error_key: str = '', refresh_token_error_values: Tuple[str, ...] = ())
def __init__(
    self,
    connector_config: Mapping[str, Any],
    token_refresh_endpoint: str,
    scopes: List[str] | None = None,
    access_token_name: str = "access_token",
    expires_in_name: str = "expires_in",
    refresh_token_name: str = "refresh_token",
    refresh_request_body: Mapping[str, Any] | None = None,
    refresh_request_headers: Mapping[str, Any] | None = None,
    grant_type_name: str = "grant_type",
    grant_type: str = "refresh_token",
    client_id_name: str = "client_id",
    client_id: Optional[str] = None,
    client_secret_name: str = "client_secret",
    client_secret: Optional[str] = None,
    access_token_config_path: Sequence[str] = ("credentials", "access_token"),
    refresh_token_config_path: Sequence[str] = ("credentials", "refresh_token"),
    token_expiry_date_config_path: Sequence[str] = ("credentials", "token_expiry_date"),
    token_expiry_date_format: Optional[str] = None,
    message_repository: MessageRepository = NoopMessageRepository(),
    token_expiry_is_time_of_expiration: bool = False,
    refresh_token_error_status_codes: Tuple[int, ...] = (),
    refresh_token_error_key: str = "",
    refresh_token_error_values: Tuple[str, ...] = (),
) -> None:
    """
    Store the config paths and credentials, then initialise the parent authenticator.

    Args:
        connector_config (Mapping[str, Any]): The full connector configuration
        token_refresh_endpoint (str): Full URL to the token refresh endpoint
        scopes (List[str], optional): List of OAuth scopes to pass in the refresh token request body. Defaults to None.
        access_token_name (str, optional): Name of the access token field, used to parse the refresh token response. Defaults to "access_token".
        expires_in_name (str, optional): Name of the field that characterizes when the current access token will expire, used to parse the refresh token response. Defaults to "expires_in".
        refresh_token_name (str, optional): Name of the refresh token field, used to parse the refresh token response. Defaults to "refresh_token".
        refresh_request_body (Mapping[str, Any], optional): Custom key value pair that will be added to the refresh token request body. Defaults to None.
        refresh_request_headers (Mapping[str, Any], optional): Custom key value pair that will be added to the refresh token request headers. Defaults to None.
        grant_type_name (str, optional): Stored on the instance and forwarded to the parent initializer as `grant_type_name`. Defaults to "grant_type".
        grant_type (str, optional): OAuth grant type. Defaults to "refresh_token".
        client_id_name (str, optional): Stored on the instance and forwarded to the parent initializer as `client_id_name`. Defaults to "client_id".
        client_id (Optional[str]): The client id to authenticate. It is handed to `_get_config_value_by_path` together with the ("credentials", "client_id") config path, and that helper decides the effective value.
        client_secret_name (str, optional): Stored on the instance and forwarded to the parent initializer as `client_secret_name`. Defaults to "client_secret".
        client_secret (Optional[str]): The client secret to authenticate. It is handed to `_get_config_value_by_path` together with the ("credentials", "client_secret") config path, and that helper decides the effective value.
        access_token_config_path (Sequence[str]): Dpath to the access_token field in the connector configuration. Defaults to ("credentials", "access_token").
        refresh_token_config_path (Sequence[str]): Dpath to the refresh_token field in the connector configuration. Defaults to ("credentials", "refresh_token").
        token_expiry_date_config_path (Sequence[str]): Dpath to the token_expiry_date field in the connector configuration. Defaults to ("credentials", "token_expiry_date").
        token_expiry_date_format (Optional[str]): Date format of the token expiry date field (set by expires_in_name). If not specified the token expiry date is interpreted as number of seconds until expiration.
        message_repository (MessageRepository): the message repository used to emit logs on HTTP requests and control message on config update
        token_expiry_is_time_of_expiration (bool): set it to True if expires_in is returned as the time of expiration instead of the number of seconds until expiration
        refresh_token_error_status_codes (Tuple[int, ...]): Forwarded unchanged to the parent initializer. Defaults to ().
        refresh_token_error_key (str): Forwarded unchanged to the parent initializer. Defaults to "".
        refresh_token_error_values (Tuple[str, ...]): Forwarded unchanged to the parent initializer. Defaults to ().
    """
    # Stored before anything else: the config lookups below run before
    # super().__init__() and are expected to read it (helper not shown here).
    self._connector_config = connector_config
    self._client_id: str = self._get_config_value_by_path(
        ("credentials", "client_id"), client_id
    )
    self._client_secret: str = self._get_config_value_by_path(
        ("credentials", "client_secret"), client_secret
    )
    self._client_id_name = client_id_name
    self._client_secret_name = client_secret_name
    # The three config paths must exist before get_refresh_token() and
    # get_token_expiry_date() are evaluated as arguments of super().__init__().
    self._access_token_config_path = access_token_config_path
    self._refresh_token_config_path = refresh_token_config_path
    self._token_expiry_date_config_path = token_expiry_date_config_path
    self._token_expiry_date_format = token_expiry_date_format
    self._refresh_token_name = refresh_token_name
    self._grant_type_name = grant_type_name
    self.__message_repository = message_repository
    super().__init__(
        token_refresh_endpoint=token_refresh_endpoint,
        client_id_name=self._client_id_name,
        client_id=self._client_id,
        client_secret_name=self._client_secret_name,
        client_secret=self._client_secret,
        refresh_token=self.get_refresh_token(),
        refresh_token_name=self._refresh_token_name,
        scopes=scopes,
        token_expiry_date=self.get_token_expiry_date(),
        access_token_name=access_token_name,
        expires_in_name=expires_in_name,
        refresh_request_body=refresh_request_body,
        refresh_request_headers=refresh_request_headers,
        grant_type_name=self._grant_type_name,
        grant_type=grant_type,
        token_expiry_date_format=token_expiry_date_format,
        token_expiry_is_time_of_expiration=token_expiry_is_time_of_expiration,
        refresh_token_error_status_codes=refresh_token_error_status_codes,
        refresh_token_error_key=refresh_token_error_key,
        refresh_token_error_values=refresh_token_error_values,
    )
Arguments:
  • connector_config (Mapping[str, Any]): The full connector configuration
  • token_refresh_endpoint (str): Full URL to the token refresh endpoint
  • scopes (List[str], optional): List of OAuth scopes to pass in the refresh token request body. Defaults to None.
  • access_token_name (str, optional): Name of the access token field, used to parse the refresh token response. Defaults to "access_token".
  • expires_in_name (str, optional): Name of the field that characterizes when the current access token will expire, used to parse the refresh token response. Defaults to "expires_in".
  • refresh_token_name (str, optional): Name of the refresh token field, used to parse the refresh token response. Defaults to "refresh_token".
  • refresh_request_body (Mapping[str, Any], optional): Custom key value pair that will be added to the refresh token request body. Defaults to None.
  • refresh_request_headers (Mapping[str, Any], optional): Custom key value pair that will be added to the refresh token request headers. Defaults to None.
  • grant_type (str, optional): OAuth grant type. Defaults to "refresh_token".
  • client_id (Optional[str]): The client id to authenticate. If not specified, defaults to credentials.client_id in the config object.
  • client_secret (Optional[str]): The client secret to authenticate. If not specified, defaults to credentials.client_secret in the config object.
  • access_token_config_path (Sequence[str]): Dpath to the access_token field in the connector configuration. Defaults to ("credentials", "access_token").
  • refresh_token_config_path (Sequence[str]): Dpath to the refresh_token field in the connector configuration. Defaults to ("credentials", "refresh_token").
  • token_expiry_date_config_path (Sequence[str]): Dpath to the token_expiry_date field in the connector configuration. Defaults to ("credentials", "token_expiry_date").
  • token_expiry_date_format (Optional[str]): Date format of the token expiry date field (set by expires_in_name). If not specified the token expiry date is interpreted as number of seconds until expiration.
  • token_expiry_is_time_of_expiration (bool): set it to True if expires_in is returned as the time of expiration instead of the number of seconds until expiration
  • message_repository (MessageRepository): the message repository used to emit logs on HTTP requests and control message on config update
access_token: str
@property
def access_token(self) -> str:
    """
    Current access token as stored in the connector configuration.

    The value is looked up through `_get_config_value_by_path` using the
    path held in `_access_token_config_path`.

    Returns:
        str: The access token.
    """
    config_path = self._access_token_config_path
    return self._get_config_value_by_path(config_path)  # type: ignore[return-value]

Retrieve the access token from the configuration.

Returns:

str: The access token.

def get_refresh_token(self) -> str:
def get_refresh_token(self) -> str:
    """
    Read the refresh token out of the connector configuration.

    The lookup goes through `_get_config_value_by_path` with the path kept
    in `_refresh_token_config_path`.

    Returns:
        str: The refresh token as a string.
    """
    config_path = self._refresh_token_config_path
    return self._get_config_value_by_path(config_path)  # type: ignore[return-value]

Retrieve the refresh token from the configuration.

This method fetches the refresh token using the configuration path specified by _refresh_token_config_path.

Returns:

str: The refresh token as a string.

def set_refresh_token(self, new_refresh_token: str) -> None:
def set_refresh_token(self, new_refresh_token: str) -> None:
    """
    Write a new refresh token into the connector configuration.

    The value is written through `_set_config_value_by_path` at the path
    kept in `_refresh_token_config_path`.

    Args:
        new_refresh_token (str): The new refresh token to be set.
    """
    config_path = self._refresh_token_config_path
    self._set_config_value_by_path(config_path, new_refresh_token)

Updates the refresh token in the configuration.

Arguments:
  • new_refresh_token (str): The new refresh token to be set.
def get_token_expiry_date(self) -> airbyte_cdk.utils.datetime_helpers.AirbyteDateTime:
def get_token_expiry_date(self) -> AirbyteDateTime:
    """
    Read the token expiry date from the connector configuration.

    The raw value is looked up at `_token_expiry_date_config_path`. An empty
    string is replaced by "now minus one day", a moment already in the past.
    Any other value is converted with `str()` and parsed by
    `ab_datetime_parse`.

    Returns:
        AirbyteDateTime: The parsed or calculated token expiry date.

    Raises:
        TypeError: If the result is not an instance of AirbyteDateTime.
    """
    raw_expiry = self._get_config_value_by_path(self._token_expiry_date_config_path)
    if raw_expiry == "":
        # No expiry stored: fall back to a date that has already passed.
        expiry = ab_datetime_now() - timedelta(days=1)
    else:
        expiry = ab_datetime_parse(str(raw_expiry))
    if not isinstance(expiry, AirbyteDateTime):
        raise TypeError("Invalid datetime conversion")
    return expiry

Retrieves the token expiry date from the configuration.

This method fetches the token expiry date from the configuration using the specified path. If the expiry date is an empty string, it returns the current date and time minus one day. Otherwise, it parses the expiry date string into an AirbyteDateTime object.

Returns:

AirbyteDateTime: The parsed or calculated token expiry date.

Raises:
  • TypeError: If the result is not an instance of AirbyteDateTime.
def set_token_expiry_date( self, new_token_expiry_date: airbyte_cdk.utils.datetime_helpers.AirbyteDateTime) -> None:
def set_token_expiry_date(self, new_token_expiry_date: AirbyteDateTime) -> None:  # type: ignore[override]
    """
    Write a new token expiry date into the connector configuration.

    The date is stored as its `str()` representation at the path kept in
    `_token_expiry_date_config_path`.

    Args:
        new_token_expiry_date (AirbyteDateTime): The new expiry date for the token.
    """
    config_path = self._token_expiry_date_config_path
    serialized_expiry = str(new_token_expiry_date)
    self._set_config_value_by_path(config_path, serialized_expiry)

Sets the token expiry date in the configuration.

Arguments:
  • new_token_expiry_date (AirbyteDateTime): The new expiry date for the token.
def token_has_expired(self) -> bool:
def token_has_expired(self) -> bool:
    """Tell whether the current time is strictly later than the stored token expiry date."""
    current_time = ab_datetime_now()
    expiry = self.get_token_expiry_date()
    return current_time > expiry

Returns True if the token is expired

@staticmethod
def get_new_token_expiry_date( access_token_expires_in: str, token_expiry_date_format: str | None = None) -> airbyte_cdk.utils.datetime_helpers.AirbyteDateTime:
@staticmethod
def get_new_token_expiry_date(
    access_token_expires_in: str,
    token_expiry_date_format: str | None = None,
) -> AirbyteDateTime:
    """
    Work out when a freshly issued access token expires.

    Args:
        access_token_expires_in (str): Either a number of seconds until expiry, or an expiry date string.
        token_expiry_date_format (str | None, optional): When truthy, `access_token_expires_in` is treated as a
            date string and parsed with `ab_datetime_parse`. The format itself is only used as a switch here
            and is not handed to the parser. Defaults to None.

    Returns:
        AirbyteDateTime: The calculated expiry date of the access token.
    """
    if not token_expiry_date_format:
        # Relative lifetime: add the number of seconds to the current time.
        current_time = ab_datetime_now()
        return current_time + timedelta(seconds=int(access_token_expires_in))
    return ab_datetime_parse(access_token_expires_in)

Calculate the new token expiry date based on the provided expiration duration or format.

Arguments:
  • access_token_expires_in (str): The duration (in seconds) until the access token expires, or the expiry date in a specific format.
  • token_expiry_date_format (str | None, optional): The format of the expiry date if provided. Defaults to None.
Returns:

AirbyteDateTime: The calculated expiry date of the access token.

def get_access_token(self) -> str:
def get_access_token(self) -> str:
    """Return the access token, refreshing it first when it has expired.

    On expiry, a new access token, expiry value and refresh token are obtained
    from `refresh_access_token()`. All three are persisted (access token,
    refresh token via `set_refresh_token`, expiry date via
    `set_token_expiry_date`) and `_emit_control_message()` is called afterwards.

    Returns:
        str: The current access_token, updated if it was previously expired.
    """
    if not self.token_has_expired():
        return self.access_token

    fresh_access_token, expires_in_value, fresh_refresh_token = self.refresh_access_token()
    fresh_expiry: AirbyteDateTime = self.get_new_token_expiry_date(
        expires_in_value, self._token_expiry_date_format
    )
    # NOTE(review): this assignment relies on a setter for `access_token` — confirm it exists on the class.
    self.access_token = fresh_access_token
    self.set_refresh_token(fresh_refresh_token)
    self.set_token_expiry_date(fresh_expiry)
    self._emit_control_message()
    return self.access_token

Retrieve new access and refresh token if the access token has expired. The new refresh token is persisted with the set_refresh_token function.

Returns:

str: The current access_token, updated if it was previously expired.

def refresh_access_token(self) -> Tuple[str, str, str]:
def refresh_access_token(self) -> Tuple[str, str, str]:  # type: ignore[override]
    """
    Request new tokens and pull the three relevant values out of the response.

    The response comes from `_make_handled_request()`. The values are extracted
    in this order: access token, token expiry value, refresh token.

    Returns:
        Tuple[str, str, str]: A tuple containing the new access token, token expiry date, and refresh token.
    """
    payload = self._make_handled_request()
    new_access_token = self._extract_access_token(payload)
    new_expiry = self._extract_token_expiry_date(payload)
    new_refresh_token = self._extract_refresh_token(payload)
    return new_access_token, new_expiry, new_refresh_token

Refreshes the access token by making a handled request and extracting the necessary token information.

Returns:

Tuple[str, str, str]: A tuple containing the new access token, token expiry date, and refresh token.

class TokenAuthenticator(AbstractHeaderAuthenticator):
    """
    Builds auth header, based on the token provided.
    The token is attached to each request via the `auth_header` header.
    """

    def __init__(self, token: str, auth_method: str = "Bearer", auth_header: str = "Authorization"):
        """Keep the raw token, the auth method prefix and the header name."""
        self._token = token
        self._auth_method = auth_method
        self._auth_header = auth_header

    @property
    def auth_header(self) -> str:
        """Name of the header given at construction time."""
        header_name = self._auth_header
        return header_name

    @property
    def token(self) -> str:
        """Header value: the auth method, a single space, then the raw token."""
        method, raw_token = self._auth_method, self._token
        return f"{method} {raw_token}"

Builds auth header, based on the token provided. The token is attached to each request via the auth_header header.

TokenAuthenticator( token: str, auth_method: str = 'Bearer', auth_header: str = 'Authorization')
def __init__(self, token: str, auth_method: str = "Bearer", auth_header: str = "Authorization"):
    """Keep the raw token, the auth method prefix and the header name."""
    self._token = token
    self._auth_method = auth_method
    self._auth_header = auth_header
auth_header: str
@property
def auth_header(self) -> str:
    """Name of the header stored in `_auth_header`."""
    header_name = self._auth_header
    return header_name

HTTP header to set on the requests

token: str
@property
def token(self) -> str:
    """Header value: the auth method, a single space, then the raw token."""
    method, raw_token = self._auth_method, self._token
    return f"{method} {raw_token}"

The header value to set on outgoing HTTP requests

class MultipleTokenAuthenticator(AbstractHeaderAuthenticator):
    """
    Builds auth header, based on the list of tokens provided.
    Every read of the `token` property moves on to the next token, cycling through the list endlessly.
    The token is attached to each request via the `auth_header` header.

    Note: with an empty `tokens` list, reading `token` raises StopIteration,
    because `itertools.cycle` over an empty iterable yields nothing.
    """

    def __init__(
        self, tokens: List[str], auth_method: str = "Bearer", auth_header: str = "Authorization"
    ):
        """Keep the token list and build the endless iterator over it."""
        self._tokens = tokens
        self._tokens_iter = cycle(self._tokens)
        self._auth_method = auth_method
        self._auth_header = auth_header

    @property
    def auth_header(self) -> str:
        """Name of the header given at construction time."""
        header_name = self._auth_header
        return header_name

    @property
    def token(self) -> str:
        """Header value built from the auth method and the next token in the cycle."""
        method = self._auth_method
        upcoming_token = next(self._tokens_iter)
        return f"{method} {upcoming_token}"

Builds auth header, based on the list of tokens provided. Auth header is changed per each get_auth_header call, using each token in cycle. The token is attached to each request via the auth_header header.

MultipleTokenAuthenticator( tokens: List[str], auth_method: str = 'Bearer', auth_header: str = 'Authorization')
def __init__(
    self, tokens: List[str], auth_method: str = "Bearer", auth_header: str = "Authorization"
):
    """Keep the token list and build the endless iterator over it."""
    self._tokens = tokens
    self._tokens_iter = cycle(self._tokens)
    self._auth_method = auth_method
    self._auth_header = auth_header
auth_header: str
@property
def auth_header(self) -> str:
    """Name of the header stored in `_auth_header`."""
    header_name = self._auth_header
    return header_name

HTTP header to set on the requests

token: str
@property
def token(self) -> str:
    """Header value built from the auth method and the next item of `_tokens_iter`.

    Each read advances the iterator, so consecutive reads return different tokens.
    """
    method = self._auth_method
    upcoming_token = next(self._tokens_iter)
    return f"{method} {upcoming_token}"

The header value to set on outgoing HTTP requests

class BasicHttpAuthenticator(AbstractHeaderAuthenticator):
    """
    Builds auth based off the basic authentication scheme as defined by RFC 7617, which transmits credentials as USER ID/password pairs, encoded using base64
    https://developer.mozilla.org/en-US/docs/Web/HTTP/Authentication#basic_authentication_scheme
    """

    def __init__(
        self,
        username: str,
        password: str = "",
        auth_method: str = "Basic",
        auth_header: str = "Authorization",
    ):
        """Encode "username:password" once and keep the result with the method and header name."""
        # UTF-8 bytes of "username:password", base64-encoded, then decoded back to str.
        credentials = f"{username}:{password}"
        self._token = base64.b64encode(credentials.encode("utf8")).decode("utf8")
        self._auth_method = auth_method
        self._auth_header = auth_header

    @property
    def auth_header(self) -> str:
        """Name of the header given at construction time."""
        header_name = self._auth_header
        return header_name

    @property
    def token(self) -> str:
        """Header value: the auth method, a single space, then the base64 credentials."""
        method, encoded_credentials = self._auth_method, self._token
        return f"{method} {encoded_credentials}"

Builds auth based off the basic authentication scheme as defined by RFC 7617, which transmits credentials as USER ID/password pairs, encoded using base64 https://developer.mozilla.org/en-US/docs/Web/HTTP/Authentication#basic_authentication_scheme

BasicHttpAuthenticator( username: str, password: str = '', auth_method: str = 'Basic', auth_header: str = 'Authorization')
def __init__(
    self,
    username: str,
    password: str = "",
    auth_method: str = "Basic",
    auth_header: str = "Authorization",
):
    """Encode "username:password" once and keep the result with the method and header name."""
    # UTF-8 bytes of "username:password", base64-encoded, then decoded back to str.
    credentials = f"{username}:{password}"
    self._token = base64.b64encode(credentials.encode("utf8")).decode("utf8")
    self._auth_method = auth_method
    self._auth_header = auth_header
auth_header: str
@property
def auth_header(self) -> str:
    """Name of the header stored in `_auth_header`."""
    header_name = self._auth_header
    return header_name

HTTP header to set on the requests

token: str
@property
def token(self) -> str:
    """Header value: the auth method, a single space, then the stored `_token`."""
    method, encoded_credentials = self._auth_method, self._token
    return f"{method} {encoded_credentials}"

The header value to set on outgoing HTTP requests