airbyte_cdk.sources.declarative.auth
@dataclass
class DeclarativeOauth2Authenticator(AbstractOauth2Authenticator, DeclarativeAuthenticator):
    """
    Generates OAuth2.0 access tokens from an OAuth2.0 refresh token and client credentials based on
    a declarative connector configuration file. Credentials can be defined explicitly or via interpolation
    at runtime. The generated access token is attached to each request via the Authorization header.

    Attributes:
        token_refresh_endpoint (Union[InterpolatedString, str]): The endpoint to refresh the access token
        client_id (Union[InterpolatedString, str]): The client id
        client_secret (Union[InterpolatedString, str]): Client secret (can be empty for APIs that support this)
        refresh_token (Union[InterpolatedString, str]): The token used to refresh the access token
        access_token_name (Union[InterpolatedString, str]): The field to extract access token from in the response
        access_token_value (Optional[Union[InterpolatedString, str]]): Optional initial access token; when set it is
            interpolated against the config and used as the starting access token
        expires_in_name (Union[InterpolatedString, str]): The field to extract expires_in from in the response
        config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
        scopes (Optional[List[str]]): The scopes to request
        token_expiry_date (Optional[Union[InterpolatedString, str]]): The access token expiration date
        token_expiry_date_format (str): format of the datetime; provide it if expires_in is returned in datetime instead of seconds
        token_expiry_is_time_of_expiration (bool): set to True if expires_in is returned as time of expiration instead of the number of seconds until expiration
        refresh_request_body (Optional[Mapping[str, Any]]): The request body to send in the refresh request
        refresh_request_headers (Optional[Mapping[str, Any]]): The request headers to send in the refresh request
        grant_type: The grant_type to request for access_token. If set to refresh_token, the refresh_token parameter has to be provided
        profile_assertion (Optional[DeclarativeAuthenticator]): Authenticator whose `token` is sent as the assertion
            when the profile assertion flow is used
        use_profile_assertion (Optional[Union[InterpolatedBoolean, str, bool]]): Enables the profile assertion flow;
            the grant type is then forced to "urn:ietf:params:oauth:grant-type:jwt-bearer"
        message_repository (MessageRepository): the message repository used to emit logs on HTTP requests
        refresh_token_error_status_codes (Tuple[int, ...]): Status codes to identify refresh token errors in response
        refresh_token_error_key (str): Key to identify refresh token error in response
        refresh_token_error_values (Tuple[str, ...]): List of values to check for exception during token refresh process
    """

    config: Mapping[str, Any]
    parameters: InitVar[Mapping[str, Any]]
    client_id: Optional[Union[InterpolatedString, str]] = None
    client_secret: Optional[Union[InterpolatedString, str]] = None
    token_refresh_endpoint: Optional[Union[InterpolatedString, str]] = None
    refresh_token: Optional[Union[InterpolatedString, str]] = None
    scopes: Optional[List[str]] = None
    token_expiry_date: Optional[Union[InterpolatedString, str]] = None
    # Parsed form of `token_expiry_date`; populated in __post_init__, never passed to the constructor.
    _token_expiry_date: Optional[AirbyteDateTime] = field(init=False, repr=False, default=None)
    token_expiry_date_format: Optional[str] = None
    token_expiry_is_time_of_expiration: bool = False
    access_token_name: Union[InterpolatedString, str] = "access_token"
    access_token_value: Optional[Union[InterpolatedString, str]] = None
    client_id_name: Union[InterpolatedString, str] = "client_id"
    client_secret_name: Union[InterpolatedString, str] = "client_secret"
    expires_in_name: Union[InterpolatedString, str] = "expires_in"
    refresh_token_name: Union[InterpolatedString, str] = "refresh_token"
    refresh_request_body: Optional[Mapping[str, Any]] = None
    refresh_request_headers: Optional[Mapping[str, Any]] = None
    grant_type_name: Union[InterpolatedString, str] = "grant_type"
    grant_type: Union[InterpolatedString, str] = "refresh_token"
    message_repository: MessageRepository = NoopMessageRepository()
    profile_assertion: Optional[DeclarativeAuthenticator] = None
    use_profile_assertion: Optional[Union[InterpolatedBoolean, str, bool]] = False
    refresh_token_error_status_codes: Tuple[int, ...] = ()
    refresh_token_error_key: str = ""
    refresh_token_error_values: Tuple[str, ...] = ()

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """
        Wrap the declared fields in interpolation helpers, parse the initial token expiry date and
        validate that the configuration is coherent.

        Raises:
            ValueError: if `token_expiry_date` cannot be parsed, if `client_id`/`client_secret` are missing
                for the basic flow, if `profile_assertion` is missing for the profile assertion flow, or if
                the grant type is "refresh_token" and no `refresh_token` was provided.
        """
        super().__init__(
            refresh_token_error_status_codes=self.refresh_token_error_status_codes,
            refresh_token_error_key=self.refresh_token_error_key,
            refresh_token_error_values=self.refresh_token_error_values,
        )
        if self.token_refresh_endpoint is not None:
            self._token_refresh_endpoint: Optional[InterpolatedString] = InterpolatedString.create(
                self.token_refresh_endpoint, parameters=parameters
            )
        else:
            self._token_refresh_endpoint = None
        self._client_id_name = InterpolatedString.create(self.client_id_name, parameters=parameters)
        self._client_id = (
            InterpolatedString.create(self.client_id, parameters=parameters)
            if self.client_id
            else self.client_id
        )
        self._client_secret_name = InterpolatedString.create(
            self.client_secret_name, parameters=parameters
        )
        self._client_secret = (
            InterpolatedString.create(self.client_secret, parameters=parameters)
            if self.client_secret
            else self.client_secret
        )
        self._refresh_token_name = InterpolatedString.create(
            self.refresh_token_name, parameters=parameters
        )
        if self.refresh_token is not None:
            self._refresh_token: Optional[InterpolatedString] = InterpolatedString.create(
                self.refresh_token, parameters=parameters
            )
        else:
            self._refresh_token = None
        self.access_token_name = InterpolatedString.create(
            self.access_token_name, parameters=parameters
        )
        self.expires_in_name = InterpolatedString.create(
            self.expires_in_name, parameters=parameters
        )
        self.grant_type_name = InterpolatedString.create(
            self.grant_type_name, parameters=parameters
        )
        # NOTE(review): this truthiness check runs before `use_profile_assertion` is wrapped in
        # InterpolatedBoolean below, so any non-empty string selects the JWT-bearer grant type
        # regardless of what it would evaluate to -- confirm this is intended.
        self.grant_type = InterpolatedString.create(
            "urn:ietf:params:oauth:grant-type:jwt-bearer"
            if self.use_profile_assertion
            else self.grant_type,
            parameters=parameters,
        )
        self._refresh_request_body = InterpolatedMapping(
            self.refresh_request_body or {}, parameters=parameters
        )
        self._refresh_request_headers = InterpolatedMapping(
            self.refresh_request_headers or {}, parameters=parameters
        )
        try:
            if (
                isinstance(self.token_expiry_date, (int, str))
                and str(self.token_expiry_date).isdigit()
            ):
                # Purely numeric values are handed to the parser as-is, without interpolation.
                self._token_expiry_date = ab_datetime_parse(self.token_expiry_date)
            else:
                # Without a configured expiry date, start from "yesterday" so the stored date is already in the past.
                self._token_expiry_date = (
                    ab_datetime_parse(
                        InterpolatedString.create(
                            self.token_expiry_date, parameters=parameters
                        ).eval(self.config)
                    )
                    if self.token_expiry_date
                    else ab_datetime_now() - timedelta(days=1)
                )
        except ValueError as e:
            # Chain explicitly so the original parsing failure is kept as the cause.
            raise ValueError(f"Invalid token expiry date format: {e}") from e
        self.use_profile_assertion = (
            InterpolatedBoolean(self.use_profile_assertion, parameters=parameters)
            if isinstance(self.use_profile_assertion, str)
            else self.use_profile_assertion
        )
        self.assertion_name = "assertion"

        if self.access_token_value is not None:
            self._access_token_value = InterpolatedString.create(
                self.access_token_value, parameters=parameters
            ).eval(self.config)
        else:
            self._access_token_value = None

        self._access_token: Optional[str] = (
            self._access_token_value if self.access_token_value else None
        )

        if not self.use_profile_assertion and any(
            client_creds is None for client_creds in [self.client_id, self.client_secret]
        ):
            raise ValueError(
                "OAuthAuthenticator configuration error: Both 'client_id' and 'client_secret' are required for the "
                "basic OAuth flow."
            )
        if self.profile_assertion is None and self.use_profile_assertion:
            raise ValueError(
                "OAuthAuthenticator configuration error: 'profile_assertion' is required when using the profile assertion flow."
            )
        if self.get_grant_type() == "refresh_token" and self._refresh_token is None:
            raise ValueError(
                "OAuthAuthenticator configuration error: A 'refresh_token' is required when the 'grant_type' is set to 'refresh_token'."
            )

    def get_token_refresh_endpoint(self) -> Optional[str]:
        """
        Return the token refresh endpoint evaluated against the config, or None when none was configured.

        Raises:
            ValueError: if an endpoint was configured but evaluates to an empty value.
        """
        if self._token_refresh_endpoint is not None:
            refresh_token_endpoint: str = self._token_refresh_endpoint.eval(self.config)
            if not refresh_token_endpoint:
                raise ValueError(
                    "OAuthAuthenticator was unable to evaluate token_refresh_endpoint parameter"
                )
            return refresh_token_endpoint
        return None

    def get_client_id_name(self) -> str:
        """Return `client_id_name` evaluated against the config."""
        return self._client_id_name.eval(self.config)  # type: ignore # eval returns a string in this context

    def get_client_id(self) -> str:
        """
        Return `client_id` evaluated against the config.

        Raises:
            ValueError: if the client id is unset or evaluates to an empty value.
        """
        client_id = self._client_id.eval(self.config) if self._client_id else self._client_id
        if not client_id:
            raise ValueError("OAuthAuthenticator was unable to evaluate client_id parameter")
        return client_id  # type: ignore # value will be returned as a string, or an error will be raised

    def get_client_secret_name(self) -> str:
        """Return `client_secret_name` evaluated against the config."""
        return self._client_secret_name.eval(self.config)  # type: ignore # eval returns a string in this context

    def get_client_secret(self) -> str:
        """Return `client_secret` evaluated against the config; an empty result is logged, not raised."""
        client_secret = (
            self._client_secret.eval(self.config) if self._client_secret else self._client_secret
        )
        if not client_secret:
            # We've seen some APIs allowing empty client_secret so we will only log here
            logger.warning(
                "OAuthAuthenticator was unable to evaluate client_secret parameter hence it'll be empty"
            )
        return client_secret  # type: ignore # value will be returned as a string, which might be empty

    def get_refresh_token_name(self) -> str:
        """Return `refresh_token_name` evaluated against the config."""
        return self._refresh_token_name.eval(self.config)  # type: ignore # eval returns a string in this context

    def get_refresh_token(self) -> Optional[str]:
        """Return the refresh token evaluated against the config as a string, or None when not configured."""
        return None if self._refresh_token is None else str(self._refresh_token.eval(self.config))

    def get_scopes(self) -> List[str]:
        """Return the configured scopes, or an empty list when none are set."""
        return self.scopes or []

    def get_access_token_name(self) -> str:
        """Return `access_token_name` evaluated against the config."""
        return self.access_token_name.eval(self.config)  # type: ignore # eval returns a string in this context

    def get_expires_in_name(self) -> str:
        """Return `expires_in_name` evaluated against the config."""
        return self.expires_in_name.eval(self.config)  # type: ignore # eval returns a string in this context

    def get_grant_type_name(self) -> str:
        """Return `grant_type_name` evaluated against the config."""
        return self.grant_type_name.eval(self.config)  # type: ignore # eval returns a string in this context

    def get_grant_type(self) -> str:
        """Return `grant_type` evaluated against the config."""
        return self.grant_type.eval(self.config)  # type: ignore # eval returns a string in this context

    def get_refresh_request_body(self) -> Mapping[str, Any]:
        """Return `refresh_request_body` evaluated against the config."""
        return self._refresh_request_body.eval(self.config)

    def get_refresh_request_headers(self) -> Mapping[str, Any]:
        """Return `refresh_request_headers` evaluated against the config."""
        return self._refresh_request_headers.eval(self.config)

    def get_token_expiry_date(self) -> AirbyteDateTime:
        """Return the stored expiry date, or `datetime.min` while no access token has been set."""
        if not self._has_access_token_been_initialized():
            return AirbyteDateTime.from_datetime(datetime.min)
        return self._token_expiry_date  # type: ignore # _token_expiry_date is an AirbyteDateTime. It is never None despite what mypy thinks

    def _has_access_token_been_initialized(self) -> bool:
        """Return True once an access token has been stored on this authenticator."""
        return self._access_token is not None

    def set_token_expiry_date(self, value: AirbyteDateTime) -> None:
        """Store `value` as the access token expiry date."""
        self._token_expiry_date = value

    def get_assertion_name(self) -> str:
        """Return the request body key under which the assertion is sent."""
        return self.assertion_name

    def get_assertion(self) -> str:
        """
        Return the token produced by `profile_assertion`.

        Raises:
            ValueError: if `profile_assertion` is not set.
        """
        if self.profile_assertion is None:
            raise ValueError("profile_assertion is not set")
        return self.profile_assertion.token

    def build_refresh_request_body(self) -> Mapping[str, Any]:
        """
        Returns the request body to set on the refresh request

        Override to define additional parameters
        """
        if self.use_profile_assertion:
            # The profile assertion flow sends only the grant type and the assertion.
            return {
                self.get_grant_type_name(): self.get_grant_type(),
                self.get_assertion_name(): self.get_assertion(),
            }
        return super().build_refresh_request_body()

    @property
    def access_token(self) -> str:
        """
        Return the current access token.

        Raises:
            ValueError: if no access token has been set yet.
        """
        if self._access_token is None:
            raise ValueError("access_token is not set")
        return self._access_token

    @access_token.setter
    def access_token(self, value: str) -> None:
        self._access_token = value

    @property
    def _message_repository(self) -> MessageRepository:
        """
        Overriding AbstractOauth2Authenticator._message_repository to allow for HTTP request logs
        """
        return self.message_repository
