airbyte_cdk.sources.declarative.auth

1#
2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3#
4
5from airbyte_cdk.sources.declarative.auth.jwt import JwtAuthenticator
6from airbyte_cdk.sources.declarative.auth.oauth import DeclarativeOauth2Authenticator
7
8__all__ = ["DeclarativeOauth2Authenticator", "JwtAuthenticator"]
 27@dataclass
 28class DeclarativeOauth2Authenticator(AbstractOauth2Authenticator, DeclarativeAuthenticator):
 29    """
 30    Generates OAuth2.0 access tokens from an OAuth2.0 refresh token and client credentials based on
 31    a declarative connector configuration file. Credentials can be defined explicitly or via interpolation
 32    at runtime. The generated access token is attached to each request via the Authorization header.
 33
 34    Attributes:
 35        token_refresh_endpoint (Union[InterpolatedString, str]): The endpoint to refresh the access token
 36        client_id (Union[InterpolatedString, str]): The client id
 37        client_secret (Union[InterpolatedString, str]): Client secret (can be empty for APIs that support this)
 38        refresh_token (Union[InterpolatedString, str]): The token used to refresh the access token
 39        access_token_name (Union[InterpolatedString, str]): THe field to extract access token from in the response
 40        expires_in_name (Union[InterpolatedString, str]): The field to extract expires_in from in the response
 41        config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
 42        scopes (Optional[List[str]]): The scopes to request
 43        token_expiry_date (Optional[Union[InterpolatedString, str]]): The access token expiration date
 44        token_expiry_date_format str: format of the datetime; provide it if expires_in is returned in datetime instead of seconds
 45        token_expiry_is_time_of_expiration bool: set True it if expires_in is returned as time of expiration instead of the number seconds until expiration
 46        refresh_request_body (Optional[Mapping[str, Any]]): The request body to send in the refresh request
 47        refresh_request_headers (Optional[Mapping[str, Any]]): The request headers to send in the refresh request
 48        grant_type: The grant_type to request for access_token. If set to refresh_token, the refresh_token parameter has to be provided
 49        message_repository (MessageRepository): the message repository used to emit logs on HTTP requests
 50        refresh_token_error_status_codes (Tuple[int, ...]): Status codes to identify refresh token errors in response
 51        refresh_token_error_key (str): Key to identify refresh token error in response
 52        refresh_token_error_values (Tuple[str, ...]): List of values to check for exception during token refresh process
 53    """
 54
 55    config: Mapping[str, Any]
 56    parameters: InitVar[Mapping[str, Any]]
 57    client_id: Optional[Union[InterpolatedString, str]] = None
 58    client_secret: Optional[Union[InterpolatedString, str]] = None
 59    token_refresh_endpoint: Optional[Union[InterpolatedString, str]] = None
 60    refresh_token: Optional[Union[InterpolatedString, str]] = None
 61    scopes: Optional[List[str]] = None
 62    token_expiry_date: Optional[Union[InterpolatedString, str]] = None
 63    _token_expiry_date: Optional[AirbyteDateTime] = field(init=False, repr=False, default=None)
 64    token_expiry_date_format: Optional[str] = None
 65    token_expiry_is_time_of_expiration: bool = False
 66    access_token_name: Union[InterpolatedString, str] = "access_token"
 67    access_token_value: Optional[Union[InterpolatedString, str]] = None
 68    client_id_name: Union[InterpolatedString, str] = "client_id"
 69    client_secret_name: Union[InterpolatedString, str] = "client_secret"
 70    expires_in_name: Union[InterpolatedString, str] = "expires_in"
 71    refresh_token_name: Union[InterpolatedString, str] = "refresh_token"
 72    refresh_request_body: Optional[Mapping[str, Any]] = None
 73    refresh_request_headers: Optional[Mapping[str, Any]] = None
 74    grant_type_name: Union[InterpolatedString, str] = "grant_type"
 75    grant_type: Union[InterpolatedString, str] = "refresh_token"
 76    message_repository: MessageRepository = NoopMessageRepository()
 77    profile_assertion: Optional[DeclarativeAuthenticator] = None
 78    use_profile_assertion: Optional[Union[InterpolatedBoolean, str, bool]] = False
 79    refresh_token_error_status_codes: Tuple[int, ...] = ()
 80    refresh_token_error_key: str = ""
 81    refresh_token_error_values: Tuple[str, ...] = ()
 82
 83    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 84        super().__init__(
 85            refresh_token_error_status_codes=self.refresh_token_error_status_codes,
 86            refresh_token_error_key=self.refresh_token_error_key,
 87            refresh_token_error_values=self.refresh_token_error_values,
 88        )
 89        if self.token_refresh_endpoint is not None:
 90            self._token_refresh_endpoint: Optional[InterpolatedString] = InterpolatedString.create(
 91                self.token_refresh_endpoint, parameters=parameters
 92            )
 93        else:
 94            self._token_refresh_endpoint = None
 95        self._client_id_name = InterpolatedString.create(self.client_id_name, parameters=parameters)
 96        self._client_id = (
 97            InterpolatedString.create(self.client_id, parameters=parameters)
 98            if self.client_id
 99            else self.client_id
100        )
101        self._client_secret_name = InterpolatedString.create(
102            self.client_secret_name, parameters=parameters
103        )
104        self._client_secret = (
105            InterpolatedString.create(self.client_secret, parameters=parameters)
106            if self.client_secret
107            else self.client_secret
108        )
109        self._refresh_token_name = InterpolatedString.create(
110            self.refresh_token_name, parameters=parameters
111        )
112        if self.refresh_token is not None:
113            self._refresh_token: Optional[InterpolatedString] = InterpolatedString.create(
114                self.refresh_token, parameters=parameters
115            )
116        else:
117            self._refresh_token = None
118        self.access_token_name = InterpolatedString.create(
119            self.access_token_name, parameters=parameters
120        )
121        self.expires_in_name = InterpolatedString.create(
122            self.expires_in_name, parameters=parameters
123        )
124        self.grant_type_name = InterpolatedString.create(
125            self.grant_type_name, parameters=parameters
126        )
127        self.grant_type = InterpolatedString.create(
128            "urn:ietf:params:oauth:grant-type:jwt-bearer"
129            if self.use_profile_assertion
130            else self.grant_type,
131            parameters=parameters,
132        )
133        self._refresh_request_body = InterpolatedMapping(
134            self.refresh_request_body or {}, parameters=parameters
135        )
136        self._refresh_request_headers = InterpolatedMapping(
137            self.refresh_request_headers or {}, parameters=parameters
138        )
139        try:
140            if (
141                isinstance(self.token_expiry_date, (int, str))
142                and str(self.token_expiry_date).isdigit()
143            ):
144                self._token_expiry_date = ab_datetime_parse(self.token_expiry_date)
145            else:
146                self._token_expiry_date = (
147                    ab_datetime_parse(
148                        InterpolatedString.create(
149                            self.token_expiry_date, parameters=parameters
150                        ).eval(self.config)
151                    )
152                    if self.token_expiry_date
153                    else ab_datetime_now() - timedelta(days=1)
154                )
155        except ValueError as e:
156            raise ValueError(f"Invalid token expiry date format: {e}")
157        self.use_profile_assertion = (
158            InterpolatedBoolean(self.use_profile_assertion, parameters=parameters)
159            if isinstance(self.use_profile_assertion, str)
160            else self.use_profile_assertion
161        )
162        self.assertion_name = "assertion"
163
164        if self.access_token_value is not None:
165            self._access_token_value = InterpolatedString.create(
166                self.access_token_value, parameters=parameters
167            ).eval(self.config)
168        else:
169            self._access_token_value = None
170
171        self._access_token: Optional[str] = (
172            self._access_token_value if self.access_token_value else None
173        )
174
175        if not self.use_profile_assertion and any(
176            client_creds is None for client_creds in [self.client_id, self.client_secret]
177        ):
178            raise ValueError(
179                "OAuthAuthenticator configuration error: Both 'client_id' and 'client_secret' are required for the "
180                "basic OAuth flow."
181            )
182        if self.profile_assertion is None and self.use_profile_assertion:
183            raise ValueError(
184                "OAuthAuthenticator configuration error: 'profile_assertion' is required when using the profile assertion flow."
185            )
186        if self.get_grant_type() == "refresh_token" and self._refresh_token is None:
187            raise ValueError(
188                "OAuthAuthenticator configuration error: A 'refresh_token' is required when the 'grant_type' is set to 'refresh_token'."
189            )
190
191    def get_token_refresh_endpoint(self) -> Optional[str]:
192        if self._token_refresh_endpoint is not None:
193            refresh_token_endpoint: str = self._token_refresh_endpoint.eval(self.config)
194            if not refresh_token_endpoint:
195                raise ValueError(
196                    "OAuthAuthenticator was unable to evaluate token_refresh_endpoint parameter"
197                )
198            return refresh_token_endpoint
199        return None
200
201    def get_client_id_name(self) -> str:
202        return self._client_id_name.eval(self.config)  # type: ignore # eval returns a string in this context
203
204    def get_client_id(self) -> str:
205        client_id = self._client_id.eval(self.config) if self._client_id else self._client_id
206        if not client_id:
207            raise ValueError("OAuthAuthenticator was unable to evaluate client_id parameter")
208        return client_id  # type: ignore # value will be returned as a string, or an error will be raised
209
210    def get_client_secret_name(self) -> str:
211        return self._client_secret_name.eval(self.config)  # type: ignore # eval returns a string in this context
212
213    def get_client_secret(self) -> str:
214        client_secret = (
215            self._client_secret.eval(self.config) if self._client_secret else self._client_secret
216        )
217        if not client_secret:
218            # We've seen some APIs allowing empty client_secret so we will only log here
219            logger.warning(
220                "OAuthAuthenticator was unable to evaluate client_secret parameter hence it'll be empty"
221            )
222        return client_secret  # type: ignore # value will be returned as a string, which might be empty
223
224    def get_refresh_token_name(self) -> str:
225        return self._refresh_token_name.eval(self.config)  # type: ignore # eval returns a string in this context
226
227    def get_refresh_token(self) -> Optional[str]:
228        return None if self._refresh_token is None else str(self._refresh_token.eval(self.config))
229
230    def get_scopes(self) -> List[str]:
231        return self.scopes or []
232
233    def get_access_token_name(self) -> str:
234        return self.access_token_name.eval(self.config)  # type: ignore # eval returns a string in this context
235
236    def get_expires_in_name(self) -> str:
237        return self.expires_in_name.eval(self.config)  # type: ignore # eval returns a string in this context
238
239    def get_grant_type_name(self) -> str:
240        return self.grant_type_name.eval(self.config)  # type: ignore # eval returns a string in this context
241
242    def get_grant_type(self) -> str:
243        return self.grant_type.eval(self.config)  # type: ignore # eval returns a string in this context
244
245    def get_refresh_request_body(self) -> Mapping[str, Any]:
246        return self._refresh_request_body.eval(self.config)
247
248    def get_refresh_request_headers(self) -> Mapping[str, Any]:
249        return self._refresh_request_headers.eval(self.config)
250
251    def get_token_expiry_date(self) -> AirbyteDateTime:
252        if not self._has_access_token_been_initialized():
253            return AirbyteDateTime.from_datetime(datetime.min)
254        return self._token_expiry_date  # type: ignore # _token_expiry_date is an AirbyteDateTime. It is never None despite what mypy thinks
255
256    def _has_access_token_been_initialized(self) -> bool:
257        return self._access_token is not None
258
259    def set_token_expiry_date(self, value: AirbyteDateTime) -> None:
260        self._token_expiry_date = value
261
262    def get_assertion_name(self) -> str:
263        return self.assertion_name
264
265    def get_assertion(self) -> str:
266        if self.profile_assertion is None:
267            raise ValueError("profile_assertion is not set")
268        return self.profile_assertion.token
269
270    def build_refresh_request_body(self) -> Mapping[str, Any]:
271        """
272        Returns the request body to set on the refresh request
273
274        Override to define additional parameters
275        """
276        if self.use_profile_assertion:
277            return {
278                self.get_grant_type_name(): self.get_grant_type(),
279                self.get_assertion_name(): self.get_assertion(),
280            }
281        return super().build_refresh_request_body()
282
283    @property
284    def access_token(self) -> str:
285        if self._access_token is None:
286            raise ValueError("access_token is not set")
287        return self._access_token
288
289    @access_token.setter
290    def access_token(self, value: str) -> None:
291        self._access_token = value
292
293    @property
294    def _message_repository(self) -> MessageRepository:
295        """
296        Overriding AbstractOauth2Authenticator._message_repository to allow for HTTP request logs
297        """
298        return self.message_repository

