airbyte_cdk.sources.declarative.auth

1#
2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3#
4
5from airbyte_cdk.sources.declarative.auth.jwt import JwtAuthenticator
6from airbyte_cdk.sources.declarative.auth.oauth import DeclarativeOauth2Authenticator
7
8__all__ = ["DeclarativeOauth2Authenticator", "JwtAuthenticator"]
 24@dataclass
 25class DeclarativeOauth2Authenticator(AbstractOauth2Authenticator, DeclarativeAuthenticator):
 26    """
 27    Generates OAuth2.0 access tokens from an OAuth2.0 refresh token and client credentials based on
 28    a declarative connector configuration file. Credentials can be defined explicitly or via interpolation
 29    at runtime. The generated access token is attached to each request via the Authorization header.
 30
 31    Attributes:
 32        token_refresh_endpoint (Union[InterpolatedString, str]): The endpoint to refresh the access token
 33        client_id (Union[InterpolatedString, str]): The client id
 34        client_secret (Union[InterpolatedString, str]): Client secret
 35        refresh_token (Union[InterpolatedString, str]): The token used to refresh the access token
 36        access_token_name (Union[InterpolatedString, str]): THe field to extract access token from in the response
 37        expires_in_name (Union[InterpolatedString, str]): The field to extract expires_in from in the response
 38        config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
 39        scopes (Optional[List[str]]): The scopes to request
 40        token_expiry_date (Optional[Union[InterpolatedString, str]]): The access token expiration date
 41        token_expiry_date_format str: format of the datetime; provide it if expires_in is returned in datetime instead of seconds
 42        token_expiry_is_time_of_expiration bool: set True it if expires_in is returned as time of expiration instead of the number seconds until expiration
 43        refresh_request_body (Optional[Mapping[str, Any]]): The request body to send in the refresh request
 44        refresh_request_headers (Optional[Mapping[str, Any]]): The request headers to send in the refresh request
 45        grant_type: The grant_type to request for access_token. If set to refresh_token, the refresh_token parameter has to be provided
 46        message_repository (MessageRepository): the message repository used to emit logs on HTTP requests
 47    """
 48
 49    config: Mapping[str, Any]
 50    parameters: InitVar[Mapping[str, Any]]
 51    client_id: Optional[Union[InterpolatedString, str]] = None
 52    client_secret: Optional[Union[InterpolatedString, str]] = None
 53    token_refresh_endpoint: Optional[Union[InterpolatedString, str]] = None
 54    refresh_token: Optional[Union[InterpolatedString, str]] = None
 55    scopes: Optional[List[str]] = None
 56    token_expiry_date: Optional[Union[InterpolatedString, str]] = None
 57    _token_expiry_date: Optional[AirbyteDateTime] = field(init=False, repr=False, default=None)
 58    token_expiry_date_format: Optional[str] = None
 59    token_expiry_is_time_of_expiration: bool = False
 60    access_token_name: Union[InterpolatedString, str] = "access_token"
 61    access_token_value: Optional[Union[InterpolatedString, str]] = None
 62    client_id_name: Union[InterpolatedString, str] = "client_id"
 63    client_secret_name: Union[InterpolatedString, str] = "client_secret"
 64    expires_in_name: Union[InterpolatedString, str] = "expires_in"
 65    refresh_token_name: Union[InterpolatedString, str] = "refresh_token"
 66    refresh_request_body: Optional[Mapping[str, Any]] = None
 67    refresh_request_headers: Optional[Mapping[str, Any]] = None
 68    grant_type_name: Union[InterpolatedString, str] = "grant_type"
 69    grant_type: Union[InterpolatedString, str] = "refresh_token"
 70    message_repository: MessageRepository = NoopMessageRepository()
 71    profile_assertion: Optional[DeclarativeAuthenticator] = None
 72    use_profile_assertion: Optional[Union[InterpolatedBoolean, str, bool]] = False
 73
 74    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 75        super().__init__()
 76        if self.token_refresh_endpoint is not None:
 77            self._token_refresh_endpoint: Optional[InterpolatedString] = InterpolatedString.create(
 78                self.token_refresh_endpoint, parameters=parameters
 79            )
 80        else:
 81            self._token_refresh_endpoint = None
 82        self._client_id_name = InterpolatedString.create(self.client_id_name, parameters=parameters)
 83        self._client_id = (
 84            InterpolatedString.create(self.client_id, parameters=parameters)
 85            if self.client_id
 86            else self.client_id
 87        )
 88        self._client_secret_name = InterpolatedString.create(
 89            self.client_secret_name, parameters=parameters
 90        )
 91        self._client_secret = (
 92            InterpolatedString.create(self.client_secret, parameters=parameters)
 93            if self.client_secret
 94            else self.client_secret
 95        )
 96        self._refresh_token_name = InterpolatedString.create(
 97            self.refresh_token_name, parameters=parameters
 98        )
 99        if self.refresh_token is not None:
