
2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
5from airbyte_cdk.sources.declarative.auth.jwt import JwtAuthenticator
6from airbyte_cdk.sources.declarative.auth.oauth import DeclarativeOauth2Authenticator
8__all__ = ["DeclarativeOauth2Authenticator", "JwtAuthenticator"]
 25class DeclarativeOauth2Authenticator(AbstractOauth2Authenticator, DeclarativeAuthenticator):
 26    """
 27    Generates OAuth2.0 access tokens from an OAuth2.0 refresh token and client credentials based on
 28    a declarative connector configuration file. Credentials can be defined explicitly or via interpolation
 29    at runtime. The generated access token is attached to each request via the Authorization header.
 31    Attributes:
 32        token_refresh_endpoint (Union[InterpolatedString, str]): The endpoint to refresh the access token
 33        client_id (Union[InterpolatedString, str]): The client id
 34        client_secret (Union[InterpolatedString, str]): Client secret
 35        refresh_token (Union[InterpolatedString, str]): The token used to refresh the access token
 36        access_token_name (Union[InterpolatedString, str]): THe field to extract access token from in the response
 37        expires_in_name (Union[InterpolatedString, str]): The field to extract expires_in from in the response
 38        config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
 39        scopes (Optional[List[str]]): The scopes to request
 40        token_expiry_date (Optional[Union[InterpolatedString, str]]): The access token expiration date
 41        token_expiry_date_format str: format of the datetime; provide it if expires_in is returned in datetime instead of seconds
 42        token_expiry_is_time_of_expiration bool: set True it if expires_in is returned as time of expiration instead of the number seconds until expiration
 43        refresh_request_body (Optional[Mapping[str, Any]]): The request body to send in the refresh request
 44        refresh_request_headers (Optional[Mapping[str, Any]]): The request headers to send in the refresh request
 45        grant_type: The grant_type to request for access_token. If set to refresh_token, the refresh_token parameter has to be provided
 46        message_repository (MessageRepository): the message repository used to emit logs on HTTP requests
 47    """
 49    config: Mapping[str, Any]
 50    parameters: InitVar[Mapping[str, Any]]
 51    client_id: Optional[Union[InterpolatedString, str]] = None
 52    client_secret: Optional[Union[InterpolatedString, str]] = None
 53    token_refresh_endpoint: Optional[Union[InterpolatedString, str]] = None
 54    refresh_token: Optional[Union[InterpolatedString, str]] = None
 55    scopes: Optional[List[str]] = None
 56    token_expiry_date: Optional[Union[InterpolatedString, str]] = None
 57    _token_expiry_date: Optional[AirbyteDateTime] = field(init=False, repr=False, default=None)
 58    token_expiry_date_format: Optional[str] = None
 59    token_expiry_is_time_of_expiration: bool = False
 60    access_token_name: Union[InterpolatedString, str] = "access_token"
 61    access_token_value: Optional[Union[InterpolatedString, str]] = None
 62    client_id_name: Union[InterpolatedString, str] = "client_id"
 63    client_secret_name: Union[InterpolatedString, str] = "client_secret"
 64    expires_in_name: Union[InterpolatedString, str] = "expires_in"
 65    refresh_token_name: Union[InterpolatedString, str] = "refresh_token"
 66    refresh_request_body: Optional[Mapping[str, Any]] = None
 67    refresh_request_headers: Optional[Mapping[str, Any]] = None
 68    grant_type_name: Union[InterpolatedString, str] = "grant_type"
 69    grant_type: Union[InterpolatedString, str] = "refresh_token"
 70    message_repository: MessageRepository = NoopMessageRepository()
 71    profile_assertion: Optional[DeclarativeAuthenticator] = None
 72    use_profile_assertion: Optional[Union[InterpolatedBoolean, str, bool]] = False
 74    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 75        super().__init__()
 76        if self.token_refresh_endpoint is not None:
 77            self._token_refresh_endpoint: Optional[InterpolatedString] = InterpolatedString.create(
 78                self.token_refresh_endpoint, parameters=parameters
 79            )
 80        else:
 81            self._token_refresh_endpoint = None
 82        self._client_id_name = InterpolatedString.create(self.client_id_name, parameters=parameters)
 83        self._client_id = (
 84            InterpolatedString.create(self.client_id, parameters=parameters)
 85            if self.client_id
 86            else self.client_id
 87        )
 88        self._client_secret_name = InterpolatedString.create(
 89            self.client_secret_name, parameters=parameters
 90        )
 91        self._client_secret = (
 92            InterpolatedString.create(self.client_secret, parameters=parameters)
 93            if self.client_secret
 94            else self.client_secret
 95        )
 96        self._refresh_token_name = InterpolatedString.create(
 97            self.refresh_token_name, parameters=parameters
 98        )
 99        if self.refresh_token is not None:
100            self._refresh_token: Optional[InterpolatedString] = InterpolatedString.create(
101                self.refresh_token, parameters=parameters
102            )
103        else:
104            self._refresh_token = None
105        self.access_token_name = InterpolatedString.create(
106            self.access_token_name, parameters=parameters
107        )
108        self.expires_in_name = InterpolatedString.create(
109            self.expires_in_name, parameters=parameters
110        )
111        self.grant_type_name = InterpolatedString.create(
112            self.grant_type_name, parameters=parameters
113        )
114        self.grant_type = InterpolatedString.create(
115            "urn:ietf:params:oauth:grant-type:jwt-bearer"
116            if self.use_profile_assertion
117            else self.grant_type,
118            parameters=parameters,
119        )
120        self._refresh_request_body = InterpolatedMapping(
121            self.refresh_request_body or {}, parameters=parameters
122        )
123        self._refresh_request_headers = InterpolatedMapping(
124            self.refresh_request_headers or {}, parameters=parameters
125        )
126        try:
127            if (
128                isinstance(self.token_expiry_date, (int, str))
129                and str(self.token_expiry_date).isdigit()
130            ):
131                self._token_expiry_date = ab_datetime_parse(self.token_expiry_date)
132            else:
133                self._token_expiry_date = (
134                    ab_datetime_parse(
135                        InterpolatedString.create(
136                            self.token_expiry_date, parameters=parameters
137                        ).eval(self.config)
138                    )
139                    if self.token_expiry_date
140                    else ab_datetime_now() - timedelta(days=1)
141                )
142        except ValueError as e:
143            raise ValueError(f"Invalid token expiry date format: {e}")
144        self.use_profile_assertion = (
145            InterpolatedBoolean(self.use_profile_assertion, parameters=parameters)
146            if isinstance(self.use_profile_assertion, str)
147            else self.use_profile_assertion
148        )
149        self.assertion_name = "assertion"
151        if self.access_token_value is not None:
152            self._access_token_value = InterpolatedString.create(
153                self.access_token_value, parameters=parameters
154            ).eval(self.config)
155        else:
156            self._access_token_value = None
158        self._access_token: Optional[str] = (
159            self._access_token_value if self.access_token_value else None
160        )
162        if not self.use_profile_assertion and any(
163            client_creds is None for client_creds in [self.client_id, self.client_secret]
164        ):
165            raise ValueError(
166                "OAuthAuthenticator configuration error: Both 'client_id' and 'client_secret' are required for the "
167                "basic OAuth flow."
168            )
169        if self.profile_assertion is None and self.use_profile_assertion:
170            raise ValueError(
171                "OAuthAuthenticator configuration error: 'profile_assertion' is required when using the profile assertion flow."
172            )
173        if self.get_grant_type() == "refresh_token" and self._refresh_token is None:
174            raise ValueError(
175                "OAuthAuthenticator configuration error: A 'refresh_token' is required when the 'grant_type' is set to 'refresh_token'."
176            )
178    def get_token_refresh_endpoint(self) -> Optional[str]:
179        if self._token_refresh_endpoint is not None:
180            refresh_token_endpoint: str = self._token_refresh_endpoint.eval(self.config)
181            if not refresh_token_endpoint:
182                raise ValueError(
183                    "OAuthAuthenticator was unable to evaluate token_refresh_endpoint parameter"
184                )
185            return refresh_token_endpoint
186        return None
188    def get_client_id_name(self) -> str:
189        return self._client_id_name.eval(self.config)  # type: ignore # eval returns a string in this context
191    def get_client_id(self) -> str:
192        client_id = self._client_id.eval(self.config) if self._client_id else self._client_id
193        if not client_id:
194            raise ValueError("OAuthAuthenticator was unable to evaluate client_id parameter")
195        return client_id  # type: ignore # value will be returned as a string, or an error will be raised
197    def get_client_secret_name(self) -> str:
198        return self._client_secret_name.eval(self.config)  # type: ignore # eval returns a string in this context
200    def get_client_secret(self) -> str:
201        client_secret = (
202            self._client_secret.eval(self.config) if self._client_secret else self._client_secret
203        )
204        if not client_secret:
205            raise ValueError("OAuthAuthenticator was unable to evaluate client_secret parameter")
206        return client_secret  # type: ignore # value will be returned as a string, or an error will be raised
208    def get_refresh_token_name(self) -> str:
209        return self._refresh_token_name.eval(self.config)  # type: ignore # eval returns a string in this context
211    def get_refresh_token(self) -> Optional[str]:
212        return None if self._refresh_token is None else str(self._refresh_token.eval(self.config))
214    def get_scopes(self) -> List[str]:
215        return self.scopes or []
217    def get_access_token_name(self) -> str:
218        return self.access_token_name.eval(self.config)  # type: ignore # eval returns a string in this context
220    def get_expires_in_name(self) -> str:
221        return self.expires_in_name.eval(self.config)  # type: ignore # eval returns a string in this context
223    def get_grant_type_name(self) -> str:
224        return self.grant_type_name.eval(self.config)  # type: ignore # eval returns a string in this context
226    def get_grant_type(self) -> str:
227        return self.grant_type.eval(self.config)  # type: ignore # eval returns a string in this context
229    def get_refresh_request_body(self) -> Mapping[str, Any]:
230        return self._refresh_request_body.eval(self.config)
232    def get_refresh_request_headers(self) -> Mapping[str, Any]:
233        return self._refresh_request_headers.eval(self.config)
235    def get_token_expiry_date(self) -> AirbyteDateTime:
236        if not self._has_access_token_been_initialized():
237            return AirbyteDateTime.from_datetime(datetime.min)
238        return self._token_expiry_date  # type: ignore # _token_expiry_date is an AirbyteDateTime. It is never None despite what mypy thinks
240    def _has_access_token_been_initialized(self) -> bool:
241        return self._access_token is not None
243    def set_token_expiry_date(self, value: Union[str, int]) -> None:
244        self._token_expiry_date = self._parse_token_expiration_date(value)
246    def get_assertion_name(self) -> str:
247        return self.assertion_name
249    def get_assertion(self) -> str:
250        if self.profile_assertion is None:
251            raise ValueError("profile_assertion is not set")
252        return self.profile_assertion.token
254    def build_refresh_request_body(self) -> Mapping[str, Any]:
255        """
256        Returns the request body to set on the refresh request
258        Override to define additional parameters
259        """
260        if self.use_profile_assertion:
261            return {
262                self.get_grant_type_name(): self.get_grant_type(),
263                self.get_assertion_name(): self.get_assertion(),
264            }
265        return super().build_refresh_request_body()
267    @property
268    def access_token(self) -> str:
269        if self._access_token is None:
270            raise ValueError("access_token is not set")
271        return self._access_token
273    @access_token.setter
274    def access_token(self, value: str) -> None:
275        self._access_token = value
277    @property
278    def _message_repository(self) -> MessageRepository:
279        """
280        Overriding AbstractOauth2Authenticator._message_repository to allow for HTTP request logs
281        """
282        return self.message_repository

