airbyte_cdk.sources.declarative.auth
@dataclass
class DeclarativeOauth2Authenticator(AbstractOauth2Authenticator, DeclarativeAuthenticator):
    """
    Generates OAuth2.0 access tokens from an OAuth2.0 refresh token and client credentials based on
    a declarative connector configuration file. Credentials can be defined explicitly or via interpolation
    at runtime. The generated access token is attached to each request via the Authorization header.

    Attributes:
        token_refresh_endpoint (Union[InterpolatedString, str]): The endpoint to refresh the access token
        client_id (Union[InterpolatedString, str]): The client id
        client_secret (Union[InterpolatedString, str]): Client secret (can be empty for APIs that support this)
        refresh_token (Union[InterpolatedString, str]): The token used to refresh the access token
        access_token_name (Union[InterpolatedString, str]): The field to extract access token from in the response
        access_token_value (Optional[Union[InterpolatedString, str]]): Optional initial access token, evaluated
            against the config when the authenticator is created
        expires_in_name (Union[InterpolatedString, str]): The field to extract expires_in from in the response
        config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
        scopes (Optional[List[str]]): The scopes to request
        token_expiry_date (Optional[Union[InterpolatedString, str]]): The access token expiration date
        token_expiry_date_format str: format of the datetime; provide it if expires_in is returned in datetime instead of seconds
        token_expiry_is_time_of_expiration bool: set it to True if expires_in is returned as the time of expiration
            instead of the number of seconds until expiration
        refresh_request_body (Optional[Mapping[str, Any]]): The request body to send in the refresh request
        refresh_request_headers (Optional[Mapping[str, Any]]): The request headers to send in the refresh request
        grant_type: The grant_type to request for access_token. If set to refresh_token, the refresh_token parameter has to be provided
        message_repository (MessageRepository): the message repository used to emit logs on HTTP requests
        profile_assertion (Optional[DeclarativeAuthenticator]): Authenticator whose token is sent as the assertion
            when the profile assertion flow is used
        use_profile_assertion (Optional[Union[InterpolatedBoolean, str, bool]]): When it evaluates to true, the
            grant type becomes "urn:ietf:params:oauth:grant-type:jwt-bearer" and the refresh request body only
            carries the grant type and the assertion
    """

    config: Mapping[str, Any]
    parameters: InitVar[Mapping[str, Any]]
    client_id: Optional[Union[InterpolatedString, str]] = None
    client_secret: Optional[Union[InterpolatedString, str]] = None
    token_refresh_endpoint: Optional[Union[InterpolatedString, str]] = None
    refresh_token: Optional[Union[InterpolatedString, str]] = None
    scopes: Optional[List[str]] = None
    token_expiry_date: Optional[Union[InterpolatedString, str]] = None
    _token_expiry_date: Optional[AirbyteDateTime] = field(init=False, repr=False, default=None)
    token_expiry_date_format: Optional[str] = None
    token_expiry_is_time_of_expiration: bool = False
    access_token_name: Union[InterpolatedString, str] = "access_token"
    access_token_value: Optional[Union[InterpolatedString, str]] = None
    client_id_name: Union[InterpolatedString, str] = "client_id"
    client_secret_name: Union[InterpolatedString, str] = "client_secret"
    expires_in_name: Union[InterpolatedString, str] = "expires_in"
    refresh_token_name: Union[InterpolatedString, str] = "refresh_token"
    refresh_request_body: Optional[Mapping[str, Any]] = None
    refresh_request_headers: Optional[Mapping[str, Any]] = None
    grant_type_name: Union[InterpolatedString, str] = "grant_type"
    grant_type: Union[InterpolatedString, str] = "refresh_token"
    message_repository: MessageRepository = NoopMessageRepository()
    profile_assertion: Optional[DeclarativeAuthenticator] = None
    use_profile_assertion: Optional[Union[InterpolatedBoolean, str, bool]] = False

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """
        Wraps the declared fields into interpolation objects and validates the configuration.

        Raises:
            ValueError: if the token expiry date cannot be parsed, or if the combination of
                credentials, profile assertion and grant type is inconsistent.
        """
        super().__init__()
        if self.token_refresh_endpoint is not None:
            self._token_refresh_endpoint: Optional[InterpolatedString] = InterpolatedString.create(
                self.token_refresh_endpoint, parameters=parameters
            )
        else:
            self._token_refresh_endpoint = None
        self._client_id_name = InterpolatedString.create(self.client_id_name, parameters=parameters)
        self._client_id = (
            InterpolatedString.create(self.client_id, parameters=parameters)
            if self.client_id
            else self.client_id
        )
        self._client_secret_name = InterpolatedString.create(
            self.client_secret_name, parameters=parameters
        )
        self._client_secret = (
            InterpolatedString.create(self.client_secret, parameters=parameters)
            if self.client_secret
            else self.client_secret
        )
        self._refresh_token_name = InterpolatedString.create(
            self.refresh_token_name, parameters=parameters
        )
        if self.refresh_token is not None:
            self._refresh_token: Optional[InterpolatedString] = InterpolatedString.create(
                self.refresh_token, parameters=parameters
            )
        else:
            self._refresh_token = None
        self.access_token_name = InterpolatedString.create(
            self.access_token_name, parameters=parameters
        )
        self.expires_in_name = InterpolatedString.create(
            self.expires_in_name, parameters=parameters
        )
        self.grant_type_name = InterpolatedString.create(
            self.grant_type_name, parameters=parameters
        )

        # Wrap a string flag before it is consulted anywhere: neither the raw string nor the
        # InterpolatedBoolean wrapper may be tested for truthiness directly, because a
        # non-empty string and the wrapper object are truthy even when the flag resolves to false.
        self.use_profile_assertion = (
            InterpolatedBoolean(self.use_profile_assertion, parameters=parameters)
            if isinstance(self.use_profile_assertion, str)
            else self.use_profile_assertion
        )
        uses_profile_assertion = self._uses_profile_assertion()

        self.grant_type = InterpolatedString.create(
            "urn:ietf:params:oauth:grant-type:jwt-bearer"
            if uses_profile_assertion
            else self.grant_type,
            parameters=parameters,
        )
        self._refresh_request_body = InterpolatedMapping(
            self.refresh_request_body or {}, parameters=parameters
        )
        self._refresh_request_headers = InterpolatedMapping(
            self.refresh_request_headers or {}, parameters=parameters
        )
        try:
            if (
                isinstance(self.token_expiry_date, (int, str))
                and str(self.token_expiry_date).isdigit()
            ):
                # Purely numeric values are parsed as-is, without interpolation.
                self._token_expiry_date = ab_datetime_parse(self.token_expiry_date)
            else:
                # Without a configured expiry date the token is treated as already expired
                # (one day in the past).
                self._token_expiry_date = (
                    ab_datetime_parse(
                        InterpolatedString.create(
                            self.token_expiry_date, parameters=parameters
                        ).eval(self.config)
                    )
                    if self.token_expiry_date
                    else ab_datetime_now() - timedelta(days=1)
                )
        except ValueError as e:
            raise ValueError(f"Invalid token expiry date format: {e}") from e
        self.assertion_name = "assertion"

        if self.access_token_value is not None:
            self._access_token_value = InterpolatedString.create(
                self.access_token_value, parameters=parameters
            ).eval(self.config)
        else:
            self._access_token_value = None

        self._access_token: Optional[str] = (
            self._access_token_value if self.access_token_value else None
        )

        if not uses_profile_assertion and any(
            client_creds is None for client_creds in [self.client_id, self.client_secret]
        ):
            raise ValueError(
                "OAuthAuthenticator configuration error: Both 'client_id' and 'client_secret' are required for the "
                "basic OAuth flow."
            )
        if self.profile_assertion is None and uses_profile_assertion:
            raise ValueError(
                "OAuthAuthenticator configuration error: 'profile_assertion' is required when using the profile assertion flow."
            )
        if self.get_grant_type() == "refresh_token" and self._refresh_token is None:
            raise ValueError(
                "OAuthAuthenticator configuration error: A 'refresh_token' is required when the 'grant_type' is set to 'refresh_token'."
            )

    def _uses_profile_assertion(self) -> bool:
        """
        Returns whether the profile assertion flow is enabled, evaluating an interpolated flag against the config.
        """
        flag = self.use_profile_assertion
        if isinstance(flag, InterpolatedBoolean):
            return bool(flag.eval(self.config))
        return bool(flag)

    def get_token_refresh_endpoint(self) -> Optional[str]:
        """
        Returns the endpoint to refresh the access token, or None when no endpoint is configured.

        Raises:
            ValueError: if an endpoint is configured but evaluates to an empty value.
        """
        if self._token_refresh_endpoint is not None:
            refresh_token_endpoint: str = self._token_refresh_endpoint.eval(self.config)
            if not refresh_token_endpoint:
                raise ValueError(
                    "OAuthAuthenticator was unable to evaluate token_refresh_endpoint parameter"
                )
            return refresh_token_endpoint
        return None

    def get_client_id_name(self) -> str:
        """Returns the client id name, evaluated against the config."""
        return self._client_id_name.eval(self.config)  # type: ignore # eval returns a string in this context

    def get_client_id(self) -> str:
        """
        Returns the client id, evaluated against the config.

        Raises:
            ValueError: if the client id is missing or evaluates to an empty value.
        """
        client_id = self._client_id.eval(self.config) if self._client_id else self._client_id
        if not client_id:
            raise ValueError("OAuthAuthenticator was unable to evaluate client_id parameter")
        return client_id  # type: ignore # value will be returned as a string, or an error will be raised

    def get_client_secret_name(self) -> str:
        """Returns the client secret name, evaluated against the config."""
        return self._client_secret_name.eval(self.config)  # type: ignore # eval returns a string in this context

    def get_client_secret(self) -> str:
        """Returns the client secret, evaluated against the config; an empty value only logs a warning."""
        client_secret = (
            self._client_secret.eval(self.config) if self._client_secret else self._client_secret
        )
        if not client_secret:
            # We've seen some APIs allowing empty client_secret so we will only log here
            logger.warning(
                "OAuthAuthenticator was unable to evaluate client_secret parameter hence it'll be empty"
            )
        return client_secret  # type: ignore # value will be returned as a string, which might be empty

    def get_refresh_token_name(self) -> str:
        """Returns the refresh token name, evaluated against the config."""
        return self._refresh_token_name.eval(self.config)  # type: ignore # eval returns a string in this context

    def get_refresh_token(self) -> Optional[str]:
        """Returns the refresh token as a string, or None when no refresh token is configured."""
        return None if self._refresh_token is None else str(self._refresh_token.eval(self.config))

    def get_scopes(self) -> List[str]:
        """Returns the configured scopes, or an empty list when none are set."""
        return self.scopes or []

    def get_access_token_name(self) -> str:
        """Returns the field to extract the access token from in the response."""
        return self.access_token_name.eval(self.config)  # type: ignore # eval returns a string in this context

    def get_expires_in_name(self) -> str:
        """Returns the expires_in field name."""
        return self.expires_in_name.eval(self.config)  # type: ignore # eval returns a string in this context

    def get_grant_type_name(self) -> str:
        """Returns the name under which the grant type is sent when requesting the access token."""
        return self.grant_type_name.eval(self.config)  # type: ignore # eval returns a string in this context

    def get_grant_type(self) -> str:
        """Returns the grant type used when requesting the access token."""
        return self.grant_type.eval(self.config)  # type: ignore # eval returns a string in this context

    def get_refresh_request_body(self) -> Mapping[str, Any]:
        """Returns the configured refresh request body, evaluated against the config."""
        return self._refresh_request_body.eval(self.config)

    def get_refresh_request_headers(self) -> Mapping[str, Any]:
        """Returns the configured refresh request headers, evaluated against the config."""
        return self._refresh_request_headers.eval(self.config)

    def get_token_expiry_date(self) -> AirbyteDateTime:
        """
        Returns the expiration date of the access token.

        While no access token has been set, the minimum datetime is returned instead of the stored value.
        """
        if not self._has_access_token_been_initialized():
            return AirbyteDateTime.from_datetime(datetime.min)
        return self._token_expiry_date  # type: ignore # _token_expiry_date is an AirbyteDateTime. It is never None despite what mypy thinks

    def _has_access_token_been_initialized(self) -> bool:
        """Returns True once an access token value is stored on the authenticator."""
        return self._access_token is not None

    def set_token_expiry_date(self, value: AirbyteDateTime) -> None:
        """Sets the access token expiration date."""
        self._token_expiry_date = value

    def get_assertion_name(self) -> str:
        """Returns the name under which the assertion is sent in the refresh request body."""
        return self.assertion_name

    def get_assertion(self) -> str:
        """
        Returns the token of the profile assertion authenticator.

        Raises:
            ValueError: if no profile assertion authenticator is configured.
        """
        if self.profile_assertion is None:
            raise ValueError("profile_assertion is not set")
        return self.profile_assertion.token

    def build_refresh_request_body(self) -> Mapping[str, Any]:
        """
        Returns the request body to set on the refresh request

        Override to define additional parameters
        """
        if self._uses_profile_assertion():
            return {
                self.get_grant_type_name(): self.get_grant_type(),
                self.get_assertion_name(): self.get_assertion(),
            }
        return super().build_refresh_request_body()

    @property
    def access_token(self) -> str:
        """
        Returns the access token.

        Raises:
            ValueError: if no access token has been set yet.
        """
        if self._access_token is None:
            raise ValueError("access_token is not set")
        return self._access_token

    @access_token.setter
    def access_token(self, value: str) -> None:
        self._access_token = value

    @property
    def _message_repository(self) -> MessageRepository:
        """
        Overriding AbstractOauth2Authenticator._message_repository to allow for HTTP request logs
        """
        return self.message_repository