100            self._refresh_token: Optional[InterpolatedString] = InterpolatedString.create(
101                self.refresh_token, parameters=parameters
102            )
103        else:
104            self._refresh_token = None
105        self.access_token_name = InterpolatedString.create(
106            self.access_token_name, parameters=parameters
107        )
108        self.expires_in_name = InterpolatedString.create(
109            self.expires_in_name, parameters=parameters
110        )
111        self.grant_type_name = InterpolatedString.create(
112            self.grant_type_name, parameters=parameters
113        )
114        self.grant_type = InterpolatedString.create(
115            "urn:ietf:params:oauth:grant-type:jwt-bearer"
116            if self.use_profile_assertion
117            else self.grant_type,
118            parameters=parameters,
119        )
120        self._refresh_request_body = InterpolatedMapping(
121            self.refresh_request_body or {}, parameters=parameters
122        )
123        self._refresh_request_headers = InterpolatedMapping(
124            self.refresh_request_headers or {}, parameters=parameters
125        )
126        try:
127            if (
128                isinstance(self.token_expiry_date, (int, str))
129                and str(self.token_expiry_date).isdigit()
130            ):
131                self._token_expiry_date = ab_datetime_parse(self.token_expiry_date)
132            else:
133                self._token_expiry_date = (
134                    ab_datetime_parse(
135                        InterpolatedString.create(
136                            self.token_expiry_date, parameters=parameters
137                        ).eval(self.config)
138                    )
139                    if self.token_expiry_date
140                    else ab_datetime_now() - timedelta(days=1)
141                )
142        except ValueError as e:
143            raise ValueError(f"Invalid token expiry date format: {e}")
144        self.use_profile_assertion = (
145            InterpolatedBoolean(self.use_profile_assertion, parameters=parameters)
146            if isinstance(self.use_profile_assertion, str)
147            else self.use_profile_assertion
148        )
149        self.assertion_name = "assertion"
150
151        if self.access_token_value is not None:
152            self._access_token_value = InterpolatedString.create(
153                self.access_token_value, parameters=parameters
154            ).eval(self.config)
155        else:
156            self._access_token_value = None
157
158        self._access_token: Optional[str] = (
159            self._access_token_value if self.access_token_value else None
160        )
161
162        if not self.use_profile_assertion and any(
163            client_creds is None for client_creds in [self.client_id, self.client_secret]
164        ):
165            raise ValueError(
166                "OAuthAuthenticator configuration error: Both 'client_id' and 'client_secret' are required for the "
167                "basic OAuth flow."
168            )
169        if self.profile_assertion is None and self.use_profile_assertion:
170            raise ValueError(
171                "OAuthAuthenticator configuration error: 'profile_assertion' is required when using the profile assertion flow."
172            )
173        if self.get_grant_type() == "refresh_token" and self._refresh_token is None:
174            raise ValueError(
175                "OAuthAuthenticator configuration error: A 'refresh_token' is required when the 'grant_type' is set to 'refresh_token'."
176            )
177
178    def get_token_refresh_endpoint(self) -> Optional[str]:
179        if self._token_refresh_endpoint is not None:
180            refresh_token_endpoint: str = self._token_refresh_endpoint.eval(self.config)
181            if not refresh_token_endpoint:
182                raise ValueError(
183                    "OAuthAuthenticator was unable to evaluate token_refresh_endpoint parameter"
184                )
185            return refresh_token_endpoint
186        return None
187
188    def get_client_id_name(self) -> str:
189        return self._client_id_name.eval(self.config)  # type: ignore # eval returns a string in this context
190
191    def get_client_id(self) -> str:
192        client_id = self._client_id.eval(self.config) if self._client_id else self._client_id
193        if not client_id:
194            raise ValueError("OAuthAuthenticator was unable to evaluate client_id parameter")
195        return client_id  # type: ignore # value will be returned as a string, or an error will be raised
196
197    def get_client_secret_name(self) -> str:
198        return self._client_secret_name.eval(self.config)  # type: ignore # eval returns a string in this context
199
200    def get_client_secret(self) -> str:
201        client_secret = (
202            self._client_secret.eval(self.config) if self._client_secret else self._client_secret
203        )
204        if not client_secret:
205            raise ValueError("OAuthAuthenticator was unable to evaluate client_secret parameter")
206        return client_secret  # type: ignore # value will be returned as a string, or an error will be raised
207
208    def get_refresh_token_name(self) -> str:
209        return self._refresh_token_name.eval(self.config)  # type: ignore # eval returns a string in this context
210
211    def get_refresh_token(self) -> Optional[str]:
212        return None if self._refresh_token is None else str(self._refresh_token.eval(self.config))
213
214    def get_scopes(self) -> List[str]:
215        return self.scopes or []
216
217    def get_access_token_name(self) -> str:
218        return self.access_token_name.eval(self.config)  # type: ignore # eval returns a string in this context
219
220    def get_expires_in_name(self) -> str:
221        return self.expires_in_name.eval(self.config)  # type: ignore # eval returns a string in this context
222
223    def get_grant_type_name(self) -> str:
224        return self.grant_type_name.eval(self.config)  # type: ignore # eval returns a string in this context
225
226    def get_grant_type(self) -> str:
227        return self.grant_type.eval(self.config)  # type: ignore # eval returns a string in this context
228
229    def get_refresh_request_body(self) -> Mapping[str, Any]:
230        return self._refresh_request_body.eval(self.config)
231
232    def get_refresh_request_headers(self) -> Mapping[str, Any]:
233        return self._refresh_request_headers.eval(self.config)
234
235    def get_token_expiry_date(self) -> AirbyteDateTime:
236        if not self._has_access_token_been_initialized():
237            return AirbyteDateTime.from_datetime(datetime.min)
238        return self._token_expiry_date  # type: ignore # _token_expiry_date is an AirbyteDateTime. It is never None despite what mypy thinks
239
240    def _has_access_token_been_initialized(self) -> bool:
241        return self._access_token is not None
242
243    def set_token_expiry_date(self, value: Union[str, int]) -> None:
244        self._token_expiry_date = self._parse_token_expiration_date(value)
245
246    def get_assertion_name(self) -> str:
247        return self.assertion_name
248
249    def get_assertion(self) -> str:
250        if self.profile_assertion is None:
251            raise ValueError("profile_assertion is not set")
252        return self.profile_assertion.token
253
254    def build_refresh_request_body(self) -> Mapping[str, Any]:
255        """
256        Returns the request body to set on the refresh request
257
258        Override to define additional parameters
259        """
260        if self.use_profile_assertion:
261            return {
262                self.get_grant_type_name(): self.get_grant_type(),
263                self.get_assertion_name(): self.get_assertion(),
264            }
265        return super().build_refresh_request_body()
266
267    @property
268    def access_token(self) -> str:
269        if self._access_token is None:
270            raise ValueError("access_token is not set")
271        return self._access_token
272
273    @access_token.setter
274    def access_token(self, value: str) -> None:
275        self._access_token = value
276
277    @property
278    def _message_repository(self) -> MessageRepository:
279        """
280        Overriding AbstractOauth2Authenticator._message_repository to allow for HTTP request logs
281        """
282        return self.message_repository