Generates OAuth2.0 access tokens from an OAuth2.0 refresh token and client credentials based on a declarative connector configuration file. Credentials can be defined explicitly or via interpolation at runtime. The generated access token is attached to each request via the Authorization header.

  • token_refresh_endpoint (Union[InterpolatedString, str]): The endpoint to refresh the access token
  • client_id (Union[InterpolatedString, str]): The client id
  • client_secret (Union[InterpolatedString, str]): Client secret
  • refresh_token (Union[InterpolatedString, str]): The token used to refresh the access token
  • access_token_name (Union[InterpolatedString, str]): The field to extract access token from in the response
  • expires_in_name (Union[InterpolatedString, str]): The field to extract expires_in from in the response
  • config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
  • scopes (Optional[List[str]]): The scopes to request
  • token_expiry_date (Optional[Union[InterpolatedString, str]]): The access token expiration date
  • token_expiry_date_format str: format of the datetime; provide it if expires_in is returned in datetime instead of seconds
  • token_expiry_is_time_of_expiration bool: set to True if expires_in is returned as time of expiration instead of the number of seconds until expiration
  • refresh_request_body (Optional[Mapping[str, Any]]): The request body to send in the refresh request
  • refresh_request_headers (Optional[Mapping[str, Any]]): The request headers to send in the refresh request
  • grant_type: The grant_type to request for access_token. If set to refresh_token, the refresh_token parameter has to be provided
  • message_repository (MessageRepository): the message repository used to emit logs on HTTP requests