Generates OAuth2.0 access tokens from an OAuth2.0 refresh token and client credentials based on a declarative connector configuration file. Credentials can be defined explicitly or via interpolation at runtime. The generated access token is attached to each request via the Authorization header.
Attributes:
- token_refresh_endpoint (Union[InterpolatedString, str]): The endpoint to refresh the access token
- client_id (Union[InterpolatedString, str]): The client id
- client_secret (Union[InterpolatedString, str]): Client secret (can be empty for APIs that support this)
- refresh_token (Union[InterpolatedString, str]): The token used to refresh the access token
- access_token_name (Union[InterpolatedString, str]): The field to extract access token from in the response
- expires_in_name (Union[InterpolatedString, str]): The field to extract expires_in from in the response
- config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
- scopes (Optional[List[str]]): The scopes to request
- token_expiry_date (Optional[Union[InterpolatedString, str]]): The access token expiration date
- token_expiry_date_format str: format of the datetime; provide it if expires_in is returned in datetime instead of seconds
- token_expiry_is_time_of_expiration bool: set it to True if expires_in is returned as the time of expiration instead of the number of seconds until expiration
- refresh_request_body (Optional[Mapping[str, Any]]): The request body to send in the refresh request
- refresh_request_headers (Optional[Mapping[str, Any]]): The request headers to send in the refresh request
- grant_type: The grant_type to request for access_token. If set to refresh_token, the refresh_token parameter has to be provided
- message_repository (MessageRepository): the message repository used to emit logs on HTTP requests
If all of refresh_token_error_status_codes, refresh_token_error_key, and refresh_token_error_values are set, then http errors with such params will be wrapped in AirbyteTracedException.
Format of the datetime; provide it if expires_in is returned as the expiration datetime instead of seconds until it expires
Indicates that the Token Expiry returns the date until which the token will be valid, not the amount of time it will be valid.
def get_token_refresh_endpoint(self) -> Optional[str]:
    """
    Return the token refresh endpoint evaluated against the config, or None when none is configured.

    Raises:
        ValueError: if an endpoint is configured but evaluates to an empty value.
    """
    endpoint_template = self._token_refresh_endpoint
    if endpoint_template is None:
        return None

    evaluated_endpoint: str = endpoint_template.eval(self.config)
    if evaluated_endpoint:
        return evaluated_endpoint
    raise ValueError(
        "OAuthAuthenticator was unable to evaluate token_refresh_endpoint parameter"
    )
