airbyte_cdk.sources.declarative.auth

1#
2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3#
4
5from airbyte_cdk.sources.declarative.auth.jwt import JwtAuthenticator
6from airbyte_cdk.sources.declarative.auth.oauth import DeclarativeOauth2Authenticator
7
8__all__ = ["DeclarativeOauth2Authenticator", "JwtAuthenticator"]
 27@dataclass
 28class DeclarativeOauth2Authenticator(AbstractOauth2Authenticator, DeclarativeAuthenticator):
 29    """
 30    Generates OAuth2.0 access tokens from an OAuth2.0 refresh token and client credentials based on
 31    a declarative connector configuration file. Credentials can be defined explicitly or via interpolation
 32    at runtime. The generated access token is attached to each request via the Authorization header.
 33
 34    Attributes:
 35        token_refresh_endpoint (Union[InterpolatedString, str]): The endpoint to refresh the access token
 36        client_id (Union[InterpolatedString, str]): The client id
 37        client_secret (Union[InterpolatedString, str]): Client secret (can be empty for APIs that support this)
 38        refresh_token (Union[InterpolatedString, str]): The token used to refresh the access token
 39        access_token_name (Union[InterpolatedString, str]): THe field to extract access token from in the response
 40        expires_in_name (Union[InterpolatedString, str]): The field to extract expires_in from in the response
 41        config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
 42        scopes (Optional[List[str]]): The scopes to request
 43        token_expiry_date (Optional[Union[InterpolatedString, str]]): The access token expiration date
 44        token_expiry_date_format str: format of the datetime; provide it if expires_in is returned in datetime instead of seconds
 45        token_expiry_is_time_of_expiration bool: set True it if expires_in is returned as time of expiration instead of the number seconds until expiration
 46        refresh_request_body (Optional[Mapping[str, Any]]): The request body to send in the refresh request
 47        refresh_request_headers (Optional[Mapping[str, Any]]): The request headers to send in the refresh request
 48        grant_type: The grant_type to request for access_token. If set to refresh_token, the refresh_token parameter has to be provided
 49        message_repository (MessageRepository): the message repository used to emit logs on HTTP requests
 50    """
 51
 52    config: Mapping[str, Any]
 53    parameters: InitVar[Mapping[str, Any]]
 54    client_id: Optional[Union[InterpolatedString, str]] = None
 55    client_secret: Optional[Union[InterpolatedString, str]] = None
 56    token_refresh_endpoint: Optional[Union[InterpolatedString, str]] = None
 57    refresh_token: Optional[Union[InterpolatedString, str]] = None
 58    scopes: Optional[List[str]] = None
 59    token_expiry_date: Optional[Union[InterpolatedString, str]] = None
 60    _token_expiry_date: Optional[AirbyteDateTime] = field(init=False, repr=False, default=None)
 61    token_expiry_date_format: Optional[str] = None
 62    token_expiry_is_time_of_expiration: bool = False
 63    access_token_name: Union[InterpolatedString, str] = "access_token"
 64    access_token_value: Optional[Union[InterpolatedString, str]] = None
 65    client_id_name: Union[InterpolatedString, str] = "client_id"
 66    client_secret_name: Union[InterpolatedString, str] = "client_secret"
 67    expires_in_name: Union[InterpolatedString, str] = "expires_in"
 68    refresh_token_name: Union[InterpolatedString, str] = "refresh_token"
 69    refresh_request_body: Optional[Mapping[str, Any]] = None
 70    refresh_request_headers: Optional[Mapping[str, Any]] = None
 71    grant_type_name: Union[InterpolatedString, str] = "grant_type"
 72    grant_type: Union[InterpolatedString, str] = "refresh_token"
 73    message_repository: MessageRepository = NoopMessageRepository()
 74    profile_assertion: Optional[DeclarativeAuthenticator] = None
 75    use_profile_assertion: Optional[Union[InterpolatedBoolean, str, bool]] = False
 76
 77    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 78        super().__init__()
 79        if self.token_refresh_endpoint is not None:
 80            self._token_refresh_endpoint: Optional[InterpolatedString] = InterpolatedString.create(
 81                self.token_refresh_endpoint, parameters=parameters
 82            )
 83        else:
 84            self._token_refresh_endpoint = None
 85        self._client_id_name = InterpolatedString.create(self.client_id_name, parameters=parameters)
 86        self._client_id = (
 87            InterpolatedString.create(self.client_id, parameters=parameters)
 88            if self.client_id
 89            else self.client_id
 90        )
 91        self._client_secret_name = InterpolatedString.create(
 92            self.client_secret_name, parameters=parameters
 93        )
 94        self._client_secret = (
 95            InterpolatedString.create(self.client_secret, parameters=parameters)
 96            if self.client_secret
 97            else self.client_secret
 98        )
 99        self._refresh_token_name = InterpolatedString.create(
100            self.refresh_token_name, parameters=parameters
101        )
102        if self.refresh_token is not None:
103            self._refresh_token: Optional[InterpolatedString] = InterpolatedString.create(
104                self.refresh_token, parameters=parameters
105            )
106        else:
107            self._refresh_token = None
108        self.access_token_name = InterpolatedString.create(
109            self.access_token_name, parameters=parameters
110        )
111        self.expires_in_name = InterpolatedString.create(
112            self.expires_in_name, parameters=parameters
113        )
114        self.grant_type_name = InterpolatedString.create(
115            self.grant_type_name, parameters=parameters
116        )
117        self.grant_type = InterpolatedString.create(
118            "urn:ietf:params:oauth:grant-type:jwt-bearer"
119            if self.use_profile_assertion
120            else self.grant_type,
121            parameters=parameters,
122        )
123        self._refresh_request_body = InterpolatedMapping(
124            self.refresh_request_body or {}, parameters=parameters
125        )
126        self._refresh_request_headers = InterpolatedMapping(
127            self.refresh_request_headers or {}, parameters=parameters
128        )
129        try:
130            if (
131                isinstance(self.token_expiry_date, (int, str))
132                and str(self.token_expiry_date).isdigit()
133            ):
134                self._token_expiry_date = ab_datetime_parse(self.token_expiry_date)
135            else:
136                self._token_expiry_date = (
137                    ab_datetime_parse(
138                        InterpolatedString.create(
139                            self.token_expiry_date, parameters=parameters
140                        ).eval(self.config)
141                    )
142                    if self.token_expiry_date
143                    else ab_datetime_now() - timedelta(days=1)
144                )
145        except ValueError as e:
146            raise ValueError(f"Invalid token expiry date format: {e}")
147        self.use_profile_assertion = (
148            InterpolatedBoolean(self.use_profile_assertion, parameters=parameters)
149            if isinstance(self.use_profile_assertion, str)
150            else self.use_profile_assertion
151        )
152        self.assertion_name = "assertion"
153
154        if self.access_token_value is not None:
155            self._access_token_value = InterpolatedString.create(
156                self.access_token_value, parameters=parameters
157            ).eval(self.config)
158        else:
159            self._access_token_value = None
160
161        self._access_token: Optional[str] = (
162            self._access_token_value if self.access_token_value else None
163        )
164
165        if not self.use_profile_assertion and any(
166            client_creds is None for client_creds in [self.client_id, self.client_secret]
167        ):
168            raise ValueError(
169                "OAuthAuthenticator configuration error: Both 'client_id' and 'client_secret' are required for the "
170                "basic OAuth flow."
171            )
172        if self.profile_assertion is None and self.use_profile_assertion:
173            raise ValueError(
174                "OAuthAuthenticator configuration error: 'profile_assertion' is required when using the profile assertion flow."
175            )
176        if self.get_grant_type() == "refresh_token" and self._refresh_token is None:
177            raise ValueError(
178                "OAuthAuthenticator configuration error: A 'refresh_token' is required when the 'grant_type' is set to 'refresh_token'."
179            )
180
181    def get_token_refresh_endpoint(self) -> Optional[str]:
182        if self._token_refresh_endpoint is not None:
183            refresh_token_endpoint: str = self._token_refresh_endpoint.eval(self.config)
184            if not refresh_token_endpoint:
185                raise ValueError(
186                    "OAuthAuthenticator was unable to evaluate token_refresh_endpoint parameter"
187                )
188            return refresh_token_endpoint
189        return None
190
191    def get_client_id_name(self) -> str:
192        return self._client_id_name.eval(self.config)  # type: ignore # eval returns a string in this context
193
194    def get_client_id(self) -> str:
195        client_id = self._client_id.eval(self.config) if self._client_id else self._client_id
196        if not client_id:
197            raise ValueError("OAuthAuthenticator was unable to evaluate client_id parameter")
198        return client_id  # type: ignore # value will be returned as a string, or an error will be raised
199
200    def get_client_secret_name(self) -> str:
201        return self._client_secret_name.eval(self.config)  # type: ignore # eval returns a string in this context
202
203    def get_client_secret(self) -> str:
204        client_secret = (
205            self._client_secret.eval(self.config) if self._client_secret else self._client_secret
206        )
207        if not client_secret:
208            # We've seen some APIs allowing empty client_secret so we will only log here
209            logger.warning(
210                "OAuthAuthenticator was unable to evaluate client_secret parameter hence it'll be empty"
211            )
212        return client_secret  # type: ignore # value will be returned as a string, which might be empty
213
214    def get_refresh_token_name(self) -> str:
215        return self._refresh_token_name.eval(self.config)  # type: ignore # eval returns a string in this context
216
217    def get_refresh_token(self) -> Optional[str]:
218        return None if self._refresh_token is None else str(self._refresh_token.eval(self.config))
219
220    def get_scopes(self) -> List[str]:
221        return self.scopes or []
222
223    def get_access_token_name(self) -> str:
224        return self.access_token_name.eval(self.config)  # type: ignore # eval returns a string in this context
225
226    def get_expires_in_name(self) -> str:
227        return self.expires_in_name.eval(self.config)  # type: ignore # eval returns a string in this context
228
229    def get_grant_type_name(self) -> str:
230        return self.grant_type_name.eval(self.config)  # type: ignore # eval returns a string in this context
231
232    def get_grant_type(self) -> str:
233        return self.grant_type.eval(self.config)  # type: ignore # eval returns a string in this context
234
235    def get_refresh_request_body(self) -> Mapping[str, Any]:
236        return self._refresh_request_body.eval(self.config)
237
238    def get_refresh_request_headers(self) -> Mapping[str, Any]:
239        return self._refresh_request_headers.eval(self.config)
240
241    def get_token_expiry_date(self) -> AirbyteDateTime:
242        if not self._has_access_token_been_initialized():
243            return AirbyteDateTime.from_datetime(datetime.min)
244        return self._token_expiry_date  # type: ignore # _token_expiry_date is an AirbyteDateTime. It is never None despite what mypy thinks
245
246    def _has_access_token_been_initialized(self) -> bool:
247        return self._access_token is not None
248
249    def set_token_expiry_date(self, value: AirbyteDateTime) -> None:
250        self._token_expiry_date = value
251
252    def get_assertion_name(self) -> str:
253        return self.assertion_name
254
255    def get_assertion(self) -> str:
256        if self.profile_assertion is None:
257            raise ValueError("profile_assertion is not set")
258        return self.profile_assertion.token
259
260    def build_refresh_request_body(self) -> Mapping[str, Any]:
261        """
262        Returns the request body to set on the refresh request
263
264        Override to define additional parameters
265        """
266        if self.use_profile_assertion:
267            return {
268                self.get_grant_type_name(): self.get_grant_type(),
269                self.get_assertion_name(): self.get_assertion(),
270            }
271        return super().build_refresh_request_body()
272
273    @property
274    def access_token(self) -> str:
275        if self._access_token is None:
276            raise ValueError("access_token is not set")
277        return self._access_token
278
279    @access_token.setter
280    def access_token(self, value: str) -> None:
281        self._access_token = value
282
283    @property
284    def _message_repository(self) -> MessageRepository:
285        """
286        Overriding AbstractOauth2Authenticator._message_repository to allow for HTTP request logs
287        """
288        return self.message_repository

Generates OAuth2.0 access tokens from an OAuth2.0 refresh token and client credentials based on a declarative connector configuration file. Credentials can be defined explicitly or via interpolation at runtime. The generated access token is attached to each request via the Authorization header.

Attributes:
  • token_refresh_endpoint (Union[InterpolatedString, str]): The endpoint to refresh the access token
  • client_id (Union[InterpolatedString, str]): The client id
  • client_secret (Union[InterpolatedString, str]): Client secret (can be empty for APIs that support this)
  • refresh_token (Union[InterpolatedString, str]): The token used to refresh the access token
  • access_token_name (Union[InterpolatedString, str]): The field to extract access token from in the response
  • expires_in_name (Union[InterpolatedString, str]): The field to extract expires_in from in the response
  • config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
  • scopes (Optional[List[str]]): The scopes to request
  • token_expiry_date (Optional[Union[InterpolatedString, str]]): The access token expiration date
  • token_expiry_date_format str: format of the datetime; provide it if expires_in is returned in datetime instead of seconds
  • token_expiry_is_time_of_expiration bool: set to True if expires_in is returned as the time of expiration instead of the number of seconds until expiration
  • refresh_request_body (Optional[Mapping[str, Any]]): The request body to send in the refresh request
  • refresh_request_headers (Optional[Mapping[str, Any]]): The request headers to send in the refresh request
  • grant_type: The grant_type to request for access_token. If set to refresh_token, the refresh_token parameter has to be provided
  • message_repository (MessageRepository): the message repository used to emit logs on HTTP requests
DeclarativeOauth2Authenticator( config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], client_id: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, client_secret: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, token_refresh_endpoint: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, refresh_token: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, scopes: Optional[List[str]] = None, token_expiry_date: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, token_expiry_date_format: Optional[str] = None, token_expiry_is_time_of_expiration: bool = False, access_token_name: Union[airbyte_cdk.InterpolatedString, str] = 'access_token', access_token_value: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, client_id_name: Union[airbyte_cdk.InterpolatedString, str] = 'client_id', client_secret_name: Union[airbyte_cdk.InterpolatedString, str] = 'client_secret', expires_in_name: Union[airbyte_cdk.InterpolatedString, str] = 'expires_in', refresh_token_name: Union[airbyte_cdk.InterpolatedString, str] = 'refresh_token', refresh_request_body: Optional[Mapping[str, Any]] = None, refresh_request_headers: Optional[Mapping[str, Any]] = None, grant_type_name: Union[airbyte_cdk.InterpolatedString, str] = 'grant_type', grant_type: Union[airbyte_cdk.InterpolatedString, str] = 'refresh_token', message_repository: airbyte_cdk.MessageRepository = <airbyte_cdk.sources.message.NoopMessageRepository object>, profile_assertion: Optional[airbyte_cdk.DeclarativeAuthenticator] = None, use_profile_assertion: Union[airbyte_cdk.InterpolatedBoolean, str, bool, NoneType] = False)

If all of refresh_token_error_status_codes, refresh_token_error_key, and refresh_token_error_values are set, then http errors with such params will be wrapped in AirbyteTracedException.

config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
client_id: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
client_secret: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
token_refresh_endpoint: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
refresh_token: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
scopes: Optional[List[str]] = None
token_expiry_date: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
token_expiry_date_format: Optional[str] = None

Format of the datetime; provide it if expires_in is returned as the expiration datetime instead of the number of seconds until expiration.

token_expiry_is_time_of_expiration: bool = False

Indicates that the Token Expiry returns the date until which the token will be valid, not the amount of time it will be valid.

access_token_name: Union[airbyte_cdk.InterpolatedString, str] = 'access_token'
access_token_value: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
client_id_name: Union[airbyte_cdk.InterpolatedString, str] = 'client_id'
client_secret_name: Union[airbyte_cdk.InterpolatedString, str] = 'client_secret'
expires_in_name: Union[airbyte_cdk.InterpolatedString, str] = 'expires_in'
refresh_token_name: Union[airbyte_cdk.InterpolatedString, str] = 'refresh_token'
refresh_request_body: Optional[Mapping[str, Any]] = None
refresh_request_headers: Optional[Mapping[str, Any]] = None
grant_type_name: Union[airbyte_cdk.InterpolatedString, str] = 'grant_type'
grant_type: Union[airbyte_cdk.InterpolatedString, str] = 'refresh_token'
profile_assertion: Optional[airbyte_cdk.DeclarativeAuthenticator] = None
use_profile_assertion: Union[airbyte_cdk.InterpolatedBoolean, str, bool, NoneType] = False
def get_token_refresh_endpoint(self) -> Optional[str]:
181    def get_token_refresh_endpoint(self) -> Optional[str]:
182        if self._token_refresh_endpoint is not None:
183            refresh_token_endpoint: str = self._token_refresh_endpoint.eval(self.config)
184            if not refresh_token_endpoint:
185                raise ValueError(
186                    "OAuthAuthenticator was unable to evaluate token_refresh_endpoint parameter"
187                )
188            return refresh_token_endpoint
189        return None

Returns the endpoint to refresh the access token

def get_client_id_name(self) -> str:
191    def get_client_id_name(self) -> str:
192        return self._client_id_name.eval(self.config)  # type: ignore # eval returns a string in this context

The client id name to authenticate

def get_client_id(self) -> str:
194    def get_client_id(self) -> str:
195        client_id = self._client_id.eval(self.config) if self._client_id else self._client_id
196        if not client_id:
197            raise ValueError("OAuthAuthenticator was unable to evaluate client_id parameter")
198        return client_id  # type: ignore # value will be returned as a string, or an error will be raised

The client id to authenticate

def get_client_secret_name(self) -> str:
200    def get_client_secret_name(self) -> str:
201        return self._client_secret_name.eval(self.config)  # type: ignore # eval returns a string in this context

The client secret name to authenticate

def get_client_secret(self) -> str:
203    def get_client_secret(self) -> str:
204        client_secret = (
205            self._client_secret.eval(self.config) if self._client_secret else self._client_secret
206        )
207        if not client_secret:
208            # We've seen some APIs allowing empty client_secret so we will only log here
209            logger.warning(
210                "OAuthAuthenticator was unable to evaluate client_secret parameter hence it'll be empty"
211            )
212        return client_secret  # type: ignore # value will be returned as a string, which might be empty

The client secret to authenticate

def get_refresh_token_name(self) -> str:
214    def get_refresh_token_name(self) -> str:
215        return self._refresh_token_name.eval(self.config)  # type: ignore # eval returns a string in this context

The refresh token name to authenticate

def get_refresh_token(self) -> Optional[str]:
217    def get_refresh_token(self) -> Optional[str]:
218        return None if self._refresh_token is None else str(self._refresh_token.eval(self.config))

The token used to refresh the access token when it expires

def get_scopes(self) -> List[str]:
220    def get_scopes(self) -> List[str]:
221        return self.scopes or []

List of requested scopes

def get_access_token_name(self) -> str:
223    def get_access_token_name(self) -> str:
224        return self.access_token_name.eval(self.config)  # type: ignore # eval returns a string in this context

Field to extract access token from in the response

def get_expires_in_name(self) -> str:
226    def get_expires_in_name(self) -> str:
227        return self.expires_in_name.eval(self.config)  # type: ignore # eval returns a string in this context

Returns the expires_in field name

def get_grant_type_name(self) -> str:
229    def get_grant_type_name(self) -> str:
230        return self.grant_type_name.eval(self.config)  # type: ignore # eval returns a string in this context

Returns grant_type specified name for requesting access_token

def get_grant_type(self) -> str:
232    def get_grant_type(self) -> str:
233        return self.grant_type.eval(self.config)  # type: ignore # eval returns a string in this context

Returns grant_type specified for requesting access_token

def get_refresh_request_body(self) -> Mapping[str, Any]:
235    def get_refresh_request_body(self) -> Mapping[str, Any]:
236        return self._refresh_request_body.eval(self.config)

Returns the request body to set on the refresh request

def get_refresh_request_headers(self) -> Mapping[str, Any]:
238    def get_refresh_request_headers(self) -> Mapping[str, Any]:
239        return self._refresh_request_headers.eval(self.config)

Returns the request headers to set on the refresh request

def get_token_expiry_date(self) -> airbyte_cdk.utils.datetime_helpers.AirbyteDateTime:
241    def get_token_expiry_date(self) -> AirbyteDateTime:
242        if not self._has_access_token_been_initialized():
243            return AirbyteDateTime.from_datetime(datetime.min)
244        return self._token_expiry_date  # type: ignore # _token_expiry_date is an AirbyteDateTime. It is never None despite what mypy thinks

Expiration date of the access token

def set_token_expiry_date(self, value: airbyte_cdk.utils.datetime_helpers.AirbyteDateTime) -> None:
249    def set_token_expiry_date(self, value: AirbyteDateTime) -> None:
250        self._token_expiry_date = value

Setter for access token expiration date

def get_assertion_name(self) -> str:
252    def get_assertion_name(self) -> str:
253        return self.assertion_name
def get_assertion(self) -> str:
255    def get_assertion(self) -> str:
256        if self.profile_assertion is None:
257            raise ValueError("profile_assertion is not set")
258        return self.profile_assertion.token
def build_refresh_request_body(self) -> Mapping[str, Any]:
260    def build_refresh_request_body(self) -> Mapping[str, Any]:
261        """
262        Returns the request body to set on the refresh request
263
264        Override to define additional parameters
265        """
266        if self.use_profile_assertion:
267            return {
268                self.get_grant_type_name(): self.get_grant_type(),
269                self.get_assertion_name(): self.get_assertion(),
270            }
271        return super().build_refresh_request_body()

Returns the request body to set on the refresh request

Override to define additional parameters

access_token: str
273    @property
274    def access_token(self) -> str:
275        if self._access_token is None:
276            raise ValueError("access_token is not set")
277        return self._access_token

Returns the access token

 55@dataclass
 56class JwtAuthenticator(DeclarativeAuthenticator):
 57    """
 58    Generates a JSON Web Token (JWT) based on a declarative connector configuration file. The generated token is attached to each request via the Authorization header.
 59
 60    Attributes:
 61        config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
 62        secret_key (Union[InterpolatedString, str]): The secret key used to sign the JWT
 63        algorithm (Union[str, JwtAlgorithm]): The algorithm used to sign the JWT
 64        token_duration (Optional[int]): The duration in seconds for which the token is valid
 65        base64_encode_secret_key (Optional[Union[InterpolatedBoolean, str, bool]]): Whether to base64 encode the secret key
 66        header_prefix (Optional[Union[InterpolatedString, str]]): The prefix to add to the Authorization header
 67        kid (Optional[Union[InterpolatedString, str]]): The key identifier to be included in the JWT header
 68        typ (Optional[Union[InterpolatedString, str]]): The type of the JWT.
 69        cty (Optional[Union[InterpolatedString, str]]): The content type of the JWT.
 70        iss (Optional[Union[InterpolatedString, str]]): The issuer of the JWT.
 71        sub (Optional[Union[InterpolatedString, str]]): The subject of the JWT.
 72        aud (Optional[Union[InterpolatedString, str]]): The audience of the JWT.
 73        additional_jwt_headers (Optional[Mapping[str, Any]]): Additional headers to include in the JWT.
 74        additional_jwt_payload (Optional[Mapping[str, Any]]): Additional payload to include in the JWT.
 75    """
 76
 77    config: Mapping[str, Any]
 78    parameters: InitVar[Mapping[str, Any]]
 79    secret_key: Union[InterpolatedString, str]
 80    algorithm: Union[str, JwtAlgorithm]
 81    token_duration: Optional[int]
 82    base64_encode_secret_key: Optional[Union[InterpolatedBoolean, str, bool]] = False
 83    header_prefix: Optional[Union[InterpolatedString, str]] = None
 84    kid: Optional[Union[InterpolatedString, str]] = None
 85    typ: Optional[Union[InterpolatedString, str]] = None
 86    cty: Optional[Union[InterpolatedString, str]] = None
 87    iss: Optional[Union[InterpolatedString, str]] = None
 88    sub: Optional[Union[InterpolatedString, str]] = None
 89    aud: Optional[Union[InterpolatedString, str]] = None
 90    additional_jwt_headers: Optional[Mapping[str, Any]] = None
 91    additional_jwt_payload: Optional[Mapping[str, Any]] = None
 92    passphrase: Optional[Union[InterpolatedString, str]] = None
 93    request_option: Optional[RequestOption] = None
 94
 95    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 96        self._secret_key = InterpolatedString.create(self.secret_key, parameters=parameters)
 97        self._algorithm = (
 98            JwtAlgorithm(self.algorithm) if isinstance(self.algorithm, str) else self.algorithm
 99        )
100        self._base64_encode_secret_key = (
101            InterpolatedBoolean(self.base64_encode_secret_key, parameters=parameters)
102            if isinstance(self.base64_encode_secret_key, str)
103            else self.base64_encode_secret_key
104        )
105        self._token_duration = self.token_duration
106        self._header_prefix = (
107            InterpolatedString.create(self.header_prefix, parameters=parameters)
108            if self.header_prefix
109            else None
110        )
111        self._kid = InterpolatedString.create(self.kid, parameters=parameters) if self.kid else None
112        self._typ = InterpolatedString.create(self.typ, parameters=parameters) if self.typ else None
113        self._cty = InterpolatedString.create(self.cty, parameters=parameters) if self.cty else None
114        self._iss = InterpolatedString.create(self.iss, parameters=parameters) if self.iss else None
115        self._sub = InterpolatedString.create(self.sub, parameters=parameters) if self.sub else None
116        self._aud = InterpolatedString.create(self.aud, parameters=parameters) if self.aud else None
117        self._additional_jwt_headers = InterpolatedMapping(
118            self.additional_jwt_headers or {}, parameters=parameters
119        )
120        self._additional_jwt_payload = InterpolatedMapping(
121            self.additional_jwt_payload or {}, parameters=parameters
122        )
123        self._passphrase = (
124            InterpolatedString.create(self.passphrase, parameters=parameters)
125            if self.passphrase
126            else None
127        )
128
129        # When we first implemented the JWT authenticator, we assumed that the signed token was always supposed
130        # to be loaded into the request headers under the `Authorization` key. This is not always the case, but
131        # this default option allows for backwards compatibility to be retained for existing connectors
132        self._request_option = self.request_option or RequestOption(
133            inject_into=RequestOptionType.header, field_name="Authorization", parameters=parameters
134        )
135
136    def _get_jwt_headers(self) -> dict[str, Any]:
137        """
138        Builds and returns the headers used when signing the JWT.
139        """
140        headers = self._additional_jwt_headers.eval(self.config, json_loads=json.loads)
141        if any(prop in headers for prop in ["kid", "alg", "typ", "cty"]):
142            raise ValueError(
143                "'kid', 'alg', 'typ', 'cty' are reserved headers and should not be set as part of 'additional_jwt_headers'"
144            )
145
146        if self._kid:
147            headers["kid"] = self._kid.eval(self.config, json_loads=json.loads)
148        if self._typ:
149            headers["typ"] = self._typ.eval(self.config, json_loads=json.loads)
150        if self._cty:
151            headers["cty"] = self._cty.eval(self.config, json_loads=json.loads)
152        headers["alg"] = self._algorithm
153        return headers
154
155    def _get_jwt_payload(self) -> dict[str, Any]:
156        """
157        Builds and returns the payload used when signing the JWT.
158        """
159        now = int(datetime.now().timestamp())
160        exp = now + self._token_duration if isinstance(self._token_duration, int) else now
161        nbf = now
162
163        payload = self._additional_jwt_payload.eval(self.config, json_loads=json.loads)
164        if any(prop in payload for prop in ["iss", "sub", "aud", "iat", "exp", "nbf"]):
165            raise ValueError(
166                "'iss', 'sub', 'aud', 'iat', 'exp', 'nbf' are reserved properties and should not be set as part of 'additional_jwt_payload'"
167            )
168
169        if self._iss:
170            payload["iss"] = self._iss.eval(self.config, json_loads=json.loads)
171        if self._sub:
172            payload["sub"] = self._sub.eval(self.config, json_loads=json.loads)
173        if self._aud:
174            payload["aud"] = self._aud.eval(self.config, json_loads=json.loads)
175
176        payload["iat"] = now
177        payload["exp"] = exp
178        payload["nbf"] = nbf
179        return payload
180
181    def _get_secret_key(self) -> JwtKeyTypes:
182        """
183        Returns the secret key used to sign the JWT.
184        """
185        secret_key: str = self._secret_key.eval(self.config, json_loads=json.loads)
186
187        if self._passphrase:
188            passphrase_value = self._passphrase.eval(self.config, json_loads=json.loads)
189            if passphrase_value:
190                private_key = serialization.load_pem_private_key(
191                    secret_key.encode(),
192                    password=passphrase_value.encode(),
193                )
194                return cast(JwtKeyTypes, private_key)
195
196        return (
197            base64.b64encode(secret_key.encode()).decode()
198            if self._base64_encode_secret_key
199            else secret_key
200        )
201
202    def _get_signed_token(self) -> Union[str, Any]:
203        """
204        Signed the JWT using the provided secret key and algorithm and the generated headers and payload. For additional information on PyJWT see: https://pyjwt.readthedocs.io/en/stable/
205        """
206        try:
207            return jwt.encode(
208                payload=self._get_jwt_payload(),
209                key=self._get_secret_key(),
210                algorithm=self._algorithm,
211                headers=self._get_jwt_headers(),
212            )
213        except Exception as e:
214            raise ValueError(f"Failed to sign token: {e}")
215
216    def _get_header_prefix(self) -> Union[str, None]:
217        """
218        Returns the header prefix to be used when attaching the token to the request.
219        """
220        return (
221            self._header_prefix.eval(self.config, json_loads=json.loads)
222            if self._header_prefix
223            else None
224        )
225
226    @property
227    def auth_header(self) -> str:
228        options = self._get_request_options(RequestOptionType.header)
229        return next(iter(options.keys()), "")
230
231    @property
232    def token(self) -> str:
233        return (
234            f"{self._get_header_prefix()} {self._get_signed_token()}"
235            if self._get_header_prefix()
236            else self._get_signed_token()
237        )
238
239    def get_request_params(self) -> Mapping[str, Any]:
240        return self._get_request_options(RequestOptionType.request_parameter)
241
242    def get_request_body_data(self) -> Union[Mapping[str, Any], str]:
243        return self._get_request_options(RequestOptionType.body_data)
244
245    def get_request_body_json(self) -> Mapping[str, Any]:
246        return self._get_request_options(RequestOptionType.body_json)
247
248    def _get_request_options(self, option_type: RequestOptionType) -> Mapping[str, Any]:
249        options: MutableMapping[str, Any] = {}
250        if self._request_option.inject_into == option_type:
251            self._request_option.inject_into_request(options, self.token, self.config)
252        return options

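Generates a JSON Web Token (JWT) based on a declarative connector configuration file. By default the generated token is attached to each request via the Authorization header; when request_option is set, the token is injected wherever that option specifies (a header, a request parameter, or the request body).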

Attributes:
  • config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
  • secret_key (Union[InterpolatedString, str]): The secret key used to sign the JWT
  • algorithm (Union[str, JwtAlgorithm]): The algorithm used to sign the JWT
  • token_duration (Optional[int]): The duration in seconds for which the token is valid
  • base64_encode_secret_key (Optional[Union[InterpolatedBoolean, str, bool]]): Whether to base64 encode the secret key
  • header_prefix (Optional[Union[InterpolatedString, str]]): The prefix to add to the Authorization header
  • kid (Optional[Union[InterpolatedString, str]]): The key identifier to be included in the JWT header
  • typ (Optional[Union[InterpolatedString, str]]): The type of the JWT.
  • cty (Optional[Union[InterpolatedString, str]]): The content type of the JWT.
  • iss (Optional[Union[InterpolatedString, str]]): The issuer of the JWT.
  • sub (Optional[Union[InterpolatedString, str]]): The subject of the JWT.
  • aud (Optional[Union[InterpolatedString, str]]): The audience of the JWT.
  • additional_jwt_headers (Optional[Mapping[str, Any]]): Additional headers to include in the JWT.
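  • additional_jwt_payload (Optional[Mapping[str, Any]]): Additional payload to include in the JWT.
  • passphrase (Optional[Union[InterpolatedString, str]]): The passphrase used to load secret_key as a passphrase-protected PEM private key.
  • request_option (Optional[RequestOption]): Where to inject the signed token into the request. Defaults to the Authorization header.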
JwtAuthenticator( config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], secret_key: Union[airbyte_cdk.InterpolatedString, str], algorithm: Union[str, airbyte_cdk.sources.declarative.auth.jwt.JwtAlgorithm], token_duration: Optional[int], base64_encode_secret_key: Union[airbyte_cdk.InterpolatedBoolean, str, bool, NoneType] = False, header_prefix: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, kid: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, typ: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, cty: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, iss: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, sub: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, aud: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, additional_jwt_headers: Optional[Mapping[str, Any]] = None, additional_jwt_payload: Optional[Mapping[str, Any]] = None, passphrase: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, request_option: Optional[airbyte_cdk.RequestOption] = None)
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
secret_key: Union[airbyte_cdk.InterpolatedString, str]
token_duration: Optional[int]
base64_encode_secret_key: Union[airbyte_cdk.InterpolatedBoolean, str, bool, NoneType] = False
header_prefix: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
kid: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
typ: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
cty: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
iss: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
sub: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
aud: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
additional_jwt_headers: Optional[Mapping[str, Any]] = None
additional_jwt_payload: Optional[Mapping[str, Any]] = None
passphrase: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
request_option: Optional[airbyte_cdk.RequestOption] = None
auth_header: str
226    @property
227    def auth_header(self) -> str:
228        options = self._get_request_options(RequestOptionType.header)
229        return next(iter(options.keys()), "")

HTTP header to set on the requests

token: str
231    @property
232    def token(self) -> str:
233        return (
234            f"{self._get_header_prefix()} {self._get_signed_token()}"
235            if self._get_header_prefix()
236            else self._get_signed_token()
237        )

The header value to set on outgoing HTTP requests

def get_request_params(self) -> Mapping[str, Any]:
239    def get_request_params(self) -> Mapping[str, Any]:
240        return self._get_request_options(RequestOptionType.request_parameter)

HTTP request parameter to add to the requests

def get_request_body_data(self) -> Union[Mapping[str, Any], str]:
242    def get_request_body_data(self) -> Union[Mapping[str, Any], str]:
243        return self._get_request_options(RequestOptionType.body_data)

Form-encoded body data to set on the requests

def get_request_body_json(self) -> Mapping[str, Any]:
245    def get_request_body_json(self) -> Mapping[str, Any]:
246        return self._get_request_options(RequestOptionType.body_json)

JSON-encoded body data to set on the requests