DeclarativeOauth2Authenticator( config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], client_id: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, client_secret: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, token_refresh_endpoint: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, refresh_token: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, scopes: Optional[List[str]] = None, token_expiry_date: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, token_expiry_date_format: Optional[str] = None, token_expiry_is_time_of_expiration: bool = False, access_token_name: Union[airbyte_cdk.InterpolatedString, str] = 'access_token', access_token_value: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, client_id_name: Union[airbyte_cdk.InterpolatedString, str] = 'client_id', client_secret_name: Union[airbyte_cdk.InterpolatedString, str] = 'client_secret', expires_in_name: Union[airbyte_cdk.InterpolatedString, str] = 'expires_in', refresh_token_name: Union[airbyte_cdk.InterpolatedString, str] = 'refresh_token', refresh_request_body: Optional[Mapping[str, Any]] = None, refresh_request_headers: Optional[Mapping[str, Any]] = None, grant_type_name: Union[airbyte_cdk.InterpolatedString, str] = 'grant_type', grant_type: Union[airbyte_cdk.InterpolatedString, str] = 'refresh_token', message_repository: airbyte_cdk.MessageRepository = <airbyte_cdk.sources.message.NoopMessageRepository object>, profile_assertion: Optional[airbyte_cdk.DeclarativeAuthenticator] = None, use_profile_assertion: Union[airbyte_cdk.InterpolatedBoolean, str, bool, NoneType] = False)

If all of refresh_token_error_status_codes, refresh_token_error_key, and refresh_token_error_values are set, then http errors with such params will be wrapped in AirbyteTracedException.

config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
client_id: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
client_secret: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
token_refresh_endpoint: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
refresh_token: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
scopes: Optional[List[str]] = None
token_expiry_date: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
token_expiry_date_format: Optional[str] = None

Format of the datetime; provide it if expires_in is returned as the expiration datetime instead of the number of seconds until it expires

token_expiry_is_time_of_expiration: bool = False

Indicates that the Token Expiry returns the date until which the token will be valid, not the amount of time it will be valid.