Generates OAuth2.0 access tokens from an OAuth2.0 refresh token and client credentials based on a declarative connector configuration file. Credentials can be defined explicitly or via interpolation at runtime. The generated access token is attached to each request via the Authorization header.
Attributes:
- token_refresh_endpoint (Union[InterpolatedString, str]): The endpoint to refresh the access token
- client_id (Union[InterpolatedString, str]): The client id
- client_secret (Union[InterpolatedString, str]): Client secret (can be empty for APIs that support this)
- refresh_token (Union[InterpolatedString, str]): The token used to refresh the access token
- access_token_name (Union[InterpolatedString, str]): The field to extract access token from in the response
- expires_in_name (Union[InterpolatedString, str]): The field to extract expires_in from in the response
- config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
- scopes (Optional[List[str]]): The scopes to request
- token_expiry_date (Optional[Union[InterpolatedString, str]]): The access token expiration date
- token_expiry_date_format (str): format of the datetime; provide it if expires_in is returned in datetime instead of seconds
- token_expiry_is_time_of_expiration (bool): set to True if expires_in is returned as time of expiration instead of the number of seconds until expiration
- refresh_request_body (Optional[Mapping[str, Any]]): The request body to send in the refresh request
- refresh_request_headers (Optional[Mapping[str, Any]]): The request headers to send in the refresh request
- grant_type: The grant_type to request for access_token. If set to refresh_token, the refresh_token parameter has to be provided
- message_repository (MessageRepository): the message repository used to emit logs on HTTP requests
- refresh_token_error_status_codes (Tuple[int, ...]): Status codes to identify refresh token errors in response
- refresh_token_error_key (str): Key to identify refresh token error in response
- refresh_token_error_values (Tuple[str, ...]): List of values to check for exception during token refresh process
If all of refresh_token_error_status_codes, refresh_token_error_key, and refresh_token_error_values are set, then http errors with such params will be wrapped in AirbyteTracedException.
Format of the datetime; provide it if expires_in is returned as the expiration datetime instead of the number of seconds until it expires
Indicates that the Token Expiry returns the date until which the token will be valid, not the amount of time it will be valid.
def get_token_refresh_endpoint(self) -> Optional[str]:
    """
    Resolve the token refresh endpoint against the config.

    Returns None when no endpoint is configured. Raises ValueError when an
    endpoint is configured but evaluates to an empty value.
    """
    endpoint_template = self._token_refresh_endpoint
    if endpoint_template is None:
        return None

    resolved_endpoint: str = endpoint_template.eval(self.config)
    if resolved_endpoint:
        return resolved_endpoint
    raise ValueError(
        "OAuthAuthenticator was unable to evaluate token_refresh_endpoint parameter"
    )