Returns the endpoint to refresh the access token
def get_client_id_name(self) -> str:
    """Return the client id name, evaluated against the config."""
    name_template = self._client_id_name
    return name_template.eval(self.config)  # type: ignore # eval returns a string in this context
The client id name to authenticate
def get_client_id(self) -> str:
    """
    Return the client id, evaluated against the config.

    Raises:
        ValueError: if the client id is missing or evaluates to an empty value.
    """
    id_template = self._client_id
    resolved_id = id_template.eval(self.config) if id_template else id_template
    if resolved_id:
        return resolved_id  # type: ignore # value will be returned as a string, or an error will be raised
    raise ValueError("OAuthAuthenticator was unable to evaluate client_id parameter")
The client id to authenticate
def get_client_secret_name(self) -> str:
    """Return the client secret name, evaluated against the config."""
    name_template = self._client_secret_name
    return name_template.eval(self.config)  # type: ignore # eval returns a string in this context
The client secret name to authenticate
def get_client_secret(self) -> str:
    """
    Return the client secret, evaluated against the config.

    A missing or empty secret is not an error: a warning is logged and the empty value is returned.
    """
    secret_template = self._client_secret
    if secret_template:
        resolved_secret = secret_template.eval(self.config)
    else:
        resolved_secret = secret_template

    if not resolved_secret:
        # We've seen some APIs allowing empty client_secret so we will only log here
        logger.warning(
            "OAuthAuthenticator was unable to evaluate client_secret parameter hence it'll be empty"
        )
    return resolved_secret  # type: ignore # value will be returned as a string, which might be empty
