airbyte_cdk.sources.declarative.auth

1#
2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3#
4
5from airbyte_cdk.sources.declarative.auth.jwt import JwtAuthenticator
6from airbyte_cdk.sources.declarative.auth.oauth import DeclarativeOauth2Authenticator
7
8__all__ = ["DeclarativeOauth2Authenticator", "JwtAuthenticator"]
 27@dataclass
 28class DeclarativeOauth2Authenticator(AbstractOauth2Authenticator, DeclarativeAuthenticator):
 29    """
 30    Generates OAuth2.0 access tokens from an OAuth2.0 refresh token and client credentials based on
 31    a declarative connector configuration file. Credentials can be defined explicitly or via interpolation
 32    at runtime. The generated access token is attached to each request via the Authorization header.
 33
 34    Attributes:
 35        token_refresh_endpoint (Union[InterpolatedString, str]): The endpoint to refresh the access token
 36        client_id (Union[InterpolatedString, str]): The client id
 37        client_secret (Union[InterpolatedString, str]): Client secret (can be empty for APIs that support this)
 38        refresh_token (Union[InterpolatedString, str]): The token used to refresh the access token
 39        access_token_name (Union[InterpolatedString, str]): THe field to extract access token from in the response
 40        expires_in_name (Union[InterpolatedString, str]): The field to extract expires_in from in the response
 41        config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
 42        scopes (Optional[List[str]]): The scopes to request
 43        token_expiry_date (Optional[Union[InterpolatedString, str]]): The access token expiration date
 44        token_expiry_date_format str: format of the datetime; provide it if expires_in is returned in datetime instead of seconds
 45        token_expiry_is_time_of_expiration bool: set True it if expires_in is returned as time of expiration instead of the number seconds until expiration
 46        refresh_request_body (Optional[Mapping[str, Any]]): The request body to send in the refresh request
 47        refresh_request_headers (Optional[Mapping[str, Any]]): The request headers to send in the refresh request
 48        grant_type: The grant_type to request for access_token. If set to refresh_token, the refresh_token parameter has to be provided
 49        message_repository (MessageRepository): the message repository used to emit logs on HTTP requests
 50    """
 51
 52    config: Mapping[str, Any]
 53    parameters: InitVar[Mapping[str, Any]]
 54    client_id: Optional[Union[InterpolatedString, str]] = None
 55    client_secret: Optional[Union[InterpolatedString, str]] = None
 56    token_refresh_endpoint: Optional[Union[InterpolatedString, str]] = None
 57    refresh_token: Optional[Union[InterpolatedString, str]] = None
 58    scopes: Optional[List[str]] = None
 59    token_expiry_date: Optional[Union[InterpolatedString, str]] = None
 60    _token_expiry_date: Optional[AirbyteDateTime] = field(init=False, repr=False, default=None)
 61    token_expiry_date_format: Optional[str] = None
 62    token_expiry_is_time_of_expiration: bool = False
 63    access_token_name: Union[InterpolatedString, str] = "access_token"
 64    access_token_value: Optional[Union[InterpolatedString, str]] = None
 65    client_id_name: Union[InterpolatedString, str] = "client_id"
 66    client_secret_name: Union[InterpolatedString, str] = "client_secret"
 67    expires_in_name: Union[InterpolatedString, str] = "expires_in"
 68    refresh_token_name: Union[InterpolatedString, str] = "refresh_token"
 69    refresh_request_body: Optional[Mapping[str, Any]] = None
 70    refresh_request_headers: Optional[Mapping[str, Any]] = None
 71    grant_type_name: Union[InterpolatedString, str] = "grant_type"
 72    grant_type: Union[InterpolatedString, str] = "refresh_token"
 73    message_repository: MessageRepository = NoopMessageRepository()
 74    profile_assertion: Optional[DeclarativeAuthenticator] = None
 75    use_profile_assertion: Optional[Union[InterpolatedBoolean, str, bool]] = False
 76
 77    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 78        super().__init__()
 79        if self.token_refresh_endpoint is not None:
 80            self._token_refresh_endpoint: Optional[InterpolatedString] = InterpolatedString.create(
 81                self.token_refresh_endpoint, parameters=parameters
 82            )
 83        else:
 84            self._token_refresh_endpoint = None
 85        self._client_id_name = InterpolatedString.create(self.client_id_name, parameters=parameters)
 86        self._client_id = (
 87            InterpolatedString.create(self.client_id, parameters=parameters)
 88            if self.client_id
 89            else self.client_id
 90        )
 91        self._client_secret_name = InterpolatedString.create(
 92            self.client_secret_name, parameters=parameters
 93        )
 94        self._client_secret = (
 95            InterpolatedString.create(self.client_secret, parameters=parameters)
 96            if self.client_secret
 97            else self.client_secret
 98        )
 99        self._refresh_token_name = InterpolatedString.create(
100            self.refresh_token_name, parameters=parameters
101        )
102        if self.refresh_token is not None:
103            self._refresh_token: Optional[InterpolatedString] = InterpolatedString.create(
104                self.refresh_token, parameters=parameters
105            )
106        else:
107            self._refresh_token = None
108        self.access_token_name = InterpolatedString.create(
109            self.access_token_name, parameters=parameters
110        )
111        self.expires_in_name = InterpolatedString.create(
112            self.expires_in_name, parameters=parameters
113        )
114        self.grant_type_name = InterpolatedString.create(
115            self.grant_type_name, parameters=parameters
116        )
117        self.grant_type = InterpolatedString.create(
118            "urn:ietf:params:oauth:grant-type:jwt-bearer"
119            if self.use_profile_assertion
120            else self.grant_type,
121            parameters=parameters,
122        )
123        self._refresh_request_body = InterpolatedMapping(
124            self.refresh_request_body or {}, parameters=parameters
125        )
126        self._refresh_request_headers = InterpolatedMapping(
127            self.refresh_request_headers or {}, parameters=parameters
128        )
129        try:
130            if (
131                isinstance(self.token_expiry_date, (int, str))
132                and str(self.token_expiry_date).isdigit()
133            ):
134                self._token_expiry_date = ab_datetime_parse(self.token_expiry_date)
135            else:
136                self._token_expiry_date = (
137                    ab_datetime_parse(
138                        InterpolatedString.create(
139                            self.token_expiry_date, parameters=parameters
140                        ).eval(self.config)
141                    )
142                    if self.token_expiry_date
143                    else ab_datetime_now() - timedelta(days=1)
144                )
145        except ValueError as e:
146            raise ValueError(f"Invalid token expiry date format: {e}")
147        self.use_profile_assertion = (
148            InterpolatedBoolean(self.use_profile_assertion, parameters=parameters)
149            if isinstance(self.use_profile_assertion, str)
150            else self.use_profile_assertion
151        )
152        self.assertion_name = "assertion"
153
154        if self.access_token_value is not None:
155            self._access_token_value = InterpolatedString.create(
156                self.access_token_value, parameters=parameters
157            ).eval(self.config)
158        else:
159            self._access_token_value = None
160
161        self._access_token: Optional[str] = (
162            self._access_token_value if self.access_token_value else None
163        )
164
165        if not self.use_profile_assertion and any(
166            client_creds is None for client_creds in [self.client_id, self.client_secret]
167        ):
168            raise ValueError(
169                "OAuthAuthenticator configuration error: Both 'client_id' and 'client_secret' are required for the "
170                "basic OAuth flow."
171            )
172        if self.profile_assertion is None and self.use_profile_assertion:
173            raise ValueError(
174                "OAuthAuthenticator configuration error: 'profile_assertion' is required when using the profile assertion flow."
175            )
176        if self.get_grant_type() == "refresh_token" and self._refresh_token is None:
177            raise ValueError(
178                "OAuthAuthenticator configuration error: A 'refresh_token' is required when the 'grant_type' is set to 'refresh_token'."
179            )
180
181    def get_token_refresh_endpoint(self) -> Optional[str]:
182        if self._token_refresh_endpoint is not None:
183            refresh_token_endpoint: str = self._token_refresh_endpoint.eval(self.config)
184            if not refresh_token_endpoint:
185                raise ValueError(
186                    "OAuthAuthenticator was unable to evaluate token_refresh_endpoint parameter"
187                )
188            return refresh_token_endpoint
189        return None
190
191    def get_client_id_name(self) -> str:
192        return self._client_id_name.eval(self.config)  # type: ignore # eval returns a string in this context
193
194    def get_client_id(self) -> str:
195        client_id = self._client_id.eval(self.config) if self._client_id else self._client_id
196        if not client_id:
197            raise ValueError("OAuthAuthenticator was unable to evaluate client_id parameter")
198        return client_id  # type: ignore # value will be returned as a string, or an error will be raised
199
200    def get_client_secret_name(self) -> str:
201        return self._client_secret_name.eval(self.config)  # type: ignore # eval returns a string in this context
202
203    def get_client_secret(self) -> str:
204        client_secret = (
205            self._client_secret.eval(self.config) if self._client_secret else self._client_secret
206        )
207        if not client_secret:
208            # We've seen some APIs allowing empty client_secret so we will only log here
209            logger.warning(
210                "OAuthAuthenticator was unable to evaluate client_secret parameter hence it'll be empty"
211            )
212        return client_secret  # type: ignore # value will be returned as a string, which might be empty
213
214    def get_refresh_token_name(self) -> str:
215        return self._refresh_token_name.eval(self.config)  # type: ignore # eval returns a string in this context
216
217    def get_refresh_token(self) -> Optional[str]:
218        return None if self._refresh_token is None else str(self._refresh_token.eval(self.config))
219
220    def get_scopes(self) -> List[str]:
221        return self.scopes or []
222
223    def get_access_token_name(self) -> str:
224        return self.access_token_name.eval(self.config)  # type: ignore # eval returns a string in this context
225
226    def get_expires_in_name(self) -> str:
227        return self.expires_in_name.eval(self.config)  # type: ignore # eval returns a string in this context
228
229    def get_grant_type_name(self) -> str:
230        return self.grant_type_name.eval(self.config)  # type: ignore # eval returns a string in this context
231
232    def get_grant_type(self) -> str:
233        return self.grant_type.eval(self.config)  # type: ignore # eval returns a string in this context
234
235    def get_refresh_request_body(self) -> Mapping[str, Any]:
236        return self._refresh_request_body.eval(self.config)
237
238    def get_refresh_request_headers(self) -> Mapping[str, Any]:
239        return self._refresh_request_headers.eval(self.config)
240
241    def get_token_expiry_date(self) -> AirbyteDateTime:
242        if not self._has_access_token_been_initialized():
243            return AirbyteDateTime.from_datetime(datetime.min)
244        return self._token_expiry_date  # type: ignore # _token_expiry_date is an AirbyteDateTime. It is never None despite what mypy thinks
245
246    def _has_access_token_been_initialized(self) -> bool:
247        return self._access_token is not None
248
249    def set_token_expiry_date(self, value: AirbyteDateTime) -> None:
250        self._token_expiry_date = value
251
252    def get_assertion_name(self) -> str:
253        return self.assertion_name
254
255    def get_assertion(self) -> str:
256        if self.profile_assertion is None:
257            raise ValueError("profile_assertion is not set")
258        return self.profile_assertion.token
259
260    def build_refresh_request_body(self) -> Mapping[str, Any]:
261        """
262        Returns the request body to set on the refresh request
263
264        Override to define additional parameters
265        """
266        if self.use_profile_assertion:
267            return {
268                self.get_grant_type_name(): self.get_grant_type(),
269                self.get_assertion_name(): self.get_assertion(),
270            }
271        return super().build_refresh_request_body()
272
273    @property
274    def access_token(self) -> str:
275        if self._access_token is None:
276            raise ValueError("access_token is not set")
277        return self._access_token
278
279    @access_token.setter
280    def access_token(self, value: str) -> None:
281        self._access_token = value
282
283    @property
284    def _message_repository(self) -> MessageRepository:
285        """
286        Overriding AbstractOauth2Authenticator._message_repository to allow for HTTP request logs
287        """
288        return self.message_repository