Returns the endpoint to refresh the access token
def get_client_id_name(self) -> str:
    """Evaluate `_client_id_name` against the config and return the result."""
    evaluated_name = self._client_id_name.eval(self.config)
    return evaluated_name  # type: ignore # eval returns a string in this context
The client id name to authenticate
def get_client_id(self) -> str:
    """
    Evaluate the client id against the config.

    Raises ValueError when the client id is unset or evaluates to an empty value.
    """
    id_template = self._client_id
    # A falsy template (None or empty) is passed through untouched and rejected below.
    resolved_id = id_template.eval(self.config) if id_template else id_template
    if resolved_id:
        return resolved_id  # type: ignore # value will be returned as a string, or an error will be raised
    raise ValueError("OAuthAuthenticator was unable to evaluate client_id parameter")
The client id to authenticate
def get_client_secret_name(self) -> str:
    """Evaluate `_client_secret_name` against the config and return the result."""
    evaluated_name = self._client_secret_name.eval(self.config)
    return evaluated_name  # type: ignore # eval returns a string in this context
The client secret name to authenticate
def get_client_secret(self) -> str:
    """
    Evaluate the client secret against the config.

    An unset or empty secret is returned as-is after logging a warning.
    """
    secret_template = self._client_secret
    if secret_template:
        resolved_secret = secret_template.eval(self.config)
    else:
        resolved_secret = secret_template

    if not resolved_secret:
        # Some APIs accept an empty client_secret, so this is only logged, never raised.
        logger.warning(
            "OAuthAuthenticator was unable to evaluate client_secret parameter hence it'll be empty"
        )
    return resolved_secret  # type: ignore # value will be returned as a string, which might be empty