Generates OAuth2.0 access tokens from an OAuth2.0 refresh token and client credentials based on a declarative connector configuration file. Credentials can be defined explicitly or via interpolation at runtime. The generated access token is attached to each request via the Authorization header.

Attributes:
  • token_refresh_endpoint (Union[InterpolatedString, str]): The endpoint to refresh the access token
  • client_id (Union[InterpolatedString, str]): The client id
  • client_secret (Union[InterpolatedString, str]): Client secret (can be empty for APIs that support this)
  • refresh_token (Union[InterpolatedString, str]): The token used to refresh the access token
  • access_token_name (Union[InterpolatedString, str]): THe field to extract access token from in the response
  • expires_in_name (Union[InterpolatedString, str]): The field to extract expires_in from in the response
  • config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
  • scopes (Optional[List[str]]): The scopes to request
  • token_expiry_date (Optional[Union[InterpolatedString, str]]): The access token expiration date
  • token_expiry_date_format str: format of the datetime; provide it if expires_in is returned in datetime instead of seconds
  • token_expiry_is_time_of_expiration bool: set True it if expires_in is returned as time of expiration instead of the number seconds until expiration
  • refresh_request_body (Optional[Mapping[str, Any]]): The request body to send in the refresh request
  • refresh_request_headers (Optional[Mapping[str, Any]]): The request headers to send in the refresh request
  • grant_type: The grant_type to request for access_token. If set to refresh_token, the refresh_token parameter has to be provided
  • message_repository (MessageRepository): the message repository used to emit logs on HTTP requests
  • refresh_token_error_status_codes (Tuple[int, ...]): Status codes to identify refresh token errors in response
  • refresh_token_error_key (str): Key to identify refresh token error in response
  • refresh_token_error_values (Tuple[str, ...]): List of values to check for exception during token refresh process