Generates OAuth2.0 access tokens from an OAuth2.0 refresh token and client credentials based on a declarative connector configuration file. Credentials can be defined explicitly or via interpolation at runtime. The generated access token is attached to each request via the Authorization header.

Attributes:
  • token_refresh_endpoint (Union[InterpolatedString, str]): The endpoint to refresh the access token
  • client_id (Union[InterpolatedString, str]): The client id
  • client_secret (Union[InterpolatedString, str]): Client secret (can be empty for APIs that support this)
  • refresh_token (Union[InterpolatedString, str]): The token used to refresh the access token
  • access_token_name (Union[InterpolatedString, str]): The field to extract access token from in the response
  • expires_in_name (Union[InterpolatedString, str]): The field to extract expires_in from in the response
  • config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
  • scopes (Optional[List[str]]): The scopes to request
  • token_expiry_date (Optional[Union[InterpolatedString, str]]): The access token expiration date
  • token_expiry_date_format str: format of the datetime; provide it if expires_in is returned in datetime instead of seconds
  • token_expiry_is_time_of_expiration bool: set to True if expires_in is returned as the time of expiration instead of the number of seconds until expiration
  • refresh_request_body (Optional[Mapping[str, Any]]): The request body to send in the refresh request
  • refresh_request_headers (Optional[Mapping[str, Any]]): The request headers to send in the refresh request
  • grant_type: The grant_type to request for access_token. If set to refresh_token, the refresh_token parameter has to be provided
  • message_repository (MessageRepository): the message repository used to emit logs on HTTP requests