Generates OAuth2.0 access tokens from an OAuth2.0 refresh token and client credentials based on a declarative connector configuration file. Credentials can be defined explicitly or via interpolation at runtime. The generated access token is attached to each request via the Authorization header.

Attributes:
  • token_refresh_endpoint (Union[InterpolatedString, str]): The endpoint to refresh the access token
  • client_id (Union[InterpolatedString, str]): The client id
  • client_secret (Union[InterpolatedString, str]): Client secret
  • refresh_token (Union[InterpolatedString, str]): The token used to refresh the access token
  • access_token_name (Union[InterpolatedString, str]): THe field to extract access token from in the response
  • expires_in_name (Union[InterpolatedString, str]): The field to extract expires_in from in the response
  • config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
  • scopes (Optional[List[str]]): The scopes to request
  • token_expiry_date (Optional[Union[InterpolatedString, str]]): The access token expiration date
  • token_expiry_date_format str: format of the datetime; provide it if expires_in is returned in datetime instead of seconds
  • token_expiry_is_time_of_expiration bool: set True it if expires_in is returned as time of expiration instead of the number seconds until expiration
  • refresh_request_body (Optional[Mapping[str, Any]]): The request body to send in the refresh request
  • refresh_request_headers (Optional[Mapping[str, Any]]): The request headers to send in the refresh request
  • grant_type: The grant_type to request for access_token. If set to refresh_token, the refresh_token parameter has to be provided
  • message_repository (MessageRepository): the message repository used to emit logs on HTTP requests