The client secret to authenticate
def get_refresh_token_name(self) -> str:
    """Return the refresh token name, evaluated against the config."""
    name_template = self._refresh_token_name
    return name_template.eval(self.config)  # type: ignore # eval returns a string in this context
The refresh token name to authenticate
def get_refresh_token(self) -> Optional[str]:
    """Return the refresh token evaluated against the config as a string, or None when none is configured."""
    token_template = self._refresh_token
    if token_template is None:
        return None
    return str(token_template.eval(self.config))
The token used to refresh the access token when it expires
def get_access_token_name(self) -> str:
    """Return the field to extract the access token from in the response."""
    name_template = self.access_token_name
    return name_template.eval(self.config)  # type: ignore # eval returns a string in this context
Field to extract access token from in the response
def get_expires_in_name(self) -> str:
    """Return the expires_in field name, evaluated against the config."""
    name_template = self.expires_in_name
    return name_template.eval(self.config)  # type: ignore # eval returns a string in this context
Returns the expires_in field name
def get_grant_type_name(self) -> str:
    """Return the grant_type name, evaluated against the config."""
    name_template = self.grant_type_name
    return name_template.eval(self.config)  # type: ignore # eval returns a string in this context
Returns the name of the grant_type field used when requesting the access_token
def get_grant_type(self) -> str:
    """Return the grant type used when requesting the access token, evaluated against the config."""
    grant_template = self.grant_type
    return grant_template.eval(self.config)  # type: ignore # eval returns a string in this context