access_token_name: Union[airbyte_cdk.InterpolatedString, str] = 'access_token'
access_token_value: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
client_id_name: Union[airbyte_cdk.InterpolatedString, str] = 'client_id'
client_secret_name: Union[airbyte_cdk.InterpolatedString, str] = 'client_secret'
expires_in_name: Union[airbyte_cdk.InterpolatedString, str] = 'expires_in'
refresh_token_name: Union[airbyte_cdk.InterpolatedString, str] = 'refresh_token'
refresh_request_body: Optional[Mapping[str, Any]] = None
refresh_request_headers: Optional[Mapping[str, Any]] = None
grant_type_name: Union[airbyte_cdk.InterpolatedString, str] = 'grant_type'
grant_type: Union[airbyte_cdk.InterpolatedString, str] = 'refresh_token'
profile_assertion: Optional[airbyte_cdk.DeclarativeAuthenticator] = None
use_profile_assertion: Union[airbyte_cdk.InterpolatedBoolean, str, bool, NoneType] = False
def get_token_refresh_endpoint(self) -> Optional[str]:
178    def get_token_refresh_endpoint(self) -> Optional[str]:
179        if self._token_refresh_endpoint is not None:
180            refresh_token_endpoint: str = self._token_refresh_endpoint.eval(self.config)
181            if not refresh_token_endpoint:
182                raise ValueError(
183                    "OAuthAuthenticator was unable to evaluate token_refresh_endpoint parameter"
184                )
185            return refresh_token_endpoint
186        return None

Returns the endpoint to refresh the access token

def get_client_id_name(self) -> str:
188    def get_client_id_name(self) -> str:
189        return self._client_id_name.eval(self.config)  # type: ignore # eval returns a string in this context

The client id name to authenticate

def get_client_id(self) -> str:
191    def get_client_id(self) -> str:
192        client_id = self._client_id.eval(self.config) if self._client_id else self._client_id
193        if not client_id:
194            raise ValueError("OAuthAuthenticator was unable to evaluate client_id parameter")
195        return client_id  # type: ignore # value will be returned as a string, or an error will be raised

The client id to authenticate

def get_client_secret_name(self) -> str:
197    def get_client_secret_name(self) -> str:
198        return self._client_secret_name.eval(self.config)  # type: ignore # eval returns a string in this context

The client secret name to authenticate

def get_client_secret(self) -> str:
200    def get_client_secret(self) -> str:
201        client_secret = (
202            self._client_secret.eval(self.config) if self._client_secret else self._client_secret
203        )
204        if not client_secret:
205            raise ValueError("OAuthAuthenticator was unable to evaluate client_secret parameter")
206        return client_secret  # type: ignore # value will be returned as a string, or an error will be raised

The client secret to authenticate

def get_refresh_token_name(self) -> str:
208    def get_refresh_token_name(self) -> str:
209        return self._refresh_token_name.eval(self.config)  # type: ignore # eval returns a string in this context

The refresh token name to authenticate

def get_refresh_token(self) -> Optional[str]:
211    def get_refresh_token(self) -> Optional[str]:
212        return None if self._refresh_token is None else str(self._refresh_token.eval(self.config))

The token used to refresh the access token when it expires

def get_scopes(self) -> List[str]:
214    def get_scopes(self) -> List[str]:
215        return self.scopes or []

List of requested scopes

def get_access_token_name(self) -> str:
217    def get_access_token_name(self) -> str:
218        return self.access_token_name.eval(self.config)  # type: ignore # eval returns a string in this context

Field to extract access token from in the response

def get_expires_in_name(self) -> str:
220    def get_expires_in_name(self) -> str:
221        return self.expires_in_name.eval(self.config)  # type: ignore # eval returns a string in this context

Returns the expires_in field name

def get_grant_type_name(self) -> str:
223    def get_grant_type_name(self) -> str:
224        return self.grant_type_name.eval(self.config)  # type: ignore # eval returns a string in this context

Returns grant_type specified name for requesting access_token

def get_grant_type(self) -> str:
226    def get_grant_type(self) -> str:
227        return self.grant_type.eval(self.config)  # type: ignore # eval returns a string in this context

Returns grant_type specified for requesting access_token

def get_refresh_request_body(self) -> Mapping[str, Any]:
229    def get_refresh_request_body(self) -> Mapping[str, Any]:
230        return self._refresh_request_body.eval(self.config)

Returns the request body to set on the refresh request

def get_refresh_request_headers(self) -> Mapping[str, Any]:
232    def get_refresh_request_headers(self) -> Mapping[str, Any]:
233        return self._refresh_request_headers.eval(self.config)