DeclarativeOauth2Authenticator( config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], client_id: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, client_secret: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, token_refresh_endpoint: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, refresh_token: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, scopes: Optional[List[str]] = None, token_expiry_date: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, token_expiry_date_format: Optional[str] = None, token_expiry_is_time_of_expiration: bool = False, access_token_name: Union[airbyte_cdk.InterpolatedString, str] = 'access_token', access_token_value: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, client_id_name: Union[airbyte_cdk.InterpolatedString, str] = 'client_id', client_secret_name: Union[airbyte_cdk.InterpolatedString, str] = 'client_secret', expires_in_name: Union[airbyte_cdk.InterpolatedString, str] = 'expires_in', refresh_token_name: Union[airbyte_cdk.InterpolatedString, str] = 'refresh_token', refresh_request_body: Optional[Mapping[str, Any]] = None, refresh_request_headers: Optional[Mapping[str, Any]] = None, grant_type_name: Union[airbyte_cdk.InterpolatedString, str] = 'grant_type', grant_type: Union[airbyte_cdk.InterpolatedString, str] = 'refresh_token', message_repository: airbyte_cdk.MessageRepository = <airbyte_cdk.sources.message.NoopMessageRepository object>, profile_assertion: Optional[airbyte_cdk.DeclarativeAuthenticator] = None, use_profile_assertion: Union[airbyte_cdk.InterpolatedBoolean, str, bool, NoneType] = False)

If all of refresh_token_error_status_codes, refresh_token_error_key, and refresh_token_error_values are set, then http errors with such params will be wrapped in AirbyteTracedException.

config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
client_id: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
client_secret: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
token_refresh_endpoint: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
refresh_token: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
scopes: Optional[List[str]] = None
token_expiry_date: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
token_expiry_date_format: Optional[str] = None

Format of the datetime; exists it if expires_in is returned as the expiration datetime instead of seconds until it expires

token_expiry_is_time_of_expiration: bool = False

Indicates that the Token Expiry returns the date until which the token will be valid, not the amount of time it will be valid.