DeclarativeOauth2Authenticator( config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], client_id: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, client_secret: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, token_refresh_endpoint: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, refresh_token: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, scopes: Optional[List[str]] = None, token_expiry_date: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, token_expiry_date_format: Optional[str] = None, token_expiry_is_time_of_expiration: bool = False, access_token_name: Union[airbyte_cdk.InterpolatedString, str] = 'access_token', access_token_value: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, client_id_name: Union[airbyte_cdk.InterpolatedString, str] = 'client_id', client_secret_name: Union[airbyte_cdk.InterpolatedString, str] = 'client_secret', expires_in_name: Union[airbyte_cdk.InterpolatedString, str] = 'expires_in', refresh_token_name: Union[airbyte_cdk.InterpolatedString, str] = 'refresh_token', refresh_request_body: Optional[Mapping[str, Any]] = None, refresh_request_headers: Optional[Mapping[str, Any]] = None, grant_type_name: Union[airbyte_cdk.InterpolatedString, str] = 'grant_type', grant_type: Union[airbyte_cdk.InterpolatedString, str] = 'refresh_token', message_repository: airbyte_cdk.MessageRepository = <airbyte_cdk.sources.message.NoopMessageRepository object>, profile_assertion: Optional[airbyte_cdk.DeclarativeAuthenticator] = None, use_profile_assertion: Union[airbyte_cdk.InterpolatedBoolean, str, bool, NoneType] = False, refresh_token_error_status_codes: Tuple[int, ...] = (), refresh_token_error_key: str = '', refresh_token_error_values: Tuple[str, ...] = ())

