airbyte_cdk.sources.declarative.auth
27@dataclass 28class DeclarativeOauth2Authenticator(AbstractOauth2Authenticator, DeclarativeAuthenticator): 29 """ 30 Generates OAuth2.0 access tokens from an OAuth2.0 refresh token and client credentials based on 31 a declarative connector configuration file. Credentials can be defined explicitly or via interpolation 32 at runtime. The generated access token is attached to each request via the Authorization header. 33 34 Attributes: 35 token_refresh_endpoint (Union[InterpolatedString, str]): The endpoint to refresh the access token 36 client_id (Union[InterpolatedString, str]): The client id 37 client_secret (Union[InterpolatedString, str]): Client secret (can be empty for APIs that support this) 38 refresh_token (Union[InterpolatedString, str]): The token used to refresh the access token 39 access_token_name (Union[InterpolatedString, str]): THe field to extract access token from in the response 40 expires_in_name (Union[InterpolatedString, str]): The field to extract expires_in from in the response 41 config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec 42 scopes (Optional[List[str]]): The scopes to request 43 token_expiry_date (Optional[Union[InterpolatedString, str]]): The access token expiration date 44 token_expiry_date_format str: format of the datetime; provide it if expires_in is returned in datetime instead of seconds 45 token_expiry_is_time_of_expiration bool: set True it if expires_in is returned as time of expiration instead of the number seconds until expiration 46 refresh_request_body (Optional[Mapping[str, Any]]): The request body to send in the refresh request 47 refresh_request_headers (Optional[Mapping[str, Any]]): The request headers to send in the refresh request 48 grant_type: The grant_type to request for access_token. If set to refresh_token, the refresh_token parameter has to be provided 49 message_repository (MessageRepository): the message repository used to emit logs on HTTP requests 50 """ 51 52 config: Mapping[str, Any] 53 parameters: InitVar[Mapping[str, Any]] 54 client_id: Optional[Union[InterpolatedString, str]] = None 55 client_secret: Optional[Union[InterpolatedString, str]] = None 56 token_refresh_endpoint: Optional[Union[InterpolatedString, str]] = None 57 refresh_token: Optional[Union[InterpolatedString, str]] = None 58 scopes: Optional[List[str]] = None 59 token_expiry_date: Optional[Union[InterpolatedString, str]] = None 60 _token_expiry_date: Optional[AirbyteDateTime] = field(init=False, repr=False, default=None) 61 token_expiry_date_format: Optional[str] = None 62 token_expiry_is_time_of_expiration: bool = False 63 access_token_name: Union[InterpolatedString, str] = "access_token" 64 access_token_value: Optional[Union[InterpolatedString, str]] = None 65 client_id_name: Union[InterpolatedString, str] = "client_id" 66 client_secret_name: Union[InterpolatedString, str] = "client_secret" 67 expires_in_name: Union[InterpolatedString, str] = "expires_in" 68 refresh_token_name: Union[InterpolatedString, str] = "refresh_token" 69 refresh_request_body: Optional[Mapping[str, Any]] = None 70 refresh_request_headers: Optional[Mapping[str, Any]] = None 71 grant_type_name: Union[InterpolatedString, str] = "grant_type" 72 grant_type: Union[InterpolatedString, str] = "refresh_token" 73 message_repository: MessageRepository = NoopMessageRepository() 74 profile_assertion: Optional[DeclarativeAuthenticator] = None 75 use_profile_assertion: Optional[Union[InterpolatedBoolean, str, bool]] = False 76 77 def __post_init__(self, parameters: Mapping[str, Any]) -> None: 78 super().__init__() 79 if self.token_refresh_endpoint is not None: 80 self._token_refresh_endpoint: Optional[InterpolatedString] = InterpolatedString.create( 81 self.token_refresh_endpoint, parameters=parameters 82 ) 83 else: 84 self._token_refresh_endpoint = None 85 self._client_id_name = InterpolatedString.create(self.client_id_name, parameters=parameters) 86 self._client_id = ( 87 InterpolatedString.create(self.client_id, parameters=parameters) 88 if self.client_id 89 else self.client_id 90 ) 91 self._client_secret_name = InterpolatedString.create( 92 self.client_secret_name, parameters=parameters 93 ) 94 self._client_secret = ( 95 InterpolatedString.create(self.client_secret, parameters=parameters) 96 if self.client_secret 97 else self.client_secret 98 ) 99 self._refresh_token_name = InterpolatedString.create( 100 self.refresh_token_name, parameters=parameters 101 ) 102 if self.refresh_token is not None: 103 self._refresh_token: Optional[InterpolatedString] = InterpolatedString.create( 104 self.refresh_token, parameters=parameters 105 ) 106 else: 107 self._refresh_token = None 108 self.access_token_name = InterpolatedString.create( 109 self.access_token_name, parameters=parameters 110 ) 111 self.expires_in_name = InterpolatedString.create( 112 self.expires_in_name, parameters=parameters 113 ) 114 self.grant_type_name = InterpolatedString.create( 115 self.grant_type_name, parameters=parameters 116 ) 117 self.grant_type = InterpolatedString.create( 118 "urn:ietf:params:oauth:grant-type:jwt-bearer" 119 if self.use_profile_assertion 120 else self.grant_type, 121 parameters=parameters, 122 ) 123 self._refresh_request_body = InterpolatedMapping( 124 self.refresh_request_body or {}, parameters=parameters 125 ) 126 self._refresh_request_headers = InterpolatedMapping( 127 self.refresh_request_headers or {}, parameters=parameters 128 ) 129 try: 130 if ( 131 isinstance(self.token_expiry_date, (int, str)) 132 and str(self.token_expiry_date).isdigit() 133 ): 134 self._token_expiry_date = ab_datetime_parse(self.token_expiry_date) 135 else: 136 self._token_expiry_date = ( 137 ab_datetime_parse( 138 InterpolatedString.create( 139 self.token_expiry_date, parameters=parameters 140 ).eval(self.config) 141 ) 142 if self.token_expiry_date 143 else ab_datetime_now() - timedelta(days=1) 144 ) 145 except ValueError as e: 146 raise ValueError(f"Invalid token expiry date format: {e}") 147 self.use_profile_assertion = ( 148 InterpolatedBoolean(self.use_profile_assertion, parameters=parameters) 149 if isinstance(self.use_profile_assertion, str) 150 else self.use_profile_assertion 151 ) 152 self.assertion_name = "assertion" 153 154 if self.access_token_value is not None: 155 self._access_token_value = InterpolatedString.create( 156 self.access_token_value, parameters=parameters 157 ).eval(self.config) 158 else: 159 self._access_token_value = None 160 161 self._access_token: Optional[str] = ( 162 self._access_token_value if self.access_token_value else None 163 ) 164 165 if not self.use_profile_assertion and any( 166 client_creds is None for client_creds in [self.client_id, self.client_secret] 167 ): 168 raise ValueError( 169 "OAuthAuthenticator configuration error: Both 'client_id' and 'client_secret' are required for the " 170 "basic OAuth flow." 171 ) 172 if self.profile_assertion is None and self.use_profile_assertion: 173 raise ValueError( 174 "OAuthAuthenticator configuration error: 'profile_assertion' is required when using the profile assertion flow." 175 ) 176 if self.get_grant_type() == "refresh_token" and self._refresh_token is None: 177 raise ValueError( 178 "OAuthAuthenticator configuration error: A 'refresh_token' is required when the 'grant_type' is set to 'refresh_token'." 179 ) 180 181 def get_token_refresh_endpoint(self) -> Optional[str]: 182 if self._token_refresh_endpoint is not None: 183 refresh_token_endpoint: str = self._token_refresh_endpoint.eval(self.config) 184 if not refresh_token_endpoint: 185 raise ValueError( 186 "OAuthAuthenticator was unable to evaluate token_refresh_endpoint parameter" 187 ) 188 return refresh_token_endpoint 189 return None 190 191 def get_client_id_name(self) -> str: 192 return self._client_id_name.eval(self.config) # type: ignore # eval returns a string in this context 193 194 def get_client_id(self) -> str: 195 client_id = self._client_id.eval(self.config) if self._client_id else self._client_id 196 if not client_id: 197 raise ValueError("OAuthAuthenticator was unable to evaluate client_id parameter") 198 return client_id # type: ignore # value will be returned as a string, or an error will be raised 199 200 def get_client_secret_name(self) -> str: 201 return self._client_secret_name.eval(self.config) # type: ignore # eval returns a string in this context 202 203 def get_client_secret(self) -> str: 204 client_secret = ( 205 self._client_secret.eval(self.config) if self._client_secret else self._client_secret 206 ) 207 if not client_secret: 208 # We've seen some APIs allowing empty client_secret so we will only log here 209 logger.warning( 210 "OAuthAuthenticator was unable to evaluate client_secret parameter hence it'll be empty" 211 ) 212 return client_secret # type: ignore # value will be returned as a string, which might be empty 213 214 def get_refresh_token_name(self) -> str: 215 return self._refresh_token_name.eval(self.config) # type: ignore # eval returns a string in this context 216 217 def get_refresh_token(self) -> Optional[str]: 218 return None if self._refresh_token is None else str(self._refresh_token.eval(self.config)) 219 220 def get_scopes(self) -> List[str]: 221 return self.scopes or [] 222 223 def get_access_token_name(self) -> str: 224 return self.access_token_name.eval(self.config) # type: ignore # eval returns a string in this context 225 226 def get_expires_in_name(self) -> str: 227 return self.expires_in_name.eval(self.config) # type: ignore # eval returns a string in this context 228 229 def get_grant_type_name(self) -> str: 230 return self.grant_type_name.eval(self.config) # type: ignore # eval returns a string in this context 231 232 def get_grant_type(self) -> str: 233 return self.grant_type.eval(self.config) # type: ignore # eval returns a string in this context 234 235 def get_refresh_request_body(self) -> Mapping[str, Any]: 236 return self._refresh_request_body.eval(self.config) 237 238 def get_refresh_request_headers(self) -> Mapping[str, Any]: 239 return self._refresh_request_headers.eval(self.config) 240 241 def get_token_expiry_date(self) -> AirbyteDateTime: 242 if not self._has_access_token_been_initialized(): 243 return AirbyteDateTime.from_datetime(datetime.min) 244 return self._token_expiry_date # type: ignore # _token_expiry_date is an AirbyteDateTime. It is never None despite what mypy thinks 245 246 def _has_access_token_been_initialized(self) -> bool: 247 return self._access_token is not None 248 249 def set_token_expiry_date(self, value: AirbyteDateTime) -> None: 250 self._token_expiry_date = value 251 252 def get_assertion_name(self) -> str: 253 return self.assertion_name 254 255 def get_assertion(self) -> str: 256 if self.profile_assertion is None: 257 raise ValueError("profile_assertion is not set") 258 return self.profile_assertion.token 259 260 def build_refresh_request_body(self) -> Mapping[str, Any]: 261 """ 262 Returns the request body to set on the refresh request 263 264 Override to define additional parameters 265 """ 266 if self.use_profile_assertion: 267 return { 268 self.get_grant_type_name(): self.get_grant_type(), 269 self.get_assertion_name(): self.get_assertion(), 270 } 271 return super().build_refresh_request_body() 272 273 @property 274 def access_token(self) -> str: 275 if self._access_token is None: 276 raise ValueError("access_token is not set") 277 return self._access_token 278 279 @access_token.setter 280 def access_token(self, value: str) -> None: 281 self._access_token = value 282 283 @property 284 def _message_repository(self) -> MessageRepository: 285 """ 286 Overriding AbstractOauth2Authenticator._message_repository to allow for HTTP request logs 287 """ 288 return self.message_repository
Generates OAuth2.0 access tokens from an OAuth2.0 refresh token and client credentials based on a declarative connector configuration file. Credentials can be defined explicitly or via interpolation at runtime. The generated access token is attached to each request via the Authorization header.
Attributes:
- token_refresh_endpoint (Union[InterpolatedString, str]): The endpoint to refresh the access token
- client_id (Union[InterpolatedString, str]): The client id
- client_secret (Union[InterpolatedString, str]): Client secret (can be empty for APIs that support this)
- refresh_token (Union[InterpolatedString, str]): The token used to refresh the access token
- access_token_name (Union[InterpolatedString, str]): THe field to extract access token from in the response
- expires_in_name (Union[InterpolatedString, str]): The field to extract expires_in from in the response
- config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
- scopes (Optional[List[str]]): The scopes to request
- token_expiry_date (Optional[Union[InterpolatedString, str]]): The access token expiration date
- token_expiry_date_format str: format of the datetime; provide it if expires_in is returned in datetime instead of seconds
- token_expiry_is_time_of_expiration bool: set True it if expires_in is returned as time of expiration instead of the number seconds until expiration
- refresh_request_body (Optional[Mapping[str, Any]]): The request body to send in the refresh request
- refresh_request_headers (Optional[Mapping[str, Any]]): The request headers to send in the refresh request
- grant_type: The grant_type to request for access_token. If set to refresh_token, the refresh_token parameter has to be provided
- message_repository (MessageRepository): the message repository used to emit logs on HTTP requests
If all of refresh_token_error_status_codes, refresh_token_error_key, and refresh_token_error_values are set, then http errors with such params will be wrapped in AirbyteTracedException.
Format of the datetime; exists it if expires_in is returned as the expiration datetime instead of seconds until it expires
Indicates that the Token Expiry returns the date until which the token will be valid, not the amount of time it will be valid.
181 def get_token_refresh_endpoint(self) -> Optional[str]: 182 if self._token_refresh_endpoint is not None: 183 refresh_token_endpoint: str = self._token_refresh_endpoint.eval(self.config) 184 if not refresh_token_endpoint: 185 raise ValueError( 186 "OAuthAuthenticator was unable to evaluate token_refresh_endpoint parameter" 187 ) 188 return refresh_token_endpoint 189 return None
Returns the endpoint to refresh the access token
191 def get_client_id_name(self) -> str: 192 return self._client_id_name.eval(self.config) # type: ignore # eval returns a string in this context
The client id name to authenticate
194 def get_client_id(self) -> str: 195 client_id = self._client_id.eval(self.config) if self._client_id else self._client_id 196 if not client_id: 197 raise ValueError("OAuthAuthenticator was unable to evaluate client_id parameter") 198 return client_id # type: ignore # value will be returned as a string, or an error will be raised
The client id to authenticate
200 def get_client_secret_name(self) -> str: 201 return self._client_secret_name.eval(self.config) # type: ignore # eval returns a string in this context
The client secret name to authenticate
203 def get_client_secret(self) -> str: 204 client_secret = ( 205 self._client_secret.eval(self.config) if self._client_secret else self._client_secret 206 ) 207 if not client_secret: 208 # We've seen some APIs allowing empty client_secret so we will only log here 209 logger.warning( 210 "OAuthAuthenticator was unable to evaluate client_secret parameter hence it'll be empty" 211 ) 212 return client_secret # type: ignore # value will be returned as a string, which might be empty
The client secret to authenticate
214 def get_refresh_token_name(self) -> str: 215 return self._refresh_token_name.eval(self.config) # type: ignore # eval returns a string in this context
The refresh token name to authenticate
217 def get_refresh_token(self) -> Optional[str]: 218 return None if self._refresh_token is None else str(self._refresh_token.eval(self.config))
The token used to refresh the access token when it expires
223 def get_access_token_name(self) -> str: 224 return self.access_token_name.eval(self.config) # type: ignore # eval returns a string in this context
Field to extract access token from in the response
226 def get_expires_in_name(self) -> str: 227 return self.expires_in_name.eval(self.config) # type: ignore # eval returns a string in this context
Returns the expires_in field name
229 def get_grant_type_name(self) -> str: 230 return self.grant_type_name.eval(self.config) # type: ignore # eval returns a string in this context
Returns grant_type specified name for requesting access_token
232 def get_grant_type(self) -> str: 233 return self.grant_type.eval(self.config) # type: ignore # eval returns a string in this context
Returns grant_type specified for requesting access_token
235 def get_refresh_request_body(self) -> Mapping[str, Any]: 236 return self._refresh_request_body.eval(self.config)
Returns the request body to set on the refresh request
238 def get_refresh_request_headers(self) -> Mapping[str, Any]: 239 return self._refresh_request_headers.eval(self.config)
Returns the request headers to set on the refresh request
241 def get_token_expiry_date(self) -> AirbyteDateTime: 242 if not self._has_access_token_been_initialized(): 243 return AirbyteDateTime.from_datetime(datetime.min) 244 return self._token_expiry_date # type: ignore # _token_expiry_date is an AirbyteDateTime. It is never None despite what mypy thinks
Expiration date of the access token
249 def set_token_expiry_date(self, value: AirbyteDateTime) -> None: 250 self._token_expiry_date = value
Setter for access token expiration date
260 def build_refresh_request_body(self) -> Mapping[str, Any]: 261 """ 262 Returns the request body to set on the refresh request 263 264 Override to define additional parameters 265 """ 266 if self.use_profile_assertion: 267 return { 268 self.get_grant_type_name(): self.get_grant_type(), 269 self.get_assertion_name(): self.get_assertion(), 270 } 271 return super().build_refresh_request_body()
Returns the request body to set on the refresh request
Override to define additional parameters
273 @property 274 def access_token(self) -> str: 275 if self._access_token is None: 276 raise ValueError("access_token is not set") 277 return self._access_token
Returns the access token
Inherited Members
55@dataclass 56class JwtAuthenticator(DeclarativeAuthenticator): 57 """ 58 Generates a JSON Web Token (JWT) based on a declarative connector configuration file. The generated token is attached to each request via the Authorization header. 59 60 Attributes: 61 config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec 62 secret_key (Union[InterpolatedString, str]): The secret key used to sign the JWT 63 algorithm (Union[str, JwtAlgorithm]): The algorithm used to sign the JWT 64 token_duration (Optional[int]): The duration in seconds for which the token is valid 65 base64_encode_secret_key (Optional[Union[InterpolatedBoolean, str, bool]]): Whether to base64 encode the secret key 66 header_prefix (Optional[Union[InterpolatedString, str]]): The prefix to add to the Authorization header 67 kid (Optional[Union[InterpolatedString, str]]): The key identifier to be included in the JWT header 68 typ (Optional[Union[InterpolatedString, str]]): The type of the JWT. 69 cty (Optional[Union[InterpolatedString, str]]): The content type of the JWT. 70 iss (Optional[Union[InterpolatedString, str]]): The issuer of the JWT. 71 sub (Optional[Union[InterpolatedString, str]]): The subject of the JWT. 72 aud (Optional[Union[InterpolatedString, str]]): The audience of the JWT. 73 additional_jwt_headers (Optional[Mapping[str, Any]]): Additional headers to include in the JWT. 74 additional_jwt_payload (Optional[Mapping[str, Any]]): Additional payload to include in the JWT. 75 """ 76 77 config: Mapping[str, Any] 78 parameters: InitVar[Mapping[str, Any]] 79 secret_key: Union[InterpolatedString, str] 80 algorithm: Union[str, JwtAlgorithm] 81 token_duration: Optional[int] 82 base64_encode_secret_key: Optional[Union[InterpolatedBoolean, str, bool]] = False 83 header_prefix: Optional[Union[InterpolatedString, str]] = None 84 kid: Optional[Union[InterpolatedString, str]] = None 85 typ: Optional[Union[InterpolatedString, str]] = None 86 cty: Optional[Union[InterpolatedString, str]] = None 87 iss: Optional[Union[InterpolatedString, str]] = None 88 sub: Optional[Union[InterpolatedString, str]] = None 89 aud: Optional[Union[InterpolatedString, str]] = None 90 additional_jwt_headers: Optional[Mapping[str, Any]] = None 91 additional_jwt_payload: Optional[Mapping[str, Any]] = None 92 passphrase: Optional[Union[InterpolatedString, str]] = None 93 request_option: Optional[RequestOption] = None 94 95 def __post_init__(self, parameters: Mapping[str, Any]) -> None: 96 self._secret_key = InterpolatedString.create(self.secret_key, parameters=parameters) 97 self._algorithm = ( 98 JwtAlgorithm(self.algorithm) if isinstance(self.algorithm, str) else self.algorithm 99 ) 100 self._base64_encode_secret_key = ( 101 InterpolatedBoolean(self.base64_encode_secret_key, parameters=parameters) 102 if isinstance(self.base64_encode_secret_key, str) 103 else self.base64_encode_secret_key 104 ) 105 self._token_duration = self.token_duration 106 self._header_prefix = ( 107 InterpolatedString.create(self.header_prefix, parameters=parameters) 108 if self.header_prefix 109 else None 110 ) 111 self._kid = InterpolatedString.create(self.kid, parameters=parameters) if self.kid else None 112 self._typ = InterpolatedString.create(self.typ, parameters=parameters) if self.typ else None 113 self._cty = InterpolatedString.create(self.cty, parameters=parameters) if self.cty else None 114 self._iss = InterpolatedString.create(self.iss, parameters=parameters) if self.iss else None 115 self._sub = InterpolatedString.create(self.sub, parameters=parameters) if self.sub else None 116 self._aud = InterpolatedString.create(self.aud, parameters=parameters) if self.aud else None 117 self._additional_jwt_headers = InterpolatedMapping( 118 self.additional_jwt_headers or {}, parameters=parameters 119 ) 120 self._additional_jwt_payload = InterpolatedMapping( 121 self.additional_jwt_payload or {}, parameters=parameters 122 ) 123 self._passphrase = ( 124 InterpolatedString.create(self.passphrase, parameters=parameters) 125 if self.passphrase 126 else None 127 ) 128 129 # When we first implemented the JWT authenticator, we assumed that the signed token was always supposed 130 # to be loaded into the request headers under the `Authorization` key. This is not always the case, but 131 # this default option allows for backwards compatibility to be retained for existing connectors 132 self._request_option = self.request_option or RequestOption( 133 inject_into=RequestOptionType.header, field_name="Authorization", parameters=parameters 134 ) 135 136 def _get_jwt_headers(self) -> dict[str, Any]: 137 """ 138 Builds and returns the headers used when signing the JWT. 139 """ 140 headers = self._additional_jwt_headers.eval(self.config, json_loads=json.loads) 141 if any(prop in headers for prop in ["kid", "alg", "typ", "cty"]): 142 raise ValueError( 143 "'kid', 'alg', 'typ', 'cty' are reserved headers and should not be set as part of 'additional_jwt_headers'" 144 ) 145 146 if self._kid: 147 headers["kid"] = self._kid.eval(self.config, json_loads=json.loads) 148 if self._typ: 149 headers["typ"] = self._typ.eval(self.config, json_loads=json.loads) 150 if self._cty: 151 headers["cty"] = self._cty.eval(self.config, json_loads=json.loads) 152 headers["alg"] = self._algorithm 153 return headers 154 155 def _get_jwt_payload(self) -> dict[str, Any]: 156 """ 157 Builds and returns the payload used when signing the JWT. 158 """ 159 now = int(datetime.now().timestamp()) 160 exp = now + self._token_duration if isinstance(self._token_duration, int) else now 161 nbf = now 162 163 payload = self._additional_jwt_payload.eval(self.config, json_loads=json.loads) 164 if any(prop in payload for prop in ["iss", "sub", "aud", "iat", "exp", "nbf"]): 165 raise ValueError( 166 "'iss', 'sub', 'aud', 'iat', 'exp', 'nbf' are reserved properties and should not be set as part of 'additional_jwt_payload'" 167 ) 168 169 if self._iss: 170 payload["iss"] = self._iss.eval(self.config, json_loads=json.loads) 171 if self._sub: 172 payload["sub"] = self._sub.eval(self.config, json_loads=json.loads) 173 if self._aud: 174 payload["aud"] = self._aud.eval(self.config, json_loads=json.loads) 175 176 payload["iat"] = now 177 payload["exp"] = exp 178 payload["nbf"] = nbf 179 return payload 180 181 def _get_secret_key(self) -> JwtKeyTypes: 182 """ 183 Returns the secret key used to sign the JWT. 184 """ 185 secret_key: str = self._secret_key.eval(self.config, json_loads=json.loads) 186 187 if self._passphrase: 188 passphrase_value = self._passphrase.eval(self.config, json_loads=json.loads) 189 if passphrase_value: 190 private_key = serialization.load_pem_private_key( 191 secret_key.encode(), 192 password=passphrase_value.encode(), 193 ) 194 return cast(JwtKeyTypes, private_key) 195 196 return ( 197 base64.b64encode(secret_key.encode()).decode() 198 if self._base64_encode_secret_key 199 else secret_key 200 ) 201 202 def _get_signed_token(self) -> Union[str, Any]: 203 """ 204 Signed the JWT using the provided secret key and algorithm and the generated headers and payload. For additional information on PyJWT see: https://pyjwt.readthedocs.io/en/stable/ 205 """ 206 try: 207 return jwt.encode( 208 payload=self._get_jwt_payload(), 209 key=self._get_secret_key(), 210 algorithm=self._algorithm, 211 headers=self._get_jwt_headers(), 212 ) 213 except Exception as e: 214 raise ValueError(f"Failed to sign token: {e}") 215 216 def _get_header_prefix(self) -> Union[str, None]: 217 """ 218 Returns the header prefix to be used when attaching the token to the request. 219 """ 220 return ( 221 self._header_prefix.eval(self.config, json_loads=json.loads) 222 if self._header_prefix 223 else None 224 ) 225 226 @property 227 def auth_header(self) -> str: 228 options = self._get_request_options(RequestOptionType.header) 229 return next(iter(options.keys()), "") 230 231 @property 232 def token(self) -> str: 233 return ( 234 f"{self._get_header_prefix()} {self._get_signed_token()}" 235 if self._get_header_prefix() 236 else self._get_signed_token() 237 ) 238 239 def get_request_params(self) -> Mapping[str, Any]: 240 return self._get_request_options(RequestOptionType.request_parameter) 241 242 def get_request_body_data(self) -> Union[Mapping[str, Any], str]: 243 return self._get_request_options(RequestOptionType.body_data) 244 245 def get_request_body_json(self) -> Mapping[str, Any]: 246 return self._get_request_options(RequestOptionType.body_json) 247 248 def _get_request_options(self, option_type: RequestOptionType) -> Mapping[str, Any]: 249 options: MutableMapping[str, Any] = {} 250 if self._request_option.inject_into == option_type: 251 self._request_option.inject_into_request(options, self.token, self.config) 252 return options
Generates a JSON Web Token (JWT) based on a declarative connector configuration file. The generated token is attached to each request via the Authorization header.
Attributes:
- config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
- secret_key (Union[InterpolatedString, str]): The secret key used to sign the JWT
- algorithm (Union[str, JwtAlgorithm]): The algorithm used to sign the JWT
- token_duration (Optional[int]): The duration in seconds for which the token is valid
- base64_encode_secret_key (Optional[Union[InterpolatedBoolean, str, bool]]): Whether to base64 encode the secret key
- header_prefix (Optional[Union[InterpolatedString, str]]): The prefix to add to the Authorization header
- kid (Optional[Union[InterpolatedString, str]]): The key identifier to be included in the JWT header
- typ (Optional[Union[InterpolatedString, str]]): The type of the JWT.
- cty (Optional[Union[InterpolatedString, str]]): The content type of the JWT.
- iss (Optional[Union[InterpolatedString, str]]): The issuer of the JWT.
- sub (Optional[Union[InterpolatedString, str]]): The subject of the JWT.
- aud (Optional[Union[InterpolatedString, str]]): The audience of the JWT.
- additional_jwt_headers (Optional[Mapping[str, Any]]): Additional headers to include in the JWT.
- additional_jwt_payload (Optional[Mapping[str, Any]]): Additional payload to include in the JWT.
226 @property 227 def auth_header(self) -> str: 228 options = self._get_request_options(RequestOptionType.header) 229 return next(iter(options.keys()), "")
HTTP header to set on the requests
231 @property 232 def token(self) -> str: 233 return ( 234 f"{self._get_header_prefix()} {self._get_signed_token()}" 235 if self._get_header_prefix() 236 else self._get_signed_token() 237 )
The header value to set on outgoing HTTP requests
239 def get_request_params(self) -> Mapping[str, Any]: 240 return self._get_request_options(RequestOptionType.request_parameter)
HTTP request parameter to add to the requests
242 def get_request_body_data(self) -> Union[Mapping[str, Any], str]: 243 return self._get_request_options(RequestOptionType.body_data)
Form-encoded body data to set on the requests