access_token_name: Union[airbyte_cdk.InterpolatedString, str] = 'access_token'
access_token_value: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
client_id_name: Union[airbyte_cdk.InterpolatedString, str] = 'client_id'
client_secret_name: Union[airbyte_cdk.InterpolatedString, str] = 'client_secret'
expires_in_name: Union[airbyte_cdk.InterpolatedString, str] = 'expires_in'
refresh_token_name: Union[airbyte_cdk.InterpolatedString, str] = 'refresh_token'
refresh_request_body: Optional[Mapping[str, Any]] = None
refresh_request_headers: Optional[Mapping[str, Any]] = None
grant_type_name: Union[airbyte_cdk.InterpolatedString, str] = 'grant_type'
grant_type: Union[airbyte_cdk.InterpolatedString, str] = 'refresh_token'
profile_assertion: Optional[airbyte_cdk.DeclarativeAuthenticator] = None
use_profile_assertion: Union[airbyte_cdk.InterpolatedBoolean, str, bool, NoneType] = False
def get_token_refresh_endpoint(self) -> Optional[str]:
178    def get_token_refresh_endpoint(self) -> Optional[str]:
179        if self._token_refresh_endpoint is not None:
180            refresh_token_endpoint: str = self._token_refresh_endpoint.eval(self.config)
181            if not refresh_token_endpoint:
182                raise ValueError(
183                    "OAuthAuthenticator was unable to evaluate token_refresh_endpoint parameter"
184                )
185            return refresh_token_endpoint
186        return None

Returns the endpoint to refresh the access token

def get_client_id_name(self) -> str:
188    def get_client_id_name(self) -> str:
189        return self._client_id_name.eval(self.config)  # type: ignore # eval returns a string in this context

The client id name to authenticate

def get_client_id(self) -> str:
191    def get_client_id(self) -> str:
192        client_id = self._client_id.eval(self.config) if self._client_id else self._client_id
193        if not client_id:
194            raise ValueError("OAuthAuthenticator was unable to evaluate client_id parameter")
195        return client_id  # type: ignore # value will be returned as a string, or an error will be raised

The client id to authenticate

def get_client_secret_name(self) -> str:
197    def get_client_secret_name(self) -> str:
198        return self._client_secret_name.eval(self.config)  # type: ignore # eval returns a string in this context

The client secret name to authenticate

def get_client_secret(self) -> str:
200    def get_client_secret(self) -> str:
201        client_secret = (
202            self._client_secret.eval(self.config) if self._client_secret else self._client_secret
203        )
204        if not client_secret:
205            raise ValueError("OAuthAuthenticator was unable to evaluate client_secret parameter")
206        return client_secret  # type: ignore # value will be returned as a string, or an error will be raised

The client secret to authenticate

def get_refresh_token_name(self) -> str:
208    def get_refresh_token_name(self) -> str:
209        return self._refresh_token_name.eval(self.config)  # type: ignore # eval returns a string in this context

The refresh token name to authenticate

def get_refresh_token(self) -> Optional[str]:
211    def get_refresh_token(self) -> Optional[str]:
212        return None if self._refresh_token is None else str(self._refresh_token.eval(self.config))

The token used to refresh the access token when it expires

def get_scopes(self) -> List[str]:
214    def get_scopes(self) -> List[str]:
215        return self.scopes or []

List of requested scopes

def get_access_token_name(self) -> str:
217    def get_access_token_name(self) -> str:
218        return self.access_token_name.eval(self.config)  # type: ignore # eval returns a string in this context

Field to extract access token from in the response

def get_expires_in_name(self) -> str:
220    def get_expires_in_name(self) -> str:
221        return self.expires_in_name.eval(self.config)  # type: ignore # eval returns a string in this context

Returns the expires_in field name

def get_grant_type_name(self) -> str:
223    def get_grant_type_name(self) -> str:
224        return self.grant_type_name.eval(self.config)  # type: ignore # eval returns a string in this context

Returns grant_type specified name for requesting access_token

def get_grant_type(self) -> str:
226    def get_grant_type(self) -> str:
227        return self.grant_type.eval(self.config)  # type: ignore # eval returns a string in this context

Returns grant_type specified for requesting access_token

def get_refresh_request_body(self) -> Mapping[str, Any]:
229    def get_refresh_request_body(self) -> Mapping[str, Any]:
230        return self._refresh_request_body.eval(self.config)

Returns the request body to set on the refresh request

def get_refresh_request_headers(self) -> Mapping[str, Any]:
232    def get_refresh_request_headers(self) -> Mapping[str, Any]:
233        return self._refresh_request_headers.eval(self.config)