If all of refresh_token_error_status_codes, refresh_token_error_key, and refresh_token_error_values are set, then http errors with such params will be wrapped in AirbyteTracedException.

config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
client_id: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
client_secret: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
token_refresh_endpoint: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
refresh_token: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
scopes: Optional[List[str]] = None
token_expiry_date: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
token_expiry_date_format: Optional[str] = None

Format of the datetime; exists it if expires_in is returned as the expiration datetime instead of seconds until it expires

token_expiry_is_time_of_expiration: bool = False

Indicates that the Token Expiry returns the date until which the token will be valid, not the amount of time it will be valid.

access_token_name: Union[airbyte_cdk.InterpolatedString, str] = 'access_token'
access_token_value: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
client_id_name: Union[airbyte_cdk.InterpolatedString, str] = 'client_id'
client_secret_name: Union[airbyte_cdk.InterpolatedString, str] = 'client_secret'
expires_in_name: Union[airbyte_cdk.InterpolatedString, str] = 'expires_in'
refresh_token_name: Union[airbyte_cdk.InterpolatedString, str] = 'refresh_token'
refresh_request_body: Optional[Mapping[str, Any]] = None
refresh_request_headers: Optional[Mapping[str, Any]] = None
grant_type_name: Union[airbyte_cdk.InterpolatedString, str] = 'grant_type'
grant_type: Union[airbyte_cdk.InterpolatedString, str] = 'refresh_token'
profile_assertion: Optional[airbyte_cdk.DeclarativeAuthenticator] = None
use_profile_assertion: Union[airbyte_cdk.InterpolatedBoolean, str, bool, NoneType] = False
refresh_token_error_status_codes: Tuple[int, ...] = ()
refresh_token_error_key: str = ''
refresh_token_error_values: Tuple[str, ...] = ()
def get_token_refresh_endpoint(self) -> Optional[str]:
191    def get_token_refresh_endpoint(self) -> Optional[str]:
192        if self._token_refresh_endpoint is not None:
193            refresh_token_endpoint: str = self._token_refresh_endpoint.eval(self.config)
194            if not refresh_token_endpoint:
195                raise ValueError(
196                    "OAuthAuthenticator was unable to evaluate token_refresh_endpoint parameter"
197                )
198            return refresh_token_endpoint
199        return None