The client secret to authenticate
def get_refresh_token_name(self) -> str:
    """Evaluate `_refresh_token_name` against the config and return the result."""
    evaluated_name = self._refresh_token_name.eval(self.config)
    return evaluated_name  # type: ignore # eval returns a string in this context
The refresh token name to authenticate
def get_refresh_token(self) -> Optional[str]:
    """Return the refresh token evaluated against the config as a string, or None when unset."""
    token_template = self._refresh_token
    if token_template is None:
        return None
    return str(token_template.eval(self.config))
The token used to refresh the access token when it expires
def get_access_token_name(self) -> str:
    """Evaluate `access_token_name` against the config and return the result."""
    evaluated_name = self.access_token_name.eval(self.config)
    return evaluated_name  # type: ignore # eval returns a string in this context
Field to extract access token from in the response
def get_expires_in_name(self) -> str:
    """Evaluate `expires_in_name` against the config and return the result."""
    evaluated_name = self.expires_in_name.eval(self.config)
    return evaluated_name  # type: ignore # eval returns a string in this context
Returns the expires_in field name
def get_grant_type_name(self) -> str:
    """Evaluate `grant_type_name` against the config and return the result."""
    evaluated_name = self.grant_type_name.eval(self.config)
    return evaluated_name  # type: ignore # eval returns a string in this context