DeclarativeOauth2Authenticator( config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], client_id: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, client_secret: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, token_refresh_endpoint: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, refresh_token: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, scopes: Optional[List[str]] = None, token_expiry_date: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, token_expiry_date_format: Optional[str] = None, token_expiry_is_time_of_expiration: bool = False, access_token_name: Union[airbyte_cdk.InterpolatedString, str] = 'access_token', access_token_value: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, client_id_name: Union[airbyte_cdk.InterpolatedString, str] = 'client_id', client_secret_name: Union[airbyte_cdk.InterpolatedString, str] = 'client_secret', expires_in_name: Union[airbyte_cdk.InterpolatedString, str] = 'expires_in', refresh_token_name: Union[airbyte_cdk.InterpolatedString, str] = 'refresh_token', refresh_request_body: Optional[Mapping[str, Any]] = None, refresh_request_headers: Optional[Mapping[str, Any]] = None, grant_type_name: Union[airbyte_cdk.InterpolatedString, str] = 'grant_type', grant_type: Union[airbyte_cdk.InterpolatedString, str] = 'refresh_token', message_repository: airbyte_cdk.MessageRepository = <airbyte_cdk.sources.message.NoopMessageRepository object>, profile_assertion: Optional[airbyte_cdk.DeclarativeAuthenticator] = None, use_profile_assertion: Union[airbyte_cdk.InterpolatedBoolean, str, bool, NoneType] = False)