Returns the endpoint to refresh the access token

def get_client_id_name(self) -> str:
201    def get_client_id_name(self) -> str:
202        return self._client_id_name.eval(self.config)  # type: ignore # eval returns a string in this context

The client id name to authenticate

def get_client_id(self) -> str:
204    def get_client_id(self) -> str:
205        client_id = self._client_id.eval(self.config) if self._client_id else self._client_id
206        if not client_id:
207            raise ValueError("OAuthAuthenticator was unable to evaluate client_id parameter")
208        return client_id  # type: ignore # value will be returned as a string, or an error will be raised

The client id to authenticate

def get_client_secret_name(self) -> str:
210    def get_client_secret_name(self) -> str:
211        return self._client_secret_name.eval(self.config)  # type: ignore # eval returns a string in this context

The client secret name to authenticate

def get_client_secret(self) -> str:
213    def get_client_secret(self) -> str:
214        client_secret = (
215            self._client_secret.eval(self.config) if self._client_secret else self._client_secret
216        )
217        if not client_secret:
218            # We've seen some APIs allowing empty client_secret so we will only log here
219            logger.warning(
220                "OAuthAuthenticator was unable to evaluate client_secret parameter hence it'll be empty"
221            )
222        return client_secret  # type: ignore # value will be returned as a string, which might be empty

The client secret to authenticate

def get_refresh_token_name(self) -> str:
224    def get_refresh_token_name(self) -> str:
225        return self._refresh_token_name.eval(self.config)  # type: ignore # eval returns a string in this context

The refresh token name to authenticate

def get_refresh_token(self) -> Optional[str]:
227    def get_refresh_token(self) -> Optional[str]:
228        return None if self._refresh_token is None else str(self._refresh_token.eval(self.config))

The token used to refresh the access token when it expires

def get_scopes(self) -> List[str]:
230    def get_scopes(self) -> List[str]:
231        return self.scopes or []

List of requested scopes

def get_access_token_name(self) -> str:
233    def get_access_token_name(self) -> str:
234        return self.access_token_name.eval(self.config)  # type: ignore # eval returns a string in this context

Field to extract access token from in the response

def get_expires_in_name(self) -> str:
236    def get_expires_in_name(self) -> str:
237        return self.expires_in_name.eval(self.config)  # type: ignore # eval returns a string in this context

Returns the expires_in field name

def get_grant_type_name(self) -> str:
239    def get_grant_type_name(self) -> str:
240        return self.grant_type_name.eval(self.config)  # type: ignore # eval returns a string in this context

Returns grant_type specified name for requesting access_token

def get_grant_type(self) -> str:
242    def get_grant_type(self) -> str:
243        return self.grant_type.eval(self.config)  # type: ignore # eval returns a string in this context

Returns grant_type specified for requesting access_token

def get_refresh_request_body(self) -> Mapping[str, Any]:
245    def get_refresh_request_body(self) -> Mapping[str, Any]:
246        return self._refresh_request_body.eval(self.config)

Returns the request body to set on the refresh request

def get_refresh_request_headers(self) -> Mapping[str, Any]:
248    def get_refresh_request_headers(self) -> Mapping[str, Any]:
249        return self._refresh_request_headers.eval(self.config)

Returns the request headers to set on the refresh request

def get_token_expiry_date(self) -> airbyte_cdk.utils.datetime_helpers.AirbyteDateTime:
251    def get_token_expiry_date(self) -> AirbyteDateTime:
252        if not self._has_access_token_been_initialized():
253            return AirbyteDateTime.from_datetime(datetime.min)
254        return self._token_expiry_date  # type: ignore # _token_expiry_date is an AirbyteDateTime. It is never None despite what mypy thinks

Expiration date of the access token

def set_token_expiry_date(self, value: airbyte_cdk.utils.datetime_helpers.AirbyteDateTime) -> None:
259    def set_token_expiry_date(self, value: AirbyteDateTime) -> None:
260        self._token_expiry_date = value

Setter for access token expiration date