Returns the request headers to set on the refresh request

def get_token_expiry_date(self) -> airbyte_cdk.utils.datetime_helpers.AirbyteDateTime:
235    def get_token_expiry_date(self) -> AirbyteDateTime:
236        if not self._has_access_token_been_initialized():
237            return AirbyteDateTime.from_datetime(datetime.min)
238        return self._token_expiry_date  # type: ignore # _token_expiry_date is an AirbyteDateTime. It is never None despite what mypy thinks

Expiration date of the access token

def set_token_expiry_date(self, value: Union[str, int]) -> None:
243    def set_token_expiry_date(self, value: Union[str, int]) -> None:
244        self._token_expiry_date = self._parse_token_expiration_date(value)

Setter for access token expiration date

def get_assertion_name(self) -> str:
246    def get_assertion_name(self) -> str:
247        return self.assertion_name
def get_assertion(self) -> str:
249    def get_assertion(self) -> str:
250        if self.profile_assertion is None:
251            raise ValueError("profile_assertion is not set")
252        return self.profile_assertion.token
def build_refresh_request_body(self) -> Mapping[str, Any]:
254    def build_refresh_request_body(self) -> Mapping[str, Any]:
255        """
256        Returns the request body to set on the refresh request
257
258        Override to define additional parameters
259        """
260        if self.use_profile_assertion:
261            return {
262                self.get_grant_type_name(): self.get_grant_type(),
263                self.get_assertion_name(): self.get_assertion(),
264            }
265        return super().build_refresh_request_body()

Returns the request body to set on the refresh request

Override to define additional parameters

access_token: str
267    @property
268    def access_token(self) -> str:
269        if self._access_token is None:
270            raise ValueError("access_token is not set")
271        return self._access_token

Returns the access token

 41@dataclass
 42class JwtAuthenticator(DeclarativeAuthenticator):
 43    """
 44    Generates a JSON Web Token (JWT) based on a declarative connector configuration file. The generated token is attached to each request via the Authorization header.
 45
 46    Attributes:
 47        config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
 48        secret_key (Union[InterpolatedString, str]): The secret key used to sign the JWT
 49        algorithm (Union[str, JwtAlgorithm]): The algorithm used to sign the JWT
 50        token_duration (Optional[int]): The duration in seconds for which the token is valid
 51        base64_encode_secret_key (Optional[Union[InterpolatedBoolean, str, bool]]): Whether to base64 encode the secret key
 52        header_prefix (Optional[Union[InterpolatedString, str]]): The prefix to add to the Authorization header
 53        kid (Optional[Union[InterpolatedString, str]]): The key identifier to be included in the JWT header
 54        typ (Optional[Union[InterpolatedString, str]]): The type of the JWT.
 55        cty (Optional[Union[InterpolatedString, str]]): The content type of the JWT.
 56        iss (Optional[Union[InterpolatedString, str]]): The issuer of the JWT.
 57        sub (Optional[Union[InterpolatedString, str]]): The subject of the JWT.
 58        aud (Optional[Union[InterpolatedString, str]]): The audience of the JWT.
 59        additional_jwt_headers (Optional[Mapping[str, Any]]): Additional headers to include in the JWT.
 60        additional_jwt_payload (Optional[Mapping[str, Any]]): Additional payload to include in the JWT.
 61    """
 62
 63    config: Mapping[str, Any]
 64    parameters: InitVar[Mapping[str, Any]]
 65    secret_key: Union[InterpolatedString, str]
 66    algorithm: Union[str, JwtAlgorithm]
 67    token_duration: Optional[int]
 68    base64_encode_secret_key: Optional[Union[InterpolatedBoolean, str, bool]] = False
 69    header_prefix: Optional[Union[InterpolatedString, str]] = None
 70    kid: Optional[Union[InterpolatedString, str]] = None
 71    typ: Optional[Union[InterpolatedString, str]] = None
 72    cty: Optional[Union[InterpolatedString, str]] = None
 73    iss: Optional[Union[InterpolatedString, str]] = None
 74    sub: Optional[Union[InterpolatedString, str]] = None
 75    aud: Optional[Union[InterpolatedString, str]] = None
 76    additional_jwt_headers: Optional[Mapping[str, Any]] = None
 77    additional_jwt_payload: Optional[Mapping[str, Any]] = None
 78
 79    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 80        self._secret_key = InterpolatedString.create(self.secret_key, parameters=parameters)
 81        self._algorithm = (
 82            JwtAlgorithm(self.algorithm) if isinstance(self.algorithm, str) else self.algorithm
 83        )
 84        self._base64_encode_secret_key = (
 85            InterpolatedBoolean(self.base64_encode_secret_key, parameters=parameters)
 86            if isinstance(self.base64_encode_secret_key, str)
 87            else self.base64_encode_secret_key
 88        )
 89        self._token_duration = self.token_duration
 90        self._header_prefix = (
 91            InterpolatedString.create(self.header_prefix, parameters=parameters)
 92            if self.header_prefix
 93            else None
 94        )
 95        self._kid = InterpolatedString.create(self.kid, parameters=parameters) if self.kid else None
 96        self._typ = InterpolatedString.create(self.typ, parameters=parameters) if self.typ else None
 97        self._cty = InterpolatedString.create(self.cty, parameters=parameters) if self.cty else None
 98        self._iss = InterpolatedString.create(self.iss, parameters=parameters) if self.iss else None
 99        self._sub = InterpolatedString.create(self.sub, parameters=parameters) if self.sub else None