If all of refresh_token_error_status_codes, refresh_token_error_key, and refresh_token_error_values are set, then http errors with such params will be wrapped in AirbyteTracedException.

config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
client_id: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
client_secret: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
token_refresh_endpoint: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
refresh_token: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
scopes: Optional[List[str]] = None
token_expiry_date: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
token_expiry_date_format: Optional[str] = None

Format of the datetime; provide it if expires_in is returned as the expiration datetime instead of the number of seconds until it expires

token_expiry_is_time_of_expiration: bool = False

Indicates that the Token Expiry returns the date until which the token will be valid, not the amount of time it will be valid.

access_token_name: Union[airbyte_cdk.InterpolatedString, str] = 'access_token'
access_token_value: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
client_id_name: Union[airbyte_cdk.InterpolatedString, str] = 'client_id'
client_secret_name: Union[airbyte_cdk.InterpolatedString, str] = 'client_secret'
expires_in_name: Union[airbyte_cdk.InterpolatedString, str] = 'expires_in'
refresh_token_name: Union[airbyte_cdk.InterpolatedString, str] = 'refresh_token'
refresh_request_body: Optional[Mapping[str, Any]] = None
refresh_request_headers: Optional[Mapping[str, Any]] = None
grant_type_name: Union[airbyte_cdk.InterpolatedString, str] = 'grant_type'
grant_type: Union[airbyte_cdk.InterpolatedString, str] = 'refresh_token'
profile_assertion: Optional[airbyte_cdk.DeclarativeAuthenticator] = None
use_profile_assertion: Union[airbyte_cdk.InterpolatedBoolean, str, bool, NoneType] = False
def get_token_refresh_endpoint(self) -> Optional[str]:
181    def get_token_refresh_endpoint(self) -> Optional[str]:
182        if self._token_refresh_endpoint is not None:
183            refresh_token_endpoint: str = self._token_refresh_endpoint.eval(self.config)
184            if not refresh_token_endpoint:
185                raise ValueError(
186                    "OAuthAuthenticator was unable to evaluate token_refresh_endpoint parameter"
187                )
188            return refresh_token_endpoint
189        return None