def get_assertion_name(self) -> str:
262    def get_assertion_name(self) -> str:
263        return self.assertion_name
def get_assertion(self) -> str:
265    def get_assertion(self) -> str:
266        if self.profile_assertion is None:
267            raise ValueError("profile_assertion is not set")
268        return self.profile_assertion.token
def build_refresh_request_body(self) -> Mapping[str, Any]:
270    def build_refresh_request_body(self) -> Mapping[str, Any]:
271        """
272        Returns the request body to set on the refresh request
273
274        Override to define additional parameters
275        """
276        if self.use_profile_assertion:
277            return {
278                self.get_grant_type_name(): self.get_grant_type(),
279                self.get_assertion_name(): self.get_assertion(),
280            }
281        return super().build_refresh_request_body()

Returns the request body to set on the refresh request

Override to define additional parameters

access_token: str
283    @property
284    def access_token(self) -> str:
285        if self._access_token is None:
286            raise ValueError("access_token is not set")
287        return self._access_token

Returns the access token

 55@dataclass
 56class JwtAuthenticator(DeclarativeAuthenticator):
 57    """
 58    Generates a JSON Web Token (JWT) based on a declarative connector configuration file. The generated token is attached to each request via the Authorization header.
 59
 60    Attributes:
 61        config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
 62        secret_key (Union[InterpolatedString, str]): The secret key used to sign the JWT
 63        algorithm (Union[str, JwtAlgorithm]): The algorithm used to sign the JWT
 64        token_duration (Optional[int]): The duration in seconds for which the token is valid
 65        base64_encode_secret_key (Optional[Union[InterpolatedBoolean, str, bool]]): Whether to base64 encode the secret key
 66        header_prefix (Optional[Union[InterpolatedString, str]]): The prefix to add to the Authorization header
 67        kid (Optional[Union[InterpolatedString, str]]): The key identifier to be included in the JWT header
 68        typ (Optional[Union[InterpolatedString, str]]): The type of the JWT.
 69        cty (Optional[Union[InterpolatedString, str]]): The content type of the JWT.
 70        iss (Optional[Union[InterpolatedString, str]]): The issuer of the JWT.
 71        sub (Optional[Union[InterpolatedString, str]]): The subject of the JWT.
 72        aud (Optional[Union[InterpolatedString, str]]): The audience of the JWT.
 73        additional_jwt_headers (Optional[Mapping[str, Any]]): Additional headers to include in the JWT.
 74        additional_jwt_payload (Optional[Mapping[str, Any]]): Additional payload to include in the JWT.
 75    """
 76
 77    config: Mapping[str, Any]
 78    parameters: InitVar[Mapping[str, Any]]
 79    secret_key: Union[InterpolatedString, str]
 80    algorithm: Union[str, JwtAlgorithm]
 81    token_duration: Optional[int]
 82    base64_encode_secret_key: Optional[Union[InterpolatedBoolean, str, bool]] = False
 83    header_prefix: Optional[Union[InterpolatedString, str]] = None
 84    kid: Optional[Union[InterpolatedString, str]] = None
 85    typ: Optional[Union[InterpolatedString, str]] = None
 86    cty: Optional[Union[InterpolatedString, str]] = None
 87    iss: Optional[Union[InterpolatedString, str]] = None
 88    sub: Optional[Union[InterpolatedString, str]] = None
 89    aud: Optional[Union[InterpolatedString, str]] = None
 90    additional_jwt_headers: Optional[Mapping[str, Any]] = None
 91    additional_jwt_payload: Optional[Mapping[str, Any]] = None
 92    passphrase: Optional[Union[InterpolatedString, str]] = None
 93    request_option: Optional[RequestOption] = None
 94
 95    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 96        self._secret_key = InterpolatedString.create(self.secret_key, parameters=parameters)
 97        self._algorithm = (
 98            JwtAlgorithm(self.algorithm) if isinstance(self.algorithm, str) else self.algorithm
 99        )
