airbyte_cdk.sources.declarative.auth
24@dataclass 25class DeclarativeOauth2Authenticator(AbstractOauth2Authenticator, DeclarativeAuthenticator): 26 """ 27 Generates OAuth2.0 access tokens from an OAuth2.0 refresh token and client credentials based on 28 a declarative connector configuration file. Credentials can be defined explicitly or via interpolation 29 at runtime. The generated access token is attached to each request via the Authorization header. 30 31 Attributes: 32 token_refresh_endpoint (Union[InterpolatedString, str]): The endpoint to refresh the access token 33 client_id (Union[InterpolatedString, str]): The client id 34 client_secret (Union[InterpolatedString, str]): Client secret 35 refresh_token (Union[InterpolatedString, str]): The token used to refresh the access token 36 access_token_name (Union[InterpolatedString, str]): THe field to extract access token from in the response 37 expires_in_name (Union[InterpolatedString, str]): The field to extract expires_in from in the response 38 config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec 39 scopes (Optional[List[str]]): The scopes to request 40 token_expiry_date (Optional[Union[InterpolatedString, str]]): The access token expiration date 41 token_expiry_date_format str: format of the datetime; provide it if expires_in is returned in datetime instead of seconds 42 token_expiry_is_time_of_expiration bool: set True it if expires_in is returned as time of expiration instead of the number seconds until expiration 43 refresh_request_body (Optional[Mapping[str, Any]]): The request body to send in the refresh request 44 refresh_request_headers (Optional[Mapping[str, Any]]): The request headers to send in the refresh request 45 grant_type: The grant_type to request for access_token. If set to refresh_token, the refresh_token parameter has to be provided 46 message_repository (MessageRepository): the message repository used to emit logs on HTTP requests 47 """ 48 49 config: Mapping[str, Any] 50 parameters: InitVar[Mapping[str, Any]] 51 client_id: Optional[Union[InterpolatedString, str]] = None 52 client_secret: Optional[Union[InterpolatedString, str]] = None 53 token_refresh_endpoint: Optional[Union[InterpolatedString, str]] = None 54 refresh_token: Optional[Union[InterpolatedString, str]] = None 55 scopes: Optional[List[str]] = None 56 token_expiry_date: Optional[Union[InterpolatedString, str]] = None 57 _token_expiry_date: Optional[AirbyteDateTime] = field(init=False, repr=False, default=None) 58 token_expiry_date_format: Optional[str] = None 59 token_expiry_is_time_of_expiration: bool = False 60 access_token_name: Union[InterpolatedString, str] = "access_token" 61 access_token_value: Optional[Union[InterpolatedString, str]] = None 62 client_id_name: Union[InterpolatedString, str] = "client_id" 63 client_secret_name: Union[InterpolatedString, str] = "client_secret" 64 expires_in_name: Union[InterpolatedString, str] = "expires_in" 65 refresh_token_name: Union[InterpolatedString, str] = "refresh_token" 66 refresh_request_body: Optional[Mapping[str, Any]] = None 67 refresh_request_headers: Optional[Mapping[str, Any]] = None 68 grant_type_name: Union[InterpolatedString, str] = "grant_type" 69 grant_type: Union[InterpolatedString, str] = "refresh_token" 70 message_repository: MessageRepository = NoopMessageRepository() 71 profile_assertion: Optional[DeclarativeAuthenticator] = None 72 use_profile_assertion: Optional[Union[InterpolatedBoolean, str, bool]] = False 73 74 def __post_init__(self, parameters: Mapping[str, Any]) -> None: 75 super().__init__() 76 if self.token_refresh_endpoint is not None: 77 self._token_refresh_endpoint: Optional[InterpolatedString] = InterpolatedString.create( 78 self.token_refresh_endpoint, parameters=parameters 79 ) 80 else: 81 self._token_refresh_endpoint = None 82 self._client_id_name = InterpolatedString.create(self.client_id_name, parameters=parameters) 83 self._client_id = ( 84 InterpolatedString.create(self.client_id, parameters=parameters) 85 if self.client_id 86 else self.client_id 87 ) 88 self._client_secret_name = InterpolatedString.create( 89 self.client_secret_name, parameters=parameters 90 ) 91 self._client_secret = ( 92 InterpolatedString.create(self.client_secret, parameters=parameters) 93 if self.client_secret 94 else self.client_secret 95 ) 96 self._refresh_token_name = InterpolatedString.create( 97 self.refresh_token_name, parameters=parameters 98 ) 99 if self.refresh_token is not None: 100 self._refresh_token: Optional[InterpolatedString] = InterpolatedString.create( 101 self.refresh_token, parameters=parameters 102 ) 103 else: 104 self._refresh_token = None 105 self.access_token_name = InterpolatedString.create( 106 self.access_token_name, parameters=parameters 107 ) 108 self.expires_in_name = InterpolatedString.create( 109 self.expires_in_name, parameters=parameters 110 ) 111 self.grant_type_name = InterpolatedString.create( 112 self.grant_type_name, parameters=parameters 113 ) 114 self.grant_type = InterpolatedString.create( 115 "urn:ietf:params:oauth:grant-type:jwt-bearer" 116 if self.use_profile_assertion 117 else self.grant_type, 118 parameters=parameters, 119 ) 120 self._refresh_request_body = InterpolatedMapping( 121 self.refresh_request_body or {}, parameters=parameters 122 ) 123 self._refresh_request_headers = InterpolatedMapping( 124 self.refresh_request_headers or {}, parameters=parameters 125 ) 126 try: 127 if ( 128 isinstance(self.token_expiry_date, (int, str)) 129 and str(self.token_expiry_date).isdigit() 130 ): 131 self._token_expiry_date = ab_datetime_parse(self.token_expiry_date) 132 else: 133 self._token_expiry_date = ( 134 ab_datetime_parse( 135 InterpolatedString.create( 136 self.token_expiry_date, parameters=parameters 137 ).eval(self.config) 138 ) 139 if self.token_expiry_date 140 else ab_datetime_now() - timedelta(days=1) 141 ) 142 except ValueError as e: 143 raise ValueError(f"Invalid token expiry date format: {e}") 144 self.use_profile_assertion = ( 145 InterpolatedBoolean(self.use_profile_assertion, parameters=parameters) 146 if isinstance(self.use_profile_assertion, str) 147 else self.use_profile_assertion 148 ) 149 self.assertion_name = "assertion" 150 151 if self.access_token_value is not None: 152 self._access_token_value = InterpolatedString.create( 153 self.access_token_value, parameters=parameters 154 ).eval(self.config) 155 else: 156 self._access_token_value = None 157 158 self._access_token: Optional[str] = ( 159 self._access_token_value if self.access_token_value else None 160 ) 161 162 if not self.use_profile_assertion and any( 163 client_creds is None for client_creds in [self.client_id, self.client_secret] 164 ): 165 raise ValueError( 166 "OAuthAuthenticator configuration error: Both 'client_id' and 'client_secret' are required for the " 167 "basic OAuth flow." 168 ) 169 if self.profile_assertion is None and self.use_profile_assertion: 170 raise ValueError( 171 "OAuthAuthenticator configuration error: 'profile_assertion' is required when using the profile assertion flow." 172 ) 173 if self.get_grant_type() == "refresh_token" and self._refresh_token is None: 174 raise ValueError( 175 "OAuthAuthenticator configuration error: A 'refresh_token' is required when the 'grant_type' is set to 'refresh_token'." 176 ) 177 178 def get_token_refresh_endpoint(self) -> Optional[str]: 179 if self._token_refresh_endpoint is not None: 180 refresh_token_endpoint: str = self._token_refresh_endpoint.eval(self.config) 181 if not refresh_token_endpoint: 182 raise ValueError( 183 "OAuthAuthenticator was unable to evaluate token_refresh_endpoint parameter" 184 ) 185 return refresh_token_endpoint 186 return None 187 188 def get_client_id_name(self) -> str: 189 return self._client_id_name.eval(self.config) # type: ignore # eval returns a string in this context 190 191 def get_client_id(self) -> str: 192 client_id = self._client_id.eval(self.config) if self._client_id else self._client_id 193 if not client_id: 194 raise ValueError("OAuthAuthenticator was unable to evaluate client_id parameter") 195 return client_id # type: ignore # value will be returned as a string, or an error will be raised 196 197 def get_client_secret_name(self) -> str: 198 return self._client_secret_name.eval(self.config) # type: ignore # eval returns a string in this context 199 200 def get_client_secret(self) -> str: 201 client_secret = ( 202 self._client_secret.eval(self.config) if self._client_secret else self._client_secret 203 ) 204 if not client_secret: 205 raise ValueError("OAuthAuthenticator was unable to evaluate client_secret parameter") 206 return client_secret # type: ignore # value will be returned as a string, or an error will be raised 207 208 def get_refresh_token_name(self) -> str: 209 return self._refresh_token_name.eval(self.config) # type: ignore # eval returns a string in this context 210 211 def get_refresh_token(self) -> Optional[str]: 212 return None if self._refresh_token is None else str(self._refresh_token.eval(self.config)) 213 214 def get_scopes(self) -> List[str]: 215 return self.scopes or [] 216 217 def get_access_token_name(self) -> str: 218 return self.access_token_name.eval(self.config) # type: ignore # eval returns a string in this context 219 220 def get_expires_in_name(self) -> str: 221 return self.expires_in_name.eval(self.config) # type: ignore # eval returns a string in this context 222 223 def get_grant_type_name(self) -> str: 224 return self.grant_type_name.eval(self.config) # type: ignore # eval returns a string in this context 225 226 def get_grant_type(self) -> str: 227 return self.grant_type.eval(self.config) # type: ignore # eval returns a string in this context 228 229 def get_refresh_request_body(self) -> Mapping[str, Any]: 230 return self._refresh_request_body.eval(self.config) 231 232 def get_refresh_request_headers(self) -> Mapping[str, Any]: 233 return self._refresh_request_headers.eval(self.config) 234 235 def get_token_expiry_date(self) -> AirbyteDateTime: 236 if not self._has_access_token_been_initialized(): 237 return AirbyteDateTime.from_datetime(datetime.min) 238 return self._token_expiry_date # type: ignore # _token_expiry_date is an AirbyteDateTime. It is never None despite what mypy thinks 239 240 def _has_access_token_been_initialized(self) -> bool: 241 return self._access_token is not None 242 243 def set_token_expiry_date(self, value: Union[str, int]) -> None: 244 self._token_expiry_date = self._parse_token_expiration_date(value) 245 246 def get_assertion_name(self) -> str: 247 return self.assertion_name 248 249 def get_assertion(self) -> str: 250 if self.profile_assertion is None: 251 raise ValueError("profile_assertion is not set") 252 return self.profile_assertion.token 253 254 def build_refresh_request_body(self) -> Mapping[str, Any]: 255 """ 256 Returns the request body to set on the refresh request 257 258 Override to define additional parameters 259 """ 260 if self.use_profile_assertion: 261 return { 262 self.get_grant_type_name(): self.get_grant_type(), 263 self.get_assertion_name(): self.get_assertion(), 264 } 265 return super().build_refresh_request_body() 266 267 @property 268 def access_token(self) -> str: 269 if self._access_token is None: 270 raise ValueError("access_token is not set") 271 return self._access_token 272 273 @access_token.setter 274 def access_token(self, value: str) -> None: 275 self._access_token = value 276 277 @property 278 def _message_repository(self) -> MessageRepository: 279 """ 280 Overriding AbstractOauth2Authenticator._message_repository to allow for HTTP request logs 281 """ 282 return self.message_repository
Generates OAuth2.0 access tokens from an OAuth2.0 refresh token and client credentials based on a declarative connector configuration file. Credentials can be defined explicitly or via interpolation at runtime. The generated access token is attached to each request via the Authorization header.
Attributes:
- token_refresh_endpoint (Union[InterpolatedString, str]): The endpoint to refresh the access token
- client_id (Union[InterpolatedString, str]): The client id
- client_secret (Union[InterpolatedString, str]): Client secret
- refresh_token (Union[InterpolatedString, str]): The token used to refresh the access token
- access_token_name (Union[InterpolatedString, str]): THe field to extract access token from in the response
- expires_in_name (Union[InterpolatedString, str]): The field to extract expires_in from in the response
- config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
- scopes (Optional[List[str]]): The scopes to request
- token_expiry_date (Optional[Union[InterpolatedString, str]]): The access token expiration date
- token_expiry_date_format str: format of the datetime; provide it if expires_in is returned in datetime instead of seconds
- token_expiry_is_time_of_expiration bool: set True it if expires_in is returned as time of expiration instead of the number seconds until expiration
- refresh_request_body (Optional[Mapping[str, Any]]): The request body to send in the refresh request
- refresh_request_headers (Optional[Mapping[str, Any]]): The request headers to send in the refresh request
- grant_type: The grant_type to request for access_token. If set to refresh_token, the refresh_token parameter has to be provided
- message_repository (MessageRepository): the message repository used to emit logs on HTTP requests
If all of refresh_token_error_status_codes, refresh_token_error_key, and refresh_token_error_values are set, then http errors with such params will be wrapped in AirbyteTracedException.
Format of the datetime; exists it if expires_in is returned as the expiration datetime instead of seconds until it expires
Indicates that the Token Expiry returns the date until which the token will be valid, not the amount of time it will be valid.
178 def get_token_refresh_endpoint(self) -> Optional[str]: 179 if self._token_refresh_endpoint is not None: 180 refresh_token_endpoint: str = self._token_refresh_endpoint.eval(self.config) 181 if not refresh_token_endpoint: 182 raise ValueError( 183 "OAuthAuthenticator was unable to evaluate token_refresh_endpoint parameter" 184 ) 185 return refresh_token_endpoint 186 return None
Returns the endpoint to refresh the access token
188 def get_client_id_name(self) -> str: 189 return self._client_id_name.eval(self.config) # type: ignore # eval returns a string in this context
The client id name to authenticate
191 def get_client_id(self) -> str: 192 client_id = self._client_id.eval(self.config) if self._client_id else self._client_id 193 if not client_id: 194 raise ValueError("OAuthAuthenticator was unable to evaluate client_id parameter") 195 return client_id # type: ignore # value will be returned as a string, or an error will be raised
The client id to authenticate
197 def get_client_secret_name(self) -> str: 198 return self._client_secret_name.eval(self.config) # type: ignore # eval returns a string in this context
The client secret name to authenticate
200 def get_client_secret(self) -> str: 201 client_secret = ( 202 self._client_secret.eval(self.config) if self._client_secret else self._client_secret 203 ) 204 if not client_secret: 205 raise ValueError("OAuthAuthenticator was unable to evaluate client_secret parameter") 206 return client_secret # type: ignore # value will be returned as a string, or an error will be raised
The client secret to authenticate
208 def get_refresh_token_name(self) -> str: 209 return self._refresh_token_name.eval(self.config) # type: ignore # eval returns a string in this context
The refresh token name to authenticate
211 def get_refresh_token(self) -> Optional[str]: 212 return None if self._refresh_token is None else str(self._refresh_token.eval(self.config))
The token used to refresh the access token when it expires
217 def get_access_token_name(self) -> str: 218 return self.access_token_name.eval(self.config) # type: ignore # eval returns a string in this context
Field to extract access token from in the response
220 def get_expires_in_name(self) -> str: 221 return self.expires_in_name.eval(self.config) # type: ignore # eval returns a string in this context
Returns the expires_in field name
223 def get_grant_type_name(self) -> str: 224 return self.grant_type_name.eval(self.config) # type: ignore # eval returns a string in this context
Returns grant_type specified name for requesting access_token
226 def get_grant_type(self) -> str: 227 return self.grant_type.eval(self.config) # type: ignore # eval returns a string in this context
Returns grant_type specified for requesting access_token
229 def get_refresh_request_body(self) -> Mapping[str, Any]: 230 return self._refresh_request_body.eval(self.config)
Returns the request body to set on the refresh request
232 def get_refresh_request_headers(self) -> Mapping[str, Any]: 233 return self._refresh_request_headers.eval(self.config)
Returns the request headers to set on the refresh request
235 def get_token_expiry_date(self) -> AirbyteDateTime: 236 if not self._has_access_token_been_initialized(): 237 return AirbyteDateTime.from_datetime(datetime.min) 238 return self._token_expiry_date # type: ignore # _token_expiry_date is an AirbyteDateTime. It is never None despite what mypy thinks
Expiration date of the access token
243 def set_token_expiry_date(self, value: Union[str, int]) -> None: 244 self._token_expiry_date = self._parse_token_expiration_date(value)
Setter for access token expiration date
254 def build_refresh_request_body(self) -> Mapping[str, Any]: 255 """ 256 Returns the request body to set on the refresh request 257 258 Override to define additional parameters 259 """ 260 if self.use_profile_assertion: 261 return { 262 self.get_grant_type_name(): self.get_grant_type(), 263 self.get_assertion_name(): self.get_assertion(), 264 } 265 return super().build_refresh_request_body()
Returns the request body to set on the refresh request
Override to define additional parameters
267 @property 268 def access_token(self) -> str: 269 if self._access_token is None: 270 raise ValueError("access_token is not set") 271 return self._access_token
Returns the access token
Inherited Members
41@dataclass 42class JwtAuthenticator(DeclarativeAuthenticator): 43 """ 44 Generates a JSON Web Token (JWT) based on a declarative connector configuration file. The generated token is attached to each request via the Authorization header. 45 46 Attributes: 47 config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec 48 secret_key (Union[InterpolatedString, str]): The secret key used to sign the JWT 49 algorithm (Union[str, JwtAlgorithm]): The algorithm used to sign the JWT 50 token_duration (Optional[int]): The duration in seconds for which the token is valid 51 base64_encode_secret_key (Optional[Union[InterpolatedBoolean, str, bool]]): Whether to base64 encode the secret key 52 header_prefix (Optional[Union[InterpolatedString, str]]): The prefix to add to the Authorization header 53 kid (Optional[Union[InterpolatedString, str]]): The key identifier to be included in the JWT header 54 typ (Optional[Union[InterpolatedString, str]]): The type of the JWT. 55 cty (Optional[Union[InterpolatedString, str]]): The content type of the JWT. 56 iss (Optional[Union[InterpolatedString, str]]): The issuer of the JWT. 57 sub (Optional[Union[InterpolatedString, str]]): The subject of the JWT. 58 aud (Optional[Union[InterpolatedString, str]]): The audience of the JWT. 59 additional_jwt_headers (Optional[Mapping[str, Any]]): Additional headers to include in the JWT. 60 additional_jwt_payload (Optional[Mapping[str, Any]]): Additional payload to include in the JWT. 61 """ 62 63 config: Mapping[str, Any] 64 parameters: InitVar[Mapping[str, Any]] 65 secret_key: Union[InterpolatedString, str] 66 algorithm: Union[str, JwtAlgorithm] 67 token_duration: Optional[int] 68 base64_encode_secret_key: Optional[Union[InterpolatedBoolean, str, bool]] = False 69 header_prefix: Optional[Union[InterpolatedString, str]] = None 70 kid: Optional[Union[InterpolatedString, str]] = None 71 typ: Optional[Union[InterpolatedString, str]] = None 72 cty: Optional[Union[InterpolatedString, str]] = None 73 iss: Optional[Union[InterpolatedString, str]] = None 74 sub: Optional[Union[InterpolatedString, str]] = None 75 aud: Optional[Union[InterpolatedString, str]] = None 76 additional_jwt_headers: Optional[Mapping[str, Any]] = None 77 additional_jwt_payload: Optional[Mapping[str, Any]] = None 78 79 def __post_init__(self, parameters: Mapping[str, Any]) -> None: 80 self._secret_key = InterpolatedString.create(self.secret_key, parameters=parameters) 81 self._algorithm = ( 82 JwtAlgorithm(self.algorithm) if isinstance(self.algorithm, str) else self.algorithm 83 ) 84 self._base64_encode_secret_key = ( 85 InterpolatedBoolean(self.base64_encode_secret_key, parameters=parameters) 86 if isinstance(self.base64_encode_secret_key, str) 87 else self.base64_encode_secret_key 88 ) 89 self._token_duration = self.token_duration 90 self._header_prefix = ( 91 InterpolatedString.create(self.header_prefix, parameters=parameters) 92 if self.header_prefix 93 else None 94 ) 95 self._kid = InterpolatedString.create(self.kid, parameters=parameters) if self.kid else None 96 self._typ = InterpolatedString.create(self.typ, parameters=parameters) if self.typ else None 97 self._cty = InterpolatedString.create(self.cty, parameters=parameters) if self.cty else None 98 self._iss = InterpolatedString.create(self.iss, parameters=parameters) if self.iss else None 99 self._sub = InterpolatedString.create(self.sub, parameters=parameters) if self.sub else None 100 self._aud = InterpolatedString.create(self.aud, parameters=parameters) if self.aud else None 101 self._additional_jwt_headers = InterpolatedMapping( 102 self.additional_jwt_headers or {}, parameters=parameters 103 ) 104 self._additional_jwt_payload = InterpolatedMapping( 105 self.additional_jwt_payload or {}, parameters=parameters 106 ) 107 108 def _get_jwt_headers(self) -> dict[str, Any]: 109 """ 110 Builds and returns the headers used when signing the JWT. 111 """ 112 headers = self._additional_jwt_headers.eval(self.config, json_loads=json.loads) 113 if any(prop in headers for prop in ["kid", "alg", "typ", "cty"]): 114 raise ValueError( 115 "'kid', 'alg', 'typ', 'cty' are reserved headers and should not be set as part of 'additional_jwt_headers'" 116 ) 117 118 if self._kid: 119 headers["kid"] = self._kid.eval(self.config, json_loads=json.loads) 120 if self._typ: 121 headers["typ"] = self._typ.eval(self.config, json_loads=json.loads) 122 if self._cty: 123 headers["cty"] = self._cty.eval(self.config, json_loads=json.loads) 124 headers["alg"] = self._algorithm 125 return headers 126 127 def _get_jwt_payload(self) -> dict[str, Any]: 128 """ 129 Builds and returns the payload used when signing the JWT. 130 """ 131 now = int(datetime.now().timestamp()) 132 exp = now + self._token_duration if isinstance(self._token_duration, int) else now 133 nbf = now 134 135 payload = self._additional_jwt_payload.eval(self.config, json_loads=json.loads) 136 if any(prop in payload for prop in ["iss", "sub", "aud", "iat", "exp", "nbf"]): 137 raise ValueError( 138 "'iss', 'sub', 'aud', 'iat', 'exp', 'nbf' are reserved properties and should not be set as part of 'additional_jwt_payload'" 139 ) 140 141 if self._iss: 142 payload["iss"] = self._iss.eval(self.config, json_loads=json.loads) 143 if self._sub: 144 payload["sub"] = self._sub.eval(self.config, json_loads=json.loads) 145 if self._aud: 146 payload["aud"] = self._aud.eval(self.config, json_loads=json.loads) 147 148 payload["iat"] = now 149 payload["exp"] = exp 150 payload["nbf"] = nbf 151 return payload 152 153 def _get_secret_key(self) -> str: 154 """ 155 Returns the secret key used to sign the JWT. 156 """ 157 secret_key: str = self._secret_key.eval(self.config, json_loads=json.loads) 158 return ( 159 base64.b64encode(secret_key.encode()).decode() 160 if self._base64_encode_secret_key 161 else secret_key 162 ) 163 164 def _get_signed_token(self) -> Union[str, Any]: 165 """ 166 Signed the JWT using the provided secret key and algorithm and the generated headers and payload. For additional information on PyJWT see: https://pyjwt.readthedocs.io/en/stable/ 167 """ 168 try: 169 return jwt.encode( 170 payload=self._get_jwt_payload(), 171 key=self._get_secret_key(), 172 algorithm=self._algorithm, 173 headers=self._get_jwt_headers(), 174 ) 175 except Exception as e: 176 raise ValueError(f"Failed to sign token: {e}") 177 178 def _get_header_prefix(self) -> Union[str, None]: 179 """ 180 Returns the header prefix to be used when attaching the token to the request. 181 """ 182 return ( 183 self._header_prefix.eval(self.config, json_loads=json.loads) 184 if self._header_prefix 185 else None 186 ) 187 188 @property 189 def auth_header(self) -> str: 190 return "Authorization" 191 192 @property 193 def token(self) -> str: 194 return ( 195 f"{self._get_header_prefix()} {self._get_signed_token()}" 196 if self._get_header_prefix() 197 else self._get_signed_token() 198 )
Generates a JSON Web Token (JWT) based on a declarative connector configuration file. The generated token is attached to each request via the Authorization header.
Attributes:
- config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
- secret_key (Union[InterpolatedString, str]): The secret key used to sign the JWT
- algorithm (Union[str, JwtAlgorithm]): The algorithm used to sign the JWT
- token_duration (Optional[int]): The duration in seconds for which the token is valid
- base64_encode_secret_key (Optional[Union[InterpolatedBoolean, str, bool]]): Whether to base64 encode the secret key
- header_prefix (Optional[Union[InterpolatedString, str]]): The prefix to add to the Authorization header
- kid (Optional[Union[InterpolatedString, str]]): The key identifier to be included in the JWT header
- typ (Optional[Union[InterpolatedString, str]]): The type of the JWT.
- cty (Optional[Union[InterpolatedString, str]]): The content type of the JWT.
- iss (Optional[Union[InterpolatedString, str]]): The issuer of the JWT.
- sub (Optional[Union[InterpolatedString, str]]): The subject of the JWT.
- aud (Optional[Union[InterpolatedString, str]]): The audience of the JWT.
- additional_jwt_headers (Optional[Mapping[str, Any]]): Additional headers to include in the JWT.
- additional_jwt_payload (Optional[Mapping[str, Any]]): Additional payload to include in the JWT.