Returns grant_type specified for requesting access_token
def get_refresh_request_body(self) -> Mapping[str, Any]:
    """Return the configured refresh request body, evaluated against the config."""
    body_template = self._refresh_request_body
    return body_template.eval(self.config)
Returns the request body to set on the refresh request
def get_refresh_request_headers(self) -> Mapping[str, Any]:
    """Return the configured refresh request headers, evaluated against the config."""
    headers_template = self._refresh_request_headers
    return headers_template.eval(self.config)
Returns the request headers to set on the refresh request
def get_token_expiry_date(self) -> AirbyteDateTime:
    """
    Return the expiration date of the access token.

    While no access token has been set, the minimum datetime is returned instead of the stored value.
    """
    if self._has_access_token_been_initialized():
        return self._token_expiry_date  # type: ignore # _token_expiry_date is an AirbyteDateTime. It is never None despite what mypy thinks
    return AirbyteDateTime.from_datetime(datetime.min)
Expiration date of the access token
def set_token_expiry_date(self, value: AirbyteDateTime) -> None:
    """Store `value` as the access token expiration date."""
    self._token_expiry_date = value
Setter for access token expiration date
def build_refresh_request_body(self) -> Mapping[str, Any]:
    """
    Returns the request body to set on the refresh request

    When the profile assertion flow is enabled the body only holds the grant type and the
    assertion; otherwise the parent implementation builds the body.

    Override to define additional parameters
    """
    use_assertion = self.use_profile_assertion
    # A string flag is stored as an InterpolatedBoolean, which is always truthy as an
    # object, so it has to be evaluated against the config before being tested.
    if isinstance(use_assertion, InterpolatedBoolean):
        use_assertion = use_assertion.eval(self.config)
    if use_assertion:
        return {
            self.get_grant_type_name(): self.get_grant_type(),
            self.get_assertion_name(): self.get_assertion(),
        }
    return super().build_refresh_request_body()