Returns the request headers to set on the refresh request

def get_token_expiry_date(self) -> airbyte_cdk.utils.datetime_helpers.AirbyteDateTime:
235    def get_token_expiry_date(self) -> AirbyteDateTime:
236        if not self._has_access_token_been_initialized():
237            return AirbyteDateTime.from_datetime(datetime.min)
238        return self._token_expiry_date  # type: ignore # _token_expiry_date is an AirbyteDateTime. It is never None despite what mypy thinks

Expiration date of the access token

def set_token_expiry_date(self, value: Union[str, int]) -> None:
243    def set_token_expiry_date(self, value: Union[str, int]) -> None:
244        self._token_expiry_date = self._parse_token_expiration_date(value)

Setter for access token expiration date

def get_assertion_name(self) -> str:
246    def get_assertion_name(self) -> str:
247        return self.assertion_name
def get_assertion(self) -> str:
249    def get_assertion(self) -> str:
250        if self.profile_assertion is None:
251            raise ValueError("profile_assertion is not set")
252        return self.profile_assertion.token
def build_refresh_request_body(self) -> Mapping[str, Any]:
254    def build_refresh_request_body(self) -> Mapping[str, Any]:
255        """
256        Returns the request body to set on the refresh request
258        Override to define additional parameters
259        """
260        if self.use_profile_assertion:
261            return {
262                self.get_grant_type_name(): self.get_grant_type(),
263                self.get_assertion_name(): self.get_assertion(),
264            }
265        return super().build_refresh_request_body()

Returns the request body to set on the refresh request

Override to define additional parameters

access_token: str
267    @property
268    def access_token(self) -> str:
269        if self._access_token is None:
270            raise ValueError("access_token is not set")
271        return self._access_token

Returns the access token

 42class JwtAuthenticator(DeclarativeAuthenticator):
 43    """
 44    Generates a JSON Web Token (JWT) based on a declarative connector configuration file. The generated token is attached to each request via the Authorization header.
 46    Attributes:
 47        config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
 48        secret_key (Union[InterpolatedString, str]): The secret key used to sign the JWT
 49        algorithm (Union[str, JwtAlgorithm]): The algorithm used to sign the JWT
 50        token_duration (Optional[int]): The duration in seconds for which the token is valid
 51        base64_encode_secret_key (Optional[Union[InterpolatedBoolean, str, bool]]): Whether to base64 encode the secret key
 52        header_prefix (Optional[Union[InterpolatedString, str]]): The prefix to add to the Authorization header
 53        kid (Optional[Union[InterpolatedString, str]]): The key identifier to be included in the JWT header
 54        typ (Optional[Union[InterpolatedString, str]]): The type of the JWT.
 55        cty (Optional[Union[InterpolatedString, str]]): The content type of the JWT.
 56        iss (Optional[Union[InterpolatedString, str]]): The issuer of the JWT.
 57        sub (Optional[Union[InterpolatedString, str]]): The subject of the JWT.
 58        aud (Optional[Union[InterpolatedString, str]]): The audience of the JWT.
 59        additional_jwt_headers (Optional[Mapping[str, Any]]): Additional headers to include in the JWT.
 60        additional_jwt_payload (Optional[Mapping[str, Any]]): Additional payload to include in the JWT.
 61    """
 63    config: Mapping[str, Any]
 64    parameters: InitVar[Mapping[str, Any]]
 65    secret_key: Union[InterpolatedString, str]
 66    algorithm: Union[str, JwtAlgorithm]
 67    token_duration: Optional[int]
 68    base64_encode_secret_key: Optional[Union[InterpolatedBoolean, str, bool]] = False
 69    header_prefix: Optional[Union[InterpolatedString, str]] = None
 70    kid: Optional[Union[InterpolatedString, str]] = None
 71    typ: Optional[Union[InterpolatedString, str]] = None
 72    cty: Optional[Union[InterpolatedString, str]] = None
 73    iss: Optional[Union[InterpolatedString, str]] = None
 74    sub: Optional[Union[InterpolatedString, str]] = None
 75    aud: Optional[Union[InterpolatedString, str]] = None
 76    additional_jwt_headers: Optional[Mapping[str, Any]] = None
 77    additional_jwt_payload: Optional[Mapping[str, Any]] = None
 79    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 80        self._secret_key = InterpolatedString.create(self.secret_key, parameters=parameters)
 81        self._algorithm = (
 82            JwtAlgorithm(self.algorithm) if isinstance(self.algorithm, str) else self.algorithm
 83        )
 84        self._base64_encode_secret_key = (
 85            InterpolatedBoolean(self.base64_encode_secret_key, parameters=parameters)
 86            if isinstance(self.base64_encode_secret_key, str)
 87            else self.base64_encode_secret_key
 88        )
 89        self._token_duration = self.token_duration
 90        self._header_prefix = (
 91            InterpolatedString.create(self.header_prefix, parameters=parameters)
 92            if self.header_prefix
 93            else None
 94        )
 95        self._kid = InterpolatedString.create(self.kid, parameters=parameters) if self.kid else None
 96        self._typ = InterpolatedString.create(self.typ, parameters=parameters) if self.typ else None
 97        self._cty = InterpolatedString.create(self.cty, parameters=parameters) if self.cty else None
 98        self._iss = InterpolatedString.create(self.iss, parameters=parameters) if self.iss else None
 99        self._sub = InterpolatedString.create(self.sub, parameters=parameters) if self.sub else None