Returns the endpoint to refresh the access token

def get_client_id_name(self) -> str:
191    def get_client_id_name(self) -> str:
192        return self._client_id_name.eval(self.config)  # type: ignore # eval returns a string in this context

The client id name to authenticate

def get_client_id(self) -> str:
194    def get_client_id(self) -> str:
195        client_id = self._client_id.eval(self.config) if self._client_id else self._client_id
196        if not client_id:
197            raise ValueError("OAuthAuthenticator was unable to evaluate client_id parameter")
198        return client_id  # type: ignore # value will be returned as a string, or an error will be raised

The client id to authenticate

def get_client_secret_name(self) -> str:
200    def get_client_secret_name(self) -> str:
201        return self._client_secret_name.eval(self.config)  # type: ignore # eval returns a string in this context

The client secret name to authenticate

def get_client_secret(self) -> str:
203    def get_client_secret(self) -> str:
204        client_secret = (
205            self._client_secret.eval(self.config) if self._client_secret else self._client_secret
206        )
207        if not client_secret:
208            # We've seen some APIs allowing empty client_secret so we will only log here
209            logger.warning(
210                "OAuthAuthenticator was unable to evaluate client_secret parameter hence it'll be empty"
211            )
212        return client_secret  # type: ignore # value will be returned as a string, which might be empty

The client secret to authenticate

def get_refresh_token_name(self) -> str:
214    def get_refresh_token_name(self) -> str:
215        return self._refresh_token_name.eval(self.config)  # type: ignore # eval returns a string in this context

The refresh token name to authenticate

def get_refresh_token(self) -> Optional[str]:
217    def get_refresh_token(self) -> Optional[str]:
218        return None if self._refresh_token is None else str(self._refresh_token.eval(self.config))

The token used to refresh the access token when it expires

def get_scopes(self) -> List[str]:
220    def get_scopes(self) -> List[str]:
221        return self.scopes or []

List of requested scopes

def get_access_token_name(self) -> str:
223    def get_access_token_name(self) -> str:
224        return self.access_token_name.eval(self.config)  # type: ignore # eval returns a string in this context

Field to extract access token from in the response

def get_expires_in_name(self) -> str:
226    def get_expires_in_name(self) -> str:
227        return self.expires_in_name.eval(self.config)  # type: ignore # eval returns a string in this context

Returns the expires_in field name

def get_grant_type_name(self) -> str:
229    def get_grant_type_name(self) -> str:
230        return self.grant_type_name.eval(self.config)  # type: ignore # eval returns a string in this context

Returns grant_type specified name for requesting access_token

def get_grant_type(self) -> str:
232    def get_grant_type(self) -> str:
233        return self.grant_type.eval(self.config)  # type: ignore # eval returns a string in this context

Returns grant_type specified for requesting access_token

def get_refresh_request_body(self) -> Mapping[str, Any]:
235    def get_refresh_request_body(self) -> Mapping[str, Any]:
236        return self._refresh_request_body.eval(self.config)

Returns the request body to set on the refresh request

def get_refresh_request_headers(self) -> Mapping[str, Any]:
238    def get_refresh_request_headers(self) -> Mapping[str, Any]:
239        return self._refresh_request_headers.eval(self.config)