Returns the request body to set on the refresh request
Override to define additional parameters
@property
def access_token(self) -> str:
    """
    Return the access token.

    Raises:
        ValueError: if no access token has been set yet.
    """
    current_token = self._access_token
    if current_token is not None:
        return current_token
    raise ValueError("access_token is not set")
Returns the access token
Inherited Members
@dataclass
class JwtAuthenticator(DeclarativeAuthenticator):
    """
    Generates a JSON Web Token (JWT) based on a declarative connector configuration file. The generated token is attached to each request via the Authorization header.

    Attributes:
        config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
        secret_key (Union[InterpolatedString, str]): The secret key used to sign the JWT
        algorithm (Union[str, JwtAlgorithm]): The algorithm used to sign the JWT
        token_duration (Optional[int]): The duration in seconds for which the token is valid
        base64_encode_secret_key (Optional[Union[InterpolatedBoolean, str, bool]]): Whether to base64 encode the secret key
        header_prefix (Optional[Union[InterpolatedString, str]]): The prefix to add to the Authorization header
        kid (Optional[Union[InterpolatedString, str]]): The key identifier to be included in the JWT header
        typ (Optional[Union[InterpolatedString, str]]): The type of the JWT.
        cty (Optional[Union[InterpolatedString, str]]): The content type of the JWT.
        iss (Optional[Union[InterpolatedString, str]]): The issuer of the JWT.
        sub (Optional[Union[InterpolatedString, str]]): The subject of the JWT.
        aud (Optional[Union[InterpolatedString, str]]): The audience of the JWT.
        additional_jwt_headers (Optional[Mapping[str, Any]]): Additional headers to include in the JWT.
        additional_jwt_payload (Optional[Mapping[str, Any]]): Additional payload to include in the JWT.
    """

    config: Mapping[str, Any]
    parameters: InitVar[Mapping[str, Any]]
    secret_key: Union[InterpolatedString, str]
    algorithm: Union[str, JwtAlgorithm]
    token_duration: Optional[int]
    base64_encode_secret_key: Optional[Union[InterpolatedBoolean, str, bool]] = False
    header_prefix: Optional[Union[InterpolatedString, str]] = None
    kid: Optional[Union[InterpolatedString, str]] = None
    typ: Optional[Union[InterpolatedString, str]] = None
    cty: Optional[Union[InterpolatedString, str]] = None
    iss: Optional[Union[InterpolatedString, str]] = None
    sub: Optional[Union[InterpolatedString, str]] = None
    aud: Optional[Union[InterpolatedString, str]] = None
    additional_jwt_headers: Optional[Mapping[str, Any]] = None
    additional_jwt_payload: Optional[Mapping[str, Any]] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """
        Wraps the declared fields into interpolation objects; optional fields that are unset stay None.
        """
        self._secret_key = InterpolatedString.create(self.secret_key, parameters=parameters)
        self._algorithm = (
            JwtAlgorithm(self.algorithm) if isinstance(self.algorithm, str) else self.algorithm
        )
        self._base64_encode_secret_key = (
            InterpolatedBoolean(self.base64_encode_secret_key, parameters=parameters)
            if isinstance(self.base64_encode_secret_key, str)
            else self.base64_encode_secret_key
        )
        self._token_duration = self.token_duration
        self._header_prefix = (
            InterpolatedString.create(self.header_prefix, parameters=parameters)
            if self.header_prefix
            else None
        )
        self._kid = InterpolatedString.create(self.kid, parameters=parameters) if self.kid else None
        self._typ = InterpolatedString.create(self.typ, parameters=parameters) if self.typ else None
        self._cty = InterpolatedString.create(self.cty, parameters=parameters) if self.cty else None
        self._iss = InterpolatedString.create(self.iss, parameters=parameters) if self.iss else None
        self._sub = InterpolatedString.create(self.sub, parameters=parameters) if self.sub else None
        self._aud = InterpolatedString.create(self.aud, parameters=parameters) if self.aud else None
        self._additional_jwt_headers = InterpolatedMapping(
            self.additional_jwt_headers or {}, parameters=parameters
        )
        self._additional_jwt_payload = InterpolatedMapping(
            self.additional_jwt_payload or {}, parameters=parameters
        )

    def _get_jwt_headers(self) -> dict[str, Any]:
        """
        Builds and returns the headers used when signing the JWT.

        Raises:
            ValueError: if 'additional_jwt_headers' sets one of the reserved headers.
        """
        headers = self._additional_jwt_headers.eval(self.config, json_loads=json.loads)
        if any(prop in headers for prop in ["kid", "alg", "typ", "cty"]):
            raise ValueError(
                "'kid', 'alg', 'typ', 'cty' are reserved headers and should not be set as part of 'additional_jwt_headers'"
            )

        if self._kid:
            headers["kid"] = self._kid.eval(self.config, json_loads=json.loads)
        if self._typ:
            headers["typ"] = self._typ.eval(self.config, json_loads=json.loads)
        if self._cty:
            headers["cty"] = self._cty.eval(self.config, json_loads=json.loads)
        headers["alg"] = self._algorithm
        return headers

    def _get_jwt_payload(self) -> dict[str, Any]:
        """
        Builds and returns the payload used when signing the JWT.

        'iat' and 'nbf' are set to the current time; 'exp' adds the token duration when it is an int.

        Raises:
            ValueError: if 'additional_jwt_payload' sets one of the reserved properties.
        """
        now = int(datetime.now().timestamp())
        exp = now + self._token_duration if isinstance(self._token_duration, int) else now
        nbf = now

        payload = self._additional_jwt_payload.eval(self.config, json_loads=json.loads)
        if any(prop in payload for prop in ["iss", "sub", "aud", "iat", "exp", "nbf"]):
            raise ValueError(
                "'iss', 'sub', 'aud', 'iat', 'exp', 'nbf' are reserved properties and should not be set as part of 'additional_jwt_payload'"
            )

        if self._iss:
            payload["iss"] = self._iss.eval(self.config, json_loads=json.loads)
        if self._sub:
            payload["sub"] = self._sub.eval(self.config, json_loads=json.loads)
        if self._aud:
            payload["aud"] = self._aud.eval(self.config, json_loads=json.loads)

        payload["iat"] = now
        payload["exp"] = exp
        payload["nbf"] = nbf
        return payload

    def _get_secret_key(self) -> str:
        """
        Returns the secret key used to sign the JWT, base64 encoded when requested.
        """
        secret_key: str = self._secret_key.eval(self.config, json_loads=json.loads)
        should_encode = self._base64_encode_secret_key
        # A string flag is stored as an InterpolatedBoolean, which is always truthy as an
        # object, so it has to be evaluated against the config before being tested.
        if isinstance(should_encode, InterpolatedBoolean):
            should_encode = should_encode.eval(self.config)
        return base64.b64encode(secret_key.encode()).decode() if should_encode else secret_key

    def _get_signed_token(self) -> Union[str, Any]:
        """
        Signs the JWT using the provided secret key and algorithm and the generated headers and payload. For additional information on PyJWT see: https://pyjwt.readthedocs.io/en/stable/

        Raises:
            ValueError: if building or signing the token fails for any reason.
        """
        try:
            return jwt.encode(
                payload=self._get_jwt_payload(),
                key=self._get_secret_key(),
                algorithm=self._algorithm,
                headers=self._get_jwt_headers(),
            )
        except Exception as e:
            raise ValueError(f"Failed to sign token: {e}") from e

    def _get_header_prefix(self) -> Union[str, None]:
        """
        Returns the header prefix to be used when attaching the token to the request.
        """
        return (
            self._header_prefix.eval(self.config, json_loads=json.loads)
            if self._header_prefix
            else None
        )

    @property
    def auth_header(self) -> str:
        """Name of the header the token is attached to."""
        return "Authorization"

    @property
    def token(self) -> str:
        """Signed JWT, preceded by the header prefix and a space when a prefix is configured."""
        # Evaluate the prefix once instead of once for the check and once for the formatting.
        header_prefix = self._get_header_prefix()
        signed_token = self._get_signed_token()
        return f"{header_prefix} {signed_token}" if header_prefix else signed_token