100        self._aud = InterpolatedString.create(self.aud, parameters=parameters) if self.aud else None
101        self._additional_jwt_headers = InterpolatedMapping(
102            self.additional_jwt_headers or {}, parameters=parameters
103        )
104        self._additional_jwt_payload = InterpolatedMapping(
105            self.additional_jwt_payload or {}, parameters=parameters
106        )
108    def _get_jwt_headers(self) -> dict[str, Any]:
109        """
110        Builds and returns the headers used when signing the JWT.
111        """
112        headers = self._additional_jwt_headers.eval(self.config, json_loads=json.loads)
113        if any(prop in headers for prop in ["kid", "alg", "typ", "cty"]):
114            raise ValueError(
115                "'kid', 'alg', 'typ', 'cty' are reserved headers and should not be set as part of 'additional_jwt_headers'"
116            )
118        if self._kid:
119            headers["kid"] = self._kid.eval(self.config, json_loads=json.loads)
120        if self._typ:
121            headers["typ"] = self._typ.eval(self.config, json_loads=json.loads)
122        if self._cty:
123            headers["cty"] = self._cty.eval(self.config, json_loads=json.loads)
124        headers["alg"] = self._algorithm
125        return headers
127    def _get_jwt_payload(self) -> dict[str, Any]:
128        """
129        Builds and returns the payload used when signing the JWT.
130        """
131        now = int(
132        exp = now + self._token_duration if isinstance(self._token_duration, int) else now
133        nbf = now
135        payload = self._additional_jwt_payload.eval(self.config, json_loads=json.loads)
136        if any(prop in payload for prop in ["iss", "sub", "aud", "iat", "exp", "nbf"]):
137            raise ValueError(
138                "'iss', 'sub', 'aud', 'iat', 'exp', 'nbf' are reserved properties and should not be set as part of 'additional_jwt_payload'"
139            )
141        if self._iss:
142            payload["iss"] = self._iss.eval(self.config, json_loads=json.loads)
143        if self._sub:
144            payload["sub"] = self._sub.eval(self.config, json_loads=json.loads)
145        if self._aud:
146            payload["aud"] = self._aud.eval(self.config, json_loads=json.loads)
148        payload["iat"] = now
149        payload["exp"] = exp
150        payload["nbf"] = nbf
151        return payload
153    def _get_secret_key(self) -> str:
154        """
155        Returns the secret key used to sign the JWT.
156        """
157        secret_key: str = self._secret_key.eval(self.config, json_loads=json.loads)
158        return (
159            base64.b64encode(secret_key.encode()).decode()
160            if self._base64_encode_secret_key
161            else secret_key
162        )
164    def _get_signed_token(self) -> Union[str, Any]:
165        """
166        Signed the JWT using the provided secret key and algorithm and the generated headers and payload. For additional information on PyJWT see:
167        """
168        try:
169            return jwt.encode(
170                payload=self._get_jwt_payload(),
171                key=self._get_secret_key(),
172                algorithm=self._algorithm,
173                headers=self._get_jwt_headers(),
174            )
175        except Exception as e:
176            raise ValueError(f"Failed to sign token: {e}")
178    def _get_header_prefix(self) -> Union[str, None]:
179        """
180        Returns the header prefix to be used when attaching the token to the request.
181        """
182        return (
183            self._header_prefix.eval(self.config, json_loads=json.loads)
184            if self._header_prefix
185            else None
186        )
188    @property
189    def auth_header(self) -> str:
190        return "Authorization"
192    @property
193    def token(self) -> str:
194        return (
195            f"{self._get_header_prefix()} {self._get_signed_token()}"
196            if self._get_header_prefix()
197            else self._get_signed_token()
198        )

Generates a JSON Web Token (JWT) based on a declarative connector configuration file. The generated token is attached to each request via the Authorization header.

  • config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
  • secret_key (Union[InterpolatedString, str]): The secret key used to sign the JWT
  • algorithm (Union[str, JwtAlgorithm]): The algorithm used to sign the JWT
  • token_duration (Optional[int]): The duration in seconds for which the token is valid
  • base64_encode_secret_key (Optional[Union[InterpolatedBoolean, str, bool]]): Whether to base64 encode the secret key
  • header_prefix (Optional[Union[InterpolatedString, str]]): The prefix to add to the Authorization header
  • kid (Optional[Union[InterpolatedString, str]]): The key identifier to be included in the JWT header
  • typ (Optional[Union[InterpolatedString, str]]): The type of the JWT.
  • cty (Optional[Union[InterpolatedString, str]]): The content type of the JWT.
  • iss (Optional[Union[InterpolatedString, str]]): The issuer of the JWT.
  • sub (Optional[Union[InterpolatedString, str]]): The subject of the JWT.
  • aud (Optional[Union[InterpolatedString, str]]): The audience of the JWT.
  • additional_jwt_headers (Optional[Mapping[str, Any]]): Additional headers to include in the JWT.
  • additional_jwt_payload (Optional[Mapping[str, Any]]): Additional payload to include in the JWT.
JwtAuthenticator( config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], secret_key: Union[airbyte_cdk.InterpolatedString, str], algorithm: Union[str, airbyte_cdk.sources.declarative.auth.jwt.JwtAlgorithm], token_duration: Optional[int], base64_encode_secret_key: Union[airbyte_cdk.InterpolatedBoolean, str, bool, NoneType] = False, header_prefix: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, kid: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, typ: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, cty: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, iss: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, sub: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, aud: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, additional_jwt_headers: Optional[Mapping[str, Any]] = None, additional_jwt_payload: Optional[Mapping[str, Any]] = None)
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
secret_key: Union[airbyte_cdk.InterpolatedString, str]
algorithm: Union[str, airbyte_cdk.sources.declarative.auth.jwt.JwtAlgorithm]
token_duration: Optional[int]
base64_encode_secret_key: Union[airbyte_cdk.InterpolatedBoolean, str, bool, NoneType] = False
header_prefix: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
kid: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
typ: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
cty: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
iss: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
sub: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
aud: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
additional_jwt_headers: Optional[Mapping[str, Any]] = None
additional_jwt_payload: Optional[Mapping[str, Any]] = None
auth_header: str
188    @property
189    def auth_header(self) -> str:
190        return "Authorization"

HTTP header to set on the requests

token: str
192    @property
193    def token(self) -> str:
194        return (
195            f"{self._get_header_prefix()} {self._get_signed_token()}"
196            if self._get_header_prefix()
197            else self._get_signed_token()
198        )

The header value to set on outgoing HTTP requests