Returns the name of the grant_type field used when requesting the access_token
def get_grant_type(self) -> str:
    """Evaluate `grant_type` against the config and return the result."""
    evaluated_grant_type = self.grant_type.eval(self.config)
    return evaluated_grant_type  # type: ignore # eval returns a string in this context
Returns grant_type specified for requesting access_token
def get_refresh_request_body(self) -> Mapping[str, Any]:
    """Evaluate `_refresh_request_body` against the config and return the resulting mapping."""
    evaluated_body = self._refresh_request_body.eval(self.config)
    return evaluated_body
Returns the request body to set on the refresh request
def get_refresh_request_headers(self) -> Mapping[str, Any]:
    """Evaluate `_refresh_request_headers` against the config and return the resulting mapping."""
    evaluated_headers = self._refresh_request_headers.eval(self.config)
    return evaluated_headers
Returns the request headers to set on the refresh request
def get_token_expiry_date(self) -> AirbyteDateTime:
    """
    Return the stored access token expiry date.

    While no access token has been set, `datetime.min` is returned instead.
    """
    if self._has_access_token_been_initialized():
        return self._token_expiry_date  # type: ignore # _token_expiry_date is an AirbyteDateTime. It is never None despite what mypy thinks
    return AirbyteDateTime.from_datetime(datetime.min)
Expiration date of the access token
def set_token_expiry_date(self, value: AirbyteDateTime) -> None:
    """Store `value` as the access token expiry date."""
    self._token_expiry_date = value