Returns the request headers to set on the refresh request

def get_token_expiry_date(self) -> airbyte_cdk.utils.datetime_helpers.AirbyteDateTime:
241    def get_token_expiry_date(self) -> AirbyteDateTime:
242        if not self._has_access_token_been_initialized():
243            return AirbyteDateTime.from_datetime(datetime.min)
244        return self._token_expiry_date  # type: ignore # _token_expiry_date is an AirbyteDateTime. It is never None despite what mypy thinks

Expiration date of the access token

def set_token_expiry_date(self, value: airbyte_cdk.utils.datetime_helpers.AirbyteDateTime) -> None:
249    def set_token_expiry_date(self, value: AirbyteDateTime) -> None:
250        self._token_expiry_date = value

Setter for access token expiration date

def get_assertion_name(self) -> str:
252    def get_assertion_name(self) -> str:
253        return self.assertion_name
def get_assertion(self) -> str:
255    def get_assertion(self) -> str:
256        if self.profile_assertion is None:
257            raise ValueError("profile_assertion is not set")
258        return self.profile_assertion.token
def build_refresh_request_body(self) -> Mapping[str, Any]:
260    def build_refresh_request_body(self) -> Mapping[str, Any]:
261        """
262        Returns the request body to set on the refresh request
263
264        Override to define additional parameters
265        """
266        if self.use_profile_assertion:
267            return {
268                self.get_grant_type_name(): self.get_grant_type(),
269                self.get_assertion_name(): self.get_assertion(),
270            }
271        return super().build_refresh_request_body()

Returns the request body to set on the refresh request

Override to define additional parameters

access_token: str
273    @property
274    def access_token(self) -> str:
275        if self._access_token is None:
276            raise ValueError("access_token is not set")
277        return self._access_token

Returns the access token

 41@dataclass
 42class JwtAuthenticator(DeclarativeAuthenticator):
 43    """
 44    Generates a JSON Web Token (JWT) based on a declarative connector configuration file. The generated token is attached to each request via the Authorization header.
 45
 46    Attributes:
 47        config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
 48        secret_key (Union[InterpolatedString, str]): The secret key used to sign the JWT
 49        algorithm (Union[str, JwtAlgorithm]): The algorithm used to sign the JWT
 50        token_duration (Optional[int]): The duration in seconds for which the token is valid
 51        base64_encode_secret_key (Optional[Union[InterpolatedBoolean, str, bool]]): Whether to base64 encode the secret key
 52        header_prefix (Optional[Union[InterpolatedString, str]]): The prefix to add to the Authorization header
 53        kid (Optional[Union[InterpolatedString, str]]): The key identifier to be included in the JWT header
 54        typ (Optional[Union[InterpolatedString, str]]): The type of the JWT.
 55        cty (Optional[Union[InterpolatedString, str]]): The content type of the JWT.
 56        iss (Optional[Union[InterpolatedString, str]]): The issuer of the JWT.
 57        sub (Optional[Union[InterpolatedString, str]]): The subject of the JWT.
 58        aud (Optional[Union[InterpolatedString, str]]): The audience of the JWT.
 59        additional_jwt_headers (Optional[Mapping[str, Any]]): Additional headers to include in the JWT.
 60        additional_jwt_payload (Optional[Mapping[str, Any]]): Additional payload to include in the JWT.
 61    """
 62
 63    config: Mapping[str, Any]
 64    parameters: InitVar[Mapping[str, Any]]
 65    secret_key: Union[InterpolatedString, str]
 66    algorithm: Union[str, JwtAlgorithm]
 67    token_duration: Optional[int]
 68    base64_encode_secret_key: Optional[Union[InterpolatedBoolean, str, bool]] = False
 69    header_prefix: Optional[Union[InterpolatedString, str]] = None
 70    kid: Optional[Union[InterpolatedString, str]] = None
 71    typ: Optional[Union[InterpolatedString, str]] = None
 72    cty: Optional[Union[InterpolatedString, str]] = None
 73    iss: Optional[Union[InterpolatedString, str]] = None
 74    sub: Optional[Union[InterpolatedString, str]] = None
 75    aud: Optional[Union[InterpolatedString, str]] = None
 76    additional_jwt_headers: Optional[Mapping[str, Any]] = None
 77    additional_jwt_payload: Optional[Mapping[str, Any]] = None
 78
 79    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 80        self._secret_key = InterpolatedString.create(self.secret_key, parameters=parameters)
 81        self._algorithm = (
 82            JwtAlgorithm(self.algorithm) if isinstance(self.algorithm, str) else self.algorithm
 83        )
 84        self._base64_encode_secret_key = (
 85            InterpolatedBoolean(self.base64_encode_secret_key, parameters=parameters)
 86            if isinstance(self.base64_encode_secret_key, str)
 87            else self.base64_encode_secret_key
 88        )
 89        self._token_duration = self.token_duration
 90        self._header_prefix = (
 91            InterpolatedString.create(self.header_prefix, parameters=parameters)
 92            if self.header_prefix
 93            else None
 94        )
 95        self._kid = InterpolatedString.create(self.kid, parameters=parameters) if self.kid else None
 96        self._typ = InterpolatedString.create(self.typ, parameters=parameters) if self.typ else None
 97        self._cty = InterpolatedString.create(self.cty, parameters=parameters) if self.cty else None
 98        self._iss = InterpolatedString.create(self.iss, parameters=parameters) if self.iss else None
 99        self._sub = InterpolatedString.create(self.sub, parameters=parameters) if self.sub else None