100        self._base64_encode_secret_key = (
101            InterpolatedBoolean(self.base64_encode_secret_key, parameters=parameters)
102            if isinstance(self.base64_encode_secret_key, str)
103            else self.base64_encode_secret_key
104        )
105        self._token_duration = self.token_duration
106        self._header_prefix = (
107            InterpolatedString.create(self.header_prefix, parameters=parameters)
108            if self.header_prefix
109            else None
110        )
111        self._kid = InterpolatedString.create(self.kid, parameters=parameters) if self.kid else None
112        self._typ = InterpolatedString.create(self.typ, parameters=parameters) if self.typ else None
113        self._cty = InterpolatedString.create(self.cty, parameters=parameters) if self.cty else None
114        self._iss = InterpolatedString.create(self.iss, parameters=parameters) if self.iss else None
115        self._sub = InterpolatedString.create(self.sub, parameters=parameters) if self.sub else None
116        self._aud = InterpolatedString.create(self.aud, parameters=parameters) if self.aud else None
117        self._additional_jwt_headers = InterpolatedMapping(
118            self.additional_jwt_headers or {}, parameters=parameters
119        )
120        self._additional_jwt_payload = InterpolatedMapping(
121            self.additional_jwt_payload or {}, parameters=parameters
122        )
123        self._passphrase = (
124            InterpolatedString.create(self.passphrase, parameters=parameters)
125            if self.passphrase
126            else None
127        )
128
129        # When we first implemented the JWT authenticator, we assumed that the signed token was always supposed
130        # to be loaded into the request headers under the `Authorization` key. This is not always the case, but
131        # this default option allows for backwards compatibility to be retained for existing connectors
132        self._request_option = self.request_option or RequestOption(
133            inject_into=RequestOptionType.header, field_name="Authorization", parameters=parameters
134        )
135
136    def _get_jwt_headers(self) -> dict[str, Any]:
137        """
138        Builds and returns the headers used when signing the JWT.
139        """
140        headers = self._additional_jwt_headers.eval(self.config, json_loads=json.loads)
141        if any(prop in headers for prop in ["kid", "alg", "typ", "cty"]):
142            raise ValueError(
143                "'kid', 'alg', 'typ', 'cty' are reserved headers and should not be set as part of 'additional_jwt_headers'"
144            )
145
146        if self._kid:
147            headers["kid"] = self._kid.eval(self.config, json_loads=json.loads)
148        if self._typ:
149            headers["typ"] = self._typ.eval(self.config, json_loads=json.loads)
150        if self._cty:
151            headers["cty"] = self._cty.eval(self.config, json_loads=json.loads)
152        headers["alg"] = self._algorithm
153        return headers
154
155    def _get_jwt_payload(self) -> dict[str, Any]:
156        """
157        Builds and returns the payload used when signing the JWT.
158        """
159        now = int(datetime.now().timestamp())
160        exp = now + self._token_duration if isinstance(self._token_duration, int) else now
161        nbf = now
162
163        payload = self._additional_jwt_payload.eval(self.config, json_loads=json.loads)
164        if any(prop in payload for prop in ["iss", "sub", "aud", "iat", "exp", "nbf"]):
165            raise ValueError(
166                "'iss', 'sub', 'aud', 'iat', 'exp', 'nbf' are reserved properties and should not be set as part of 'additional_jwt_payload'"
167            )
168
169        if self._iss:
170            payload["iss"] = self._iss.eval(self.config, json_loads=json.loads)
171        if self._sub:
172            payload["sub"] = self._sub.eval(self.config, json_loads=json.loads)
173        if self._aud:
174            payload["aud"] = self._aud.eval(self.config, json_loads=json.loads)
175
176        payload["iat"] = now
177        payload["exp"] = exp
178        payload["nbf"] = nbf
179        return payload
180
181    def _get_secret_key(self) -> JwtKeyTypes:
182        """
183        Returns the secret key used to sign the JWT.
184        """
185        secret_key: str = self._secret_key.eval(self.config, json_loads=json.loads)
186
187        if self._passphrase:
188            passphrase_value = self._passphrase.eval(self.config, json_loads=json.loads)
189            if passphrase_value:
190                private_key = serialization.load_pem_private_key(
191                    secret_key.encode(),
192                    password=passphrase_value.encode(),
193                )
194                return cast(JwtKeyTypes, private_key)
195
196        return (
197            base64.b64encode(secret_key.encode()).decode()
198            if self._base64_encode_secret_key
199            else secret_key
200        )
201
202    def _get_signed_token(self) -> Union[str, Any]:
203        """
204        Signed the JWT using the provided secret key and algorithm and the generated headers and payload. For additional information on PyJWT see: https://pyjwt.readthedocs.io/en/stable/
205        """
206        try:
207            return jwt.encode(
208                payload=self._get_jwt_payload(),
209                key=self._get_secret_key(),
210                algorithm=self._algorithm,
211                headers=self._get_jwt_headers(),
212            )
213        except Exception as e:
214            raise ValueError(f"Failed to sign token: {e}")
215
216    def _get_header_prefix(self) -> Union[str, None]:
217        """
218        Returns the header prefix to be used when attaching the token to the request.
219        """
220        return (
221            self._header_prefix.eval(self.config, json_loads=json.loads)
222            if self._header_prefix
223            else None
224        )
225
226    @property
227    def auth_header(self) -> str:
228        options = self._get_request_options(RequestOptionType.header)
229        return next(iter(options.keys()), "")
230
231    @property
232    def token(self) -> str:
233        return (
234            f"{self._get_header_prefix()} {self._get_signed_token()}"
235            if self._get_header_prefix()
236            else self._get_signed_token()
237        )
238
239    def get_request_params(self) -> Mapping[str, Any]:
240        return self._get_request_options(RequestOptionType.request_parameter)
241
242    def get_request_body_data(self) -> Union[Mapping[str, Any], str]:
243        return self._get_request_options(RequestOptionType.body_data)
244
245    def get_request_body_json(self) -> Mapping[str, Any]:
246        return self._get_request_options(RequestOptionType.body_json)
247
248    def _get_request_options(self, option_type: RequestOptionType) -> Mapping[str, Any]:
249        options: MutableMapping[str, Any] = {}
250        if self._request_option.inject_into == option_type:
251            self._request_option.inject_into_request(options, self.token, self.config)
252        return options