Generates a JSON Web Token (JWT) based on a declarative connector configuration file. The generated token is attached to each request via the Authorization header.
Attributes:
- config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
- secret_key (Union[InterpolatedString, str]): The secret key used to sign the JWT
- algorithm (Union[str, JwtAlgorithm]): The algorithm used to sign the JWT
- token_duration (Optional[int]): The duration in seconds for which the token is valid
- base64_encode_secret_key (Optional[Union[InterpolatedBoolean, str, bool]]): Whether to base64 encode the secret key
- header_prefix (Optional[Union[InterpolatedString, str]]): The prefix to add to the Authorization header
- kid (Optional[Union[InterpolatedString, str]]): The key identifier to be included in the JWT header
- typ (Optional[Union[InterpolatedString, str]]): The type of the JWT.
- cty (Optional[Union[InterpolatedString, str]]): The content type of the JWT.
- iss (Optional[Union[InterpolatedString, str]]): The issuer of the JWT.
- sub (Optional[Union[InterpolatedString, str]]): The subject of the JWT.
- aud (Optional[Union[InterpolatedString, str]]): The audience of the JWT.
- additional_jwt_headers (Optional[Mapping[str, Any]]): Additional headers to include in the JWT.
- additional_jwt_payload (Optional[Mapping[str, Any]]): Additional payload to include in the JWT.