100        self._aud = InterpolatedString.create(self.aud, parameters=parameters) if self.aud else None
101        self._additional_jwt_headers = InterpolatedMapping(
102            self.additional_jwt_headers or {}, parameters=parameters
103        )
104        self._additional_jwt_payload = InterpolatedMapping(
105            self.additional_jwt_payload or {}, parameters=parameters
106        )
107
108    def _get_jwt_headers(self) -> dict[str, Any]:
109        """
110        Builds and returns the headers used when signing the JWT.
111        """
112        headers = self._additional_jwt_headers.eval(self.config, json_loads=json.loads)
113        if any(prop in headers for prop in ["kid", "alg", "typ", "cty"]):
114            raise ValueError(
115                "'kid', 'alg', 'typ', 'cty' are reserved headers and should not be set as part of 'additional_jwt_headers'"
116            )
117
118        if self._kid:
119            headers["kid"] = self._kid.eval(self.config, json_loads=json.loads)
120        if self._typ:
121            headers["typ"] = self._typ.eval(self.config, json_loads=json.loads)
122        if self._cty:
123            headers["cty"] = self._cty.eval(self.config, json_loads=json.loads)
124        headers["alg"] = self._algorithm
125        return headers
126
127    def _get_jwt_payload(self) -> dict[str, Any]:
128        """
129        Builds and returns the payload used when signing the JWT.
130        """
131        now = int(datetime.now().timestamp())
132        exp = now + self._token_duration if isinstance(self._token_duration, int) else now
133        nbf = now
134
135        payload = self._additional_jwt_payload.eval(self.config, json_loads=json.loads)
136        if any(prop in payload for prop in ["iss", "sub", "aud", "iat", "exp", "nbf"]):
137            raise ValueError(
138                "'iss', 'sub', 'aud', 'iat', 'exp', 'nbf' are reserved properties and should not be set as part of 'additional_jwt_payload'"
139            )
140
141        if self._iss:
142            payload["iss"] = self._iss.eval(self.config, json_loads=json.loads)
143        if self._sub:
144            payload["sub"] = self._sub.eval(self.config, json_loads=json.loads)
145        if self._aud:
146            payload["aud"] = self._aud.eval(self.config, json_loads=json.loads)
147
148        payload["iat"] = now
149        payload["exp"] = exp
150        payload["nbf"] = nbf
151        return payload
152
153    def _get_secret_key(self) -> str:
154        """
155        Returns the secret key used to sign the JWT.
156        """
157        secret_key: str = self._secret_key.eval(self.config, json_loads=json.loads)
158        return (
159            base64.b64encode(secret_key.encode()).decode()
160            if self._base64_encode_secret_key
161            else secret_key
162        )
163
164    def _get_signed_token(self) -> Union[str, Any]:
165        """
166        Signed the JWT using the provided secret key and algorithm and the generated headers and payload. For additional information on PyJWT see: https://pyjwt.readthedocs.io/en/stable/
167        """
168        try:
169            return jwt.encode(
170                payload=self._get_jwt_payload(),
171                key=self._get_secret_key(),
172                algorithm=self._algorithm,
173                headers=self._get_jwt_headers(),
174            )
175        except Exception as e:
176            raise ValueError(f"Failed to sign token: {e}")
177
178    def _get_header_prefix(self) -> Union[str, None]:
179        """
180        Returns the header prefix to be used when attaching the token to the request.
181        """
182        return (
183            self._header_prefix.eval(self.config, json_loads=json.loads)
184            if self._header_prefix
185            else None
186        )
187
188    @property
189    def auth_header(self) -> str:
190        return "Authorization"
191
192    @property
193    def token(self) -> str:
194        return (
195            f"{self._get_header_prefix()} {self._get_signed_token()}"
196            if self._get_header_prefix()
197            else self._get_signed_token()
198        )