100        self._aud = InterpolatedString.create(self.aud, parameters=parameters) if self.aud else None
101        self._additional_jwt_headers = InterpolatedMapping(
102            self.additional_jwt_headers or {}, parameters=parameters
103        )
104        self._additional_jwt_payload = InterpolatedMapping(
105            self.additional_jwt_payload or {}, parameters=parameters
106        )
107
108    def _get_jwt_headers(self) -> dict[str, Any]:
109        """
110        Builds and returns the headers used when signing the JWT.
111        """
112        headers = self._additional_jwt_headers.eval(self.config, json_loads=json.loads)
113        if any(prop in headers for prop in ["kid", "alg", "typ", "cty"]):
114            raise ValueError(
115                "'kid', 'alg', 'typ', 'cty' are reserved headers and should not be set as part of 'additional_jwt_headers'"
116            )
117
118        if self._kid:
119            headers["kid"] = self._kid.eval(self.config, json_loads=json.loads)
120        if self._typ:
121            headers["typ"] = self._typ.eval(self.config, json_loads=json.loads)
122        if self._cty:
123            headers["cty"] = self._cty.eval(self.config, json_loads=json.loads)
124        headers["alg"] = self._algorithm
125        return headers
126
127    def _get_jwt_payload(self) -> dict[str, Any]:
128        """
129        Builds and returns the payload used when signing the JWT.
130        """
131        now = int(datetime.now().timestamp())
132        exp = now + self._token_duration if isinstance(self._token_duration, int) else now
133        nbf = now
134
135        payload = self._additional_jwt_payload.eval(self.config, json_loads=json.loads)
136        if any(prop in payload for prop in ["iss", "sub", "aud", "iat", "exp", "nbf"]):
137            raise ValueError(
138                "'iss', 'sub', 'aud', 'iat', 'exp', 'nbf' are reserved properties and should not be set as part of 'additional_jwt_payload'"
139            )
140
141        if self._iss:
142            payload["iss"] = self._iss.eval(self.config, json_loads=json.loads)
143        if self._sub:
144            payload["sub"] = self._sub.eval(self.config, json_loads=json.loads)
145        if self._aud:
146            payload["aud"] = self._aud.eval(self.config, json_loads=json.loads)
147
148        payload["iat"] = now
149        payload["exp"] = exp
150        payload["nbf"] = nbf
151        return payload
152
153    def _get_secret_key(self) -> str:
154        """
155        Returns the secret key used to sign the JWT.
156        """
157        secret_key: str = self._secret_key.eval(self.config, json_loads=json.loads)
158        return (
159            base64.b64encode(secret_key.encode()).decode()
160            if self._base64_encode_secret_key
161            else secret_key
162        )
163
164    def _get_signed_token(self) -> Union[str, Any]:
165        """
166        Signed the JWT using the provided secret key and algorithm and the generated headers and payload. For additional information on PyJWT see: https://pyjwt.readthedocs.io/en/stable/
167        """
168        try:
169            return jwt.encode(
170                payload=self._get_jwt_payload(),
171                key=self._get_secret_key(),
172                algorithm=self._algorithm,
173                headers=self._get_jwt_headers(),
174            )
175        except Exception as e:
176            raise ValueError(f"Failed to sign token: {e}")
177
178    def _get_header_prefix(self) -> Union[str, None]:
179        """
180        Returns the header prefix to be used when attaching the token to the request.
181        """
182        return (
183            self._header_prefix.eval(self.config, json_loads=json.loads)
184            if self._header_prefix
185            else None
186        )
187
188    @property
189    def auth_header(self) -> str:
190        return "Authorization"
191
192    @property
193    def token(self) -> str:
194        return (
195            f"{self._get_header_prefix()} {self._get_signed_token()}"
196            if self._get_header_prefix()
197            else self._get_signed_token()
198        )