Generates a JSON Web Token (JWT) based on a declarative connector configuration file. The generated token is attached to each request via the Authorization header.

Attributes:
  • config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
  • secret_key (Union[InterpolatedString, str]): The secret key used to sign the JWT
  • algorithm (Union[str, JwtAlgorithm]): The algorithm used to sign the JWT
  • token_duration (Optional[int]): The duration in seconds for which the token is valid
  • base64_encode_secret_key (Optional[Union[InterpolatedBoolean, str, bool]]): Whether to base64 encode the secret key
  • header_prefix (Optional[Union[InterpolatedString, str]]): The prefix to add to the Authorization header
  • kid (Optional[Union[InterpolatedString, str]]): The key identifier to be included in the JWT header
  • typ (Optional[Union[InterpolatedString, str]]): The type of the JWT.
  • cty (Optional[Union[InterpolatedString, str]]): The content type of the JWT.
  • iss (Optional[Union[InterpolatedString, str]]): The issuer of the JWT.
  • sub (Optional[Union[InterpolatedString, str]]): The subject of the JWT.
  • aud (Optional[Union[InterpolatedString, str]]): The audience of the JWT.
  • additional_jwt_headers (Optional[Mapping[str, Any]]): Additional headers to include in the JWT.
  • additional_jwt_payload (Optional[Mapping[str, Any]]): Additional payload to include in the JWT.
JwtAuthenticator( config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], secret_key: Union[airbyte_cdk.InterpolatedString, str], algorithm: Union[str, airbyte_cdk.sources.declarative.auth.jwt.JwtAlgorithm], token_duration: Optional[int], base64_encode_secret_key: Union[airbyte_cdk.InterpolatedBoolean, str, bool, NoneType] = False, header_prefix: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, kid: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, typ: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, cty: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, iss: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, sub: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, aud: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, additional_jwt_headers: Optional[Mapping[str, Any]] = None, additional_jwt_payload: Optional[Mapping[str, Any]] = None, passphrase: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, request_option: Optional[airbyte_cdk.RequestOption] = None)
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
secret_key: Union[airbyte_cdk.InterpolatedString, str]
token_duration: Optional[int]
base64_encode_secret_key: Union[airbyte_cdk.InterpolatedBoolean, str, bool, NoneType] = False
header_prefix: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
kid: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
typ: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
cty: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
iss: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
sub: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
aud: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
additional_jwt_headers: Optional[Mapping[str, Any]] = None
additional_jwt_payload: Optional[Mapping[str, Any]] = None
passphrase: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
request_option: Optional[airbyte_cdk.RequestOption] = None
auth_header: str
226    @property
227    def auth_header(self) -> str:
228        options = self._get_request_options(RequestOptionType.header)
229        return next(iter(options.keys()), "")

HTTP header to set on the requests

token: str
231    @property
232    def token(self) -> str:
233        return (
234            f"{self._get_header_prefix()} {self._get_signed_token()}"
235            if self._get_header_prefix()
236            else self._get_signed_token()
237        )

The header value to set on outgoing HTTP requests

def get_request_params(self) -> Mapping[str, Any]:
239    def get_request_params(self) -> Mapping[str, Any]:
240        return self._get_request_options(RequestOptionType.request_parameter)

HTTP request parameter to add to the requests

def get_request_body_data(self) -> Union[Mapping[str, Any], str]:
242    def get_request_body_data(self) -> Union[Mapping[str, Any], str]:
243        return self._get_request_options(RequestOptionType.body_data)

Form-encoded body data to set on the requests

def get_request_body_json(self) -> Mapping[str, Any]:
245    def get_request_body_json(self) -> Mapping[str, Any]:
246        return self._get_request_options(RequestOptionType.body_json)

JSON-encoded body data to set on the requests