Setter for access token expiration date
def build_refresh_request_body(self) -> Mapping[str, Any]:
    """
    Build the body of the token refresh request.

    With `use_profile_assertion` enabled the body holds only the grant type
    and the assertion; otherwise the parent implementation builds it.
    """
    if not self.use_profile_assertion:
        return super().build_refresh_request_body()

    grant_type_field = self.get_grant_type_name()
    grant_type_value = self.get_grant_type()
    assertion_field = self.get_assertion_name()
    assertion_value = self.get_assertion()
    return {grant_type_field: grant_type_value, assertion_field: assertion_value}
Returns the request body to set on the refresh request
Override to define additional parameters
@property
def access_token(self) -> str:
    """Return the current access token; raise ValueError while none has been set."""
    current_token = self._access_token
    if current_token is not None:
        return current_token
    raise ValueError("access_token is not set")
Returns the access token
Inherited Members
@dataclass
class JwtAuthenticator(DeclarativeAuthenticator):
    """
    Generates a JSON Web Token (JWT) based on a declarative connector configuration file. By default the
    generated token is attached to each request via the Authorization header; `request_option` can
    inject it elsewhere.

    Attributes:
        config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
        secret_key (Union[InterpolatedString, str]): The secret key used to sign the JWT
        algorithm (Union[str, JwtAlgorithm]): The algorithm used to sign the JWT
        token_duration (Optional[int]): The duration in seconds for which the token is valid
        base64_encode_secret_key (Optional[Union[InterpolatedBoolean, str, bool]]): Whether to base64 encode the secret key
        header_prefix (Optional[Union[InterpolatedString, str]]): The prefix to add to the Authorization header
        kid (Optional[Union[InterpolatedString, str]]): The key identifier to be included in the JWT header
        typ (Optional[Union[InterpolatedString, str]]): The type of the JWT.
        cty (Optional[Union[InterpolatedString, str]]): The content type of the JWT.
        iss (Optional[Union[InterpolatedString, str]]): The issuer of the JWT.
        sub (Optional[Union[InterpolatedString, str]]): The subject of the JWT.
        aud (Optional[Union[InterpolatedString, str]]): The audience of the JWT.
        additional_jwt_headers (Optional[Mapping[str, Any]]): Additional headers to include in the JWT.
        additional_jwt_payload (Optional[Mapping[str, Any]]): Additional payload to include in the JWT.
        passphrase (Optional[Union[InterpolatedString, str]]): Password used to load `secret_key` as a PEM private key
        request_option (Optional[RequestOption]): Where to inject the token; defaults to the `Authorization` header
    """

    config: Mapping[str, Any]
    parameters: InitVar[Mapping[str, Any]]
    secret_key: Union[InterpolatedString, str]
    algorithm: Union[str, JwtAlgorithm]
    token_duration: Optional[int]
    base64_encode_secret_key: Optional[Union[InterpolatedBoolean, str, bool]] = False
    header_prefix: Optional[Union[InterpolatedString, str]] = None
    kid: Optional[Union[InterpolatedString, str]] = None
    typ: Optional[Union[InterpolatedString, str]] = None
    cty: Optional[Union[InterpolatedString, str]] = None
    iss: Optional[Union[InterpolatedString, str]] = None
    sub: Optional[Union[InterpolatedString, str]] = None
    aud: Optional[Union[InterpolatedString, str]] = None
    additional_jwt_headers: Optional[Mapping[str, Any]] = None
    additional_jwt_payload: Optional[Mapping[str, Any]] = None
    passphrase: Optional[Union[InterpolatedString, str]] = None
    request_option: Optional[RequestOption] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """Wrap the declared fields in interpolation helpers and resolve the default request option."""
        self._secret_key = InterpolatedString.create(self.secret_key, parameters=parameters)
        self._algorithm = (
            JwtAlgorithm(self.algorithm) if isinstance(self.algorithm, str) else self.algorithm
        )
        self._base64_encode_secret_key = (
            InterpolatedBoolean(self.base64_encode_secret_key, parameters=parameters)
            if isinstance(self.base64_encode_secret_key, str)
            else self.base64_encode_secret_key
        )
        self._token_duration = self.token_duration
        self._header_prefix = (
            InterpolatedString.create(self.header_prefix, parameters=parameters)
            if self.header_prefix
            else None
        )
        self._kid = InterpolatedString.create(self.kid, parameters=parameters) if self.kid else None
        self._typ = InterpolatedString.create(self.typ, parameters=parameters) if self.typ else None
        self._cty = InterpolatedString.create(self.cty, parameters=parameters) if self.cty else None
        self._iss = InterpolatedString.create(self.iss, parameters=parameters) if self.iss else None
        self._sub = InterpolatedString.create(self.sub, parameters=parameters) if self.sub else None
        self._aud = InterpolatedString.create(self.aud, parameters=parameters) if self.aud else None
        self._additional_jwt_headers = InterpolatedMapping(
            self.additional_jwt_headers or {}, parameters=parameters
        )
        self._additional_jwt_payload = InterpolatedMapping(
            self.additional_jwt_payload or {}, parameters=parameters
        )
        self._passphrase = (
            InterpolatedString.create(self.passphrase, parameters=parameters)
            if self.passphrase
            else None
        )

        # When we first implemented the JWT authenticator, we assumed that the signed token was always supposed
        # to be loaded into the request headers under the `Authorization` key. This is not always the case, but
        # this default option allows for backwards compatibility to be retained for existing connectors
        self._request_option = self.request_option or RequestOption(
            inject_into=RequestOptionType.header, field_name="Authorization", parameters=parameters
        )

    def _get_jwt_headers(self) -> dict[str, Any]:
        """
        Builds and returns the headers used when signing the JWT.

        Raises:
            ValueError: if `additional_jwt_headers` sets one of the reserved headers.
        """
        headers = self._additional_jwt_headers.eval(self.config, json_loads=json.loads)
        if any(prop in headers for prop in ["kid", "alg", "typ", "cty"]):
            raise ValueError(
                "'kid', 'alg', 'typ', 'cty' are reserved headers and should not be set as part of 'additional_jwt_headers'"
            )

        if self._kid:
            headers["kid"] = self._kid.eval(self.config, json_loads=json.loads)
        if self._typ:
            headers["typ"] = self._typ.eval(self.config, json_loads=json.loads)
        if self._cty:
            headers["cty"] = self._cty.eval(self.config, json_loads=json.loads)
        headers["alg"] = self._algorithm
        return headers

    def _get_jwt_payload(self) -> dict[str, Any]:
        """
        Builds and returns the payload used when signing the JWT.

        Raises:
            ValueError: if `additional_jwt_payload` sets one of the reserved properties.
        """
        now = int(datetime.now().timestamp())
        # Without an integer token_duration the expiry equals the issue time.
        exp = now + self._token_duration if isinstance(self._token_duration, int) else now
        nbf = now

        payload = self._additional_jwt_payload.eval(self.config, json_loads=json.loads)
        if any(prop in payload for prop in ["iss", "sub", "aud", "iat", "exp", "nbf"]):
            raise ValueError(
                "'iss', 'sub', 'aud', 'iat', 'exp', 'nbf' are reserved properties and should not be set as part of 'additional_jwt_payload'"
            )

        if self._iss:
            payload["iss"] = self._iss.eval(self.config, json_loads=json.loads)
        if self._sub:
            payload["sub"] = self._sub.eval(self.config, json_loads=json.loads)
        if self._aud:
            payload["aud"] = self._aud.eval(self.config, json_loads=json.loads)

        payload["iat"] = now
        payload["exp"] = exp
        payload["nbf"] = nbf
        return payload

    def _get_secret_key(self) -> JwtKeyTypes:
        """
        Returns the secret key used to sign the JWT.

        When a passphrase is configured and evaluates to a non-empty value, the secret key is loaded
        as a PEM private key using that passphrase; otherwise the (optionally base64-encoded) string
        is returned.
        """
        secret_key: str = self._secret_key.eval(self.config, json_loads=json.loads)

        if self._passphrase:
            passphrase_value = self._passphrase.eval(self.config, json_loads=json.loads)
            if passphrase_value:
                private_key = serialization.load_pem_private_key(
                    secret_key.encode(),
                    password=passphrase_value.encode(),
                )
                return cast(JwtKeyTypes, private_key)

        return (
            base64.b64encode(secret_key.encode()).decode()
            if self._base64_encode_secret_key
            else secret_key
        )

    def _get_signed_token(self) -> Union[str, Any]:
        """
        Signs the JWT using the provided secret key and algorithm and the generated headers and payload.
        For additional information on PyJWT see: https://pyjwt.readthedocs.io/en/stable/

        Raises:
            ValueError: if building or signing the token fails for any reason.
        """
        try:
            return jwt.encode(
                payload=self._get_jwt_payload(),
                key=self._get_secret_key(),
                algorithm=self._algorithm,
                headers=self._get_jwt_headers(),
            )
        except Exception as e:
            # Chain explicitly so the underlying failure is kept as the cause.
            raise ValueError(f"Failed to sign token: {e}") from e

    def _get_header_prefix(self) -> Union[str, None]:
        """
        Returns the header prefix to be used when attaching the token to the request.
        """
        return (
            self._header_prefix.eval(self.config, json_loads=json.loads)
            if self._header_prefix
            else None
        )

    @property
    def auth_header(self) -> str:
        """Return the first header name produced by the request option, or "" when it does not target headers."""
        options = self._get_request_options(RequestOptionType.header)
        return next(iter(options.keys()), "")

    @property
    def token(self) -> str:
        """Return the signed JWT, preceded by the header prefix and a space when a prefix is configured."""
        # Evaluate the prefix once instead of interpolating it for both the check and the value.
        header_prefix = self._get_header_prefix()
        signed_token = self._get_signed_token()
        return f"{header_prefix} {signed_token}" if header_prefix else signed_token

    def get_request_params(self) -> Mapping[str, Any]:
        """Return the token as request parameters when the request option targets them, else an empty mapping."""
        return self._get_request_options(RequestOptionType.request_parameter)

    def get_request_body_data(self) -> Union[Mapping[str, Any], str]:
        """Return the token as form body data when the request option targets it, else an empty mapping."""
        return self._get_request_options(RequestOptionType.body_data)

    def get_request_body_json(self) -> Mapping[str, Any]:
        """Return the token as JSON body content when the request option targets it, else an empty mapping."""
        return self._get_request_options(RequestOptionType.body_json)

    def _get_request_options(self, option_type: RequestOptionType) -> Mapping[str, Any]:
        """Inject the token into a fresh mapping only when `option_type` matches the configured request option."""
        options: MutableMapping[str, Any] = {}
        if self._request_option.inject_into == option_type:
            self._request_option.inject_into_request(options, self.token, self.config)
        return options