Generates a JSON Web Token (JWT) based on a declarative connector configuration file. The generated token is attached to each request via the Authorization header.

Attributes:
  • config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
  • secret_key (Union[InterpolatedString, str]): The secret key used to sign the JWT
  • algorithm (Union[str, JwtAlgorithm]): The algorithm used to sign the JWT
  • token_duration (Optional[int]): The duration in seconds for which the token is valid
  • base64_encode_secret_key (Optional[Union[InterpolatedBoolean, str, bool]]): Whether to base64 encode the secret key
  • header_prefix (Optional[Union[InterpolatedString, str]]): The prefix to add to the Authorization header
  • kid (Optional[Union[InterpolatedString, str]]): The key identifier to be included in the JWT header
  • typ (Optional[Union[InterpolatedString, str]]): The type of the JWT.
  • cty (Optional[Union[InterpolatedString, str]]): The content type of the JWT.
  • iss (Optional[Union[InterpolatedString, str]]): The issuer of the JWT.
  • sub (Optional[Union[InterpolatedString, str]]): The subject of the JWT.
  • aud (Optional[Union[InterpolatedString, str]]): The audience of the JWT.
  • additional_jwt_headers (Optional[Mapping[str, Any]]): Additional headers to include in the JWT.
  • additional_jwt_payload (Optional[Mapping[str, Any]]): Additional payload to include in the JWT.
JwtAuthenticator( config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], secret_key: Union[airbyte_cdk.InterpolatedString, str], algorithm: Union[str, airbyte_cdk.sources.declarative.auth.jwt.JwtAlgorithm], token_duration: Optional[int], base64_encode_secret_key: Union[airbyte_cdk.InterpolatedBoolean, str, bool, NoneType] = False, header_prefix: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, kid: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, typ: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, cty: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, iss: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, sub: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, aud: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, additional_jwt_headers: Optional[Mapping[str, Any]] = None, additional_jwt_payload: Optional[Mapping[str, Any]] = None)
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
secret_key: Union[airbyte_cdk.InterpolatedString, str]
token_duration: Optional[int]
base64_encode_secret_key: Union[airbyte_cdk.InterpolatedBoolean, str, bool, NoneType] = False
header_prefix: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
kid: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
typ: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
cty: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
iss: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
sub: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
aud: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
additional_jwt_headers: Optional[Mapping[str, Any]] = None
additional_jwt_payload: Optional[Mapping[str, Any]] = None
auth_header: str
188    @property
189    def auth_header(self) -> str:
190        return "Authorization"

HTTP header to set on the requests

token: str
192    @property
193    def token(self) -> str:
194        return (
195            f"{self._get_header_prefix()} {self._get_signed_token()}"
196            if self._get_header_prefix()
197            else self._get_signed_token()
198        )

The header value to set on outgoing HTTP requests