Generates a JSON Web Token (JWT) based on a declarative connector configuration file. The generated token is attached to each request via the Authorization header.

Attributes:
  • config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
  • secret_key (Union[InterpolatedString, str]): The secret key used to sign the JWT
  • algorithm (Union[str, JwtAlgorithm]): The algorithm used to sign the JWT
  • token_duration (Optional[int]): The duration in seconds for which the token is valid
  • base64_encode_secret_key (Optional[Union[InterpolatedBoolean, str, bool]]): Whether to base64 encode the secret key
  • header_prefix (Optional[Union[InterpolatedString, str]]): The prefix to add to the Authorization header
  • kid (Optional[Union[InterpolatedString, str]]): The key identifier to be included in the JWT header
  • typ (Optional[Union[InterpolatedString, str]]): The type of the JWT.
  • cty (Optional[Union[InterpolatedString, str]]): The content type of the JWT.
  • iss (Optional[Union[InterpolatedString, str]]): The issuer of the JWT.
  • sub (Optional[Union[InterpolatedString, str]]): The subject of the JWT.
  • aud (Optional[Union[InterpolatedString, str]]): The audience of the JWT.
  • additional_jwt_headers (Optional[Mapping[str, Any]]): Additional headers to include in the JWT.
  • additional_jwt_payload (Optional[Mapping[str, Any]]): Additional payload to include in the JWT.
JwtAuthenticator( config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], secret_key: Union[airbyte_cdk.InterpolatedString, str], algorithm: Union[str, airbyte_cdk.sources.declarative.auth.jwt.JwtAlgorithm], token_duration: Optional[int], base64_encode_secret_key: Union[airbyte_cdk.InterpolatedBoolean, str, bool, NoneType] = False, header_prefix: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, kid: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, typ: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, cty: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, iss: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, sub: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, aud: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, additional_jwt_headers: Optional[Mapping[str, Any]] = None, additional_jwt_payload: Optional[Mapping[str, Any]] = None)
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
secret_key: Union[airbyte_cdk.InterpolatedString, str]
algorithm: Union[str, airbyte_cdk.sources.declarative.auth.jwt.JwtAlgorithm]
token_duration: Optional[int]
base64_encode_secret_key: Union[airbyte_cdk.InterpolatedBoolean, str, bool, NoneType] = False
header_prefix: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
kid: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
typ: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
cty: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
iss: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
sub: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
aud: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
additional_jwt_headers: Optional[Mapping[str, Any]] = None
additional_jwt_payload: Optional[Mapping[str, Any]] = None
auth_header: str
188    @property
189    def auth_header(self) -> str:
190        return "Authorization"

HTTP header to set on the requests

token: str
192    @property
193    def token(self) -> str:
194        return (
195            f"{self._get_header_prefix()} {self._get_signed_token()}"
196            if self._get_header_prefix()
197            else self._get_signed_token()
198        )

The header value to set on outgoing HTTP requests