Generates a JSON Web Token (JWT) based on a declarative connector configuration file. By default the generated token is attached to each request via the Authorization header; a different injection target can be configured through `request_option`.
Attributes:
- config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
- secret_key (Union[InterpolatedString, str]): The secret key used to sign the JWT
- algorithm (Union[str, JwtAlgorithm]): The algorithm used to sign the JWT
- token_duration (Optional[int]): The duration in seconds for which the token is valid
- base64_encode_secret_key (Optional[Union[InterpolatedBoolean, str, bool]]): Whether to base64 encode the secret key
- header_prefix (Optional[Union[InterpolatedString, str]]): The prefix to add to the Authorization header
- kid (Optional[Union[InterpolatedString, str]]): The key identifier to be included in the JWT header
- typ (Optional[Union[InterpolatedString, str]]): The type of the JWT.
- cty (Optional[Union[InterpolatedString, str]]): The content type of the JWT.
- iss (Optional[Union[InterpolatedString, str]]): The issuer of the JWT.
- sub (Optional[Union[InterpolatedString, str]]): The subject of the JWT.
- aud (Optional[Union[InterpolatedString, str]]): The audience of the JWT.
- additional_jwt_headers (Optional[Mapping[str, Any]]): Additional headers to include in the JWT.
- additional_jwt_payload (Optional[Mapping[str, Any]]): Additional payload to include in the JWT.
@property
def auth_header(self) -> str:
    """Return the first header name produced by the request option, or "" when there is none."""
    header_options = self._get_request_options(RequestOptionType.header)
    for header_name in header_options.keys():
        return header_name
    return ""
HTTP header to set on the requests
@property
def token(self) -> str:
    """
    Return the signed JWT, preceded by the header prefix and a space when a prefix is configured.

    The prefix is evaluated once per access rather than once for the check and again for the value.
    """
    header_prefix = self._get_header_prefix()
    signed_token = self._get_signed_token()
    if header_prefix:
        return f"{header_prefix} {signed_token}"
    return signed_token
The header value to set on outgoing HTTP requests
def get_request_params(self) -> Mapping[str, Any]:
    """Return the request options produced for the `request_parameter` option type."""
    target_type = RequestOptionType.request_parameter
    return self._get_request_options(target_type)
HTTP request parameter to add to the requests
def get_request_body_data(self) -> Union[Mapping[str, Any], str]:
    """Return the request options produced for the `body_data` option type."""
    target_type = RequestOptionType.body_data
    return self._get_request_options(target_type)
Form-encoded body data to set on the requests