airbyte_cdk.sources.declarative.auth
@dataclass
class DeclarativeOauth2Authenticator(AbstractOauth2Authenticator, DeclarativeAuthenticator):
    """
    Generates OAuth2.0 access tokens from an OAuth2.0 refresh token and client credentials based on
    a declarative connector configuration file. Credentials can be defined explicitly or via interpolation
    at runtime. The generated access token is attached to each request via the Authorization header.

    Attributes:
        token_refresh_endpoint (Union[InterpolatedString, str]): The endpoint to refresh the access token
        client_id (Union[InterpolatedString, str]): The client id
        client_secret (Union[InterpolatedString, str]): Client secret (can be empty for APIs that support this)
        refresh_token (Union[InterpolatedString, str]): The token used to refresh the access token
        access_token_name (Union[InterpolatedString, str]): The field to extract access token from in the response
        access_token_value (Optional[Union[InterpolatedString, str]]): An initial access token; when set, it is
            evaluated against the config and used as the current access token
        client_id_name (Union[InterpolatedString, str]): The client id name to authenticate
        client_secret_name (Union[InterpolatedString, str]): The client secret name to authenticate
        refresh_token_name (Union[InterpolatedString, str]): The refresh token name to authenticate
        grant_type_name (Union[InterpolatedString, str]): The name under which the grant type is requested
        expires_in_name (Union[InterpolatedString, str]): The field to extract expires_in from in the response
        config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
        scopes (Optional[List[str]]): The scopes to request
        token_expiry_date (Optional[Union[InterpolatedString, str]]): The access token expiration date
        token_expiry_date_format (str): format of the datetime; provide it if expires_in is returned as a
            datetime instead of seconds
        token_expiry_is_time_of_expiration (bool): set it to True if expires_in is returned as the time of
            expiration instead of the number of seconds until expiration
        refresh_request_body (Optional[Mapping[str, Any]]): The request body to send in the refresh request
        refresh_request_headers (Optional[Mapping[str, Any]]): The request headers to send in the refresh request
        send_refresh_request_as_query_params (bool): When True, the standard refresh args (`grant_type`,
            `refresh_token`, client credentials when not in an `Authorization` header, scopes, plus any
            `refresh_request_body` extras) are sent on the URL query string instead of in the request body,
            and the body is emitted empty. Use this for OAuth providers like Gong that document their refresh
            endpoint with refresh args on the URL query string. Defaults to False.
        grant_type: The grant_type to request for access_token. If set to refresh_token, the refresh_token
            parameter has to be provided
        profile_assertion (Optional[DeclarativeAuthenticator]): Authenticator whose `token` is sent as the
            assertion when the profile assertion flow is used
        use_profile_assertion (Optional[Union[InterpolatedBoolean, str, bool]]): When set, the grant type becomes
            `urn:ietf:params:oauth:grant-type:jwt-bearer` and the refresh body only carries the grant type
            and the assertion
        message_repository (MessageRepository): the message repository used to emit logs on HTTP requests
        refresh_token_error_status_codes (Tuple[int, ...]): Status codes to identify refresh token errors in response
        refresh_token_error_key (str): Key to identify refresh token error in response
        refresh_token_error_values (Tuple[str, ...]): List of values to check for exception during token refresh process
    """

    config: Mapping[str, Any]
    parameters: InitVar[Mapping[str, Any]]
    client_id: Optional[Union[InterpolatedString, str]] = None
    client_secret: Optional[Union[InterpolatedString, str]] = None
    token_refresh_endpoint: Optional[Union[InterpolatedString, str]] = None
    refresh_token: Optional[Union[InterpolatedString, str]] = None
    scopes: Optional[List[str]] = None
    token_expiry_date: Optional[Union[InterpolatedString, str]] = None
    _token_expiry_date: Optional[AirbyteDateTime] = field(init=False, repr=False, default=None)
    token_expiry_date_format: Optional[str] = None
    token_expiry_is_time_of_expiration: bool = False
    access_token_name: Union[InterpolatedString, str] = "access_token"
    access_token_value: Optional[Union[InterpolatedString, str]] = None
    client_id_name: Union[InterpolatedString, str] = "client_id"
    client_secret_name: Union[InterpolatedString, str] = "client_secret"
    expires_in_name: Union[InterpolatedString, str] = "expires_in"
    refresh_token_name: Union[InterpolatedString, str] = "refresh_token"
    refresh_request_body: Optional[Mapping[str, Any]] = None
    refresh_request_headers: Optional[Mapping[str, Any]] = None
    send_refresh_request_as_query_params: bool = False
    grant_type_name: Union[InterpolatedString, str] = "grant_type"
    grant_type: Union[InterpolatedString, str] = "refresh_token"
    message_repository: MessageRepository = NoopMessageRepository()
    profile_assertion: Optional[DeclarativeAuthenticator] = None
    use_profile_assertion: Optional[Union[InterpolatedBoolean, str, bool]] = False
    refresh_token_error_status_codes: Tuple[int, ...] = ()
    refresh_token_error_key: str = ""
    refresh_token_error_values: Tuple[str, ...] = ()

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """
        Wrap the declared fields into interpolated components and validate the configuration.

        Raises:
            ValueError: if `token_expiry_date` cannot be parsed, if `client_id`/`client_secret` are missing
                for the basic flow, if `profile_assertion` is missing for the profile assertion flow, or if
                the grant type is `refresh_token` and no `refresh_token` was provided.
        """
        super().__init__(
            refresh_token_error_status_codes=self.refresh_token_error_status_codes,
            refresh_token_error_key=self.refresh_token_error_key,
            refresh_token_error_values=self.refresh_token_error_values,
        )
        if self.token_refresh_endpoint is not None:
            self._token_refresh_endpoint: Optional[InterpolatedString] = InterpolatedString.create(
                self.token_refresh_endpoint, parameters=parameters
            )
        else:
            self._token_refresh_endpoint = None
        self._client_id_name = InterpolatedString.create(self.client_id_name, parameters=parameters)
        self._client_id = (
            InterpolatedString.create(self.client_id, parameters=parameters)
            if self.client_id
            else self.client_id
        )
        self._client_secret_name = InterpolatedString.create(
            self.client_secret_name, parameters=parameters
        )
        self._client_secret = (
            InterpolatedString.create(self.client_secret, parameters=parameters)
            if self.client_secret
            else self.client_secret
        )
        self._refresh_token_name = InterpolatedString.create(
            self.refresh_token_name, parameters=parameters
        )
        if self.refresh_token is not None:
            self._refresh_token: Optional[InterpolatedString] = InterpolatedString.create(
                self.refresh_token, parameters=parameters
            )
        else:
            self._refresh_token = None
        self.access_token_name = InterpolatedString.create(
            self.access_token_name, parameters=parameters
        )
        self.expires_in_name = InterpolatedString.create(
            self.expires_in_name, parameters=parameters
        )
        self.grant_type_name = InterpolatedString.create(
            self.grant_type_name, parameters=parameters
        )
        # NOTE(review): `use_profile_assertion` is still the raw field value here (it is only wrapped
        # further down), so any non-empty string selects the jwt-bearer grant type - confirm intended.
        self.grant_type = InterpolatedString.create(
            "urn:ietf:params:oauth:grant-type:jwt-bearer"
            if self.use_profile_assertion
            else self.grant_type,
            parameters=parameters,
        )
        self._refresh_request_body = InterpolatedMapping(
            self.refresh_request_body or {}, parameters=parameters
        )
        self._refresh_request_headers = InterpolatedMapping(
            self.refresh_request_headers or {}, parameters=parameters
        )
        self._send_refresh_request_as_query_params = self.send_refresh_request_as_query_params
        try:
            if (
                isinstance(self.token_expiry_date, (int, str))
                and str(self.token_expiry_date).isdigit()
            ):
                # All-digit values are parsed as-is, without interpolation.
                self._token_expiry_date = ab_datetime_parse(self.token_expiry_date)
            else:
                # Without a configured expiry date, start from a date in the past.
                self._token_expiry_date = (
                    ab_datetime_parse(
                        InterpolatedString.create(
                            self.token_expiry_date, parameters=parameters
                        ).eval(self.config)
                    )
                    if self.token_expiry_date
                    else ab_datetime_now() - timedelta(days=1)
                )
        except ValueError as e:
            # Chain explicitly so the parsing error is reported as the direct cause.
            raise ValueError(f"Invalid token expiry date format: {e}") from e
        self.use_profile_assertion = (
            InterpolatedBoolean(self.use_profile_assertion, parameters=parameters)
            if isinstance(self.use_profile_assertion, str)
            else self.use_profile_assertion
        )
        self.assertion_name = "assertion"

        if self.access_token_value is not None:
            self._access_token_value = InterpolatedString.create(
                self.access_token_value, parameters=parameters
            ).eval(self.config)
        else:
            self._access_token_value = None

        self._access_token: Optional[str] = (
            self._access_token_value if self.access_token_value else None
        )

        if not self.use_profile_assertion and any(
            client_creds is None for client_creds in [self.client_id, self.client_secret]
        ):
            raise ValueError(
                "OAuthAuthenticator configuration error: Both 'client_id' and 'client_secret' are required for the "
                "basic OAuth flow."
            )
        if self.profile_assertion is None and self.use_profile_assertion:
            raise ValueError(
                "OAuthAuthenticator configuration error: 'profile_assertion' is required when using the profile assertion flow."
            )
        if self.get_grant_type() == "refresh_token" and self._refresh_token is None:
            raise ValueError(
                "OAuthAuthenticator configuration error: A 'refresh_token' is required when the 'grant_type' is set to 'refresh_token'."
            )

    def get_token_refresh_endpoint(self) -> Optional[str]:
        """
        Returns the endpoint to refresh the access token, or None when none is configured.

        Raises:
            ValueError: if the configured endpoint evaluates to an empty value.
        """
        if self._token_refresh_endpoint is not None:
            refresh_token_endpoint: str = self._token_refresh_endpoint.eval(self.config)
            if not refresh_token_endpoint:
                raise ValueError(
                    "OAuthAuthenticator was unable to evaluate token_refresh_endpoint parameter"
                )
            return refresh_token_endpoint
        return None

    def get_client_id_name(self) -> str:
        """The client id name to authenticate"""
        return self._client_id_name.eval(self.config)  # type: ignore # eval returns a string in this context

    def get_client_id(self) -> str:
        """
        The client id to authenticate

        Raises:
            ValueError: if the client id is unset or evaluates to an empty value.
        """
        client_id = self._client_id.eval(self.config) if self._client_id else self._client_id
        if not client_id:
            raise ValueError("OAuthAuthenticator was unable to evaluate client_id parameter")
        return client_id  # type: ignore # value will be returned as a string, or an error will be raised

    def get_client_secret_name(self) -> str:
        """The client secret name to authenticate"""
        return self._client_secret_name.eval(self.config)  # type: ignore # eval returns a string in this context

    def get_client_secret(self) -> str:
        """The client secret to authenticate; an empty value only triggers a warning log."""
        client_secret = (
            self._client_secret.eval(self.config) if self._client_secret else self._client_secret
        )
        if not client_secret:
            # We've seen some APIs allowing empty client_secret so we will only log here
            logger.warning(
                "OAuthAuthenticator was unable to evaluate client_secret parameter hence it'll be empty"
            )
        return client_secret  # type: ignore # value will be returned as a string, which might be empty

    def get_refresh_token_name(self) -> str:
        """The refresh token name to authenticate"""
        return self._refresh_token_name.eval(self.config)  # type: ignore # eval returns a string in this context

    def get_refresh_token(self) -> Optional[str]:
        """The token used to refresh the access token when it expires, or None when not configured."""
        return None if self._refresh_token is None else str(self._refresh_token.eval(self.config))

    def get_scopes(self) -> List[str]:
        """Returns the configured scopes, or an empty list when none are set."""
        return self.scopes or []

    def get_access_token_name(self) -> str:
        """Field to extract access token from in the response"""
        return self.access_token_name.eval(self.config)  # type: ignore # eval returns a string in this context

    def get_expires_in_name(self) -> str:
        """Returns the expires_in field name"""
        return self.expires_in_name.eval(self.config)  # type: ignore # eval returns a string in this context

    def get_grant_type_name(self) -> str:
        """Returns grant_type specified name for requesting access_token"""
        return self.grant_type_name.eval(self.config)  # type: ignore # eval returns a string in this context

    def get_grant_type(self) -> str:
        """Returns grant_type specified for requesting access_token"""
        return self.grant_type.eval(self.config)  # type: ignore # eval returns a string in this context

    def get_refresh_request_body(self) -> Mapping[str, Any]:
        """Returns the request body to set on the refresh request"""
        return self._refresh_request_body.eval(self.config)

    def get_refresh_request_headers(self) -> Mapping[str, Any]:
        """Returns the request headers to set on the refresh request"""
        return self._refresh_request_headers.eval(self.config)

    def should_send_refresh_request_as_query_params(self) -> bool:
        """
        Returns True if the standard refresh args should be sent on the URL
        query string instead of in the request body.
        """
        return self._send_refresh_request_as_query_params

    def get_token_expiry_date(self) -> AirbyteDateTime:
        """
        Expiration date of the access token.

        While no access token has been set yet, the minimum datetime is returned.
        """
        if not self._has_access_token_been_initialized():
            return AirbyteDateTime.from_datetime(datetime.min)
        return self._token_expiry_date  # type: ignore # _token_expiry_date is an AirbyteDateTime. It is never None despite what mypy thinks

    def _has_access_token_been_initialized(self) -> bool:
        """Returns True once an access token value is held by this authenticator."""
        return self._access_token is not None

    def set_token_expiry_date(self, value: AirbyteDateTime) -> None:
        """Setter for access token expiration date"""
        self._token_expiry_date = value

    def get_assertion_name(self) -> str:
        """Returns the name under which the assertion is sent in the refresh request body."""
        return self.assertion_name

    def get_assertion(self) -> str:
        """
        Returns the token produced by `profile_assertion`.

        Raises:
            ValueError: if `profile_assertion` is not set.
        """
        if self.profile_assertion is None:
            raise ValueError("profile_assertion is not set")
        return self.profile_assertion.token

    def build_refresh_request_body(self) -> Mapping[str, Any]:
        """
        Returns the request body to set on the refresh request

        Override to define additional parameters
        """
        if self.use_profile_assertion:
            # Profile assertion flow: only the grant type and the assertion are sent.
            return {
                self.get_grant_type_name(): self.get_grant_type(),
                self.get_assertion_name(): self.get_assertion(),
            }
        return super().build_refresh_request_body()

    @property
    def access_token(self) -> str:
        """
        Returns the access token

        Raises:
            ValueError: if no access token has been set yet.
        """
        if self._access_token is None:
            raise ValueError("access_token is not set")
        return self._access_token

    @access_token.setter
    def access_token(self, value: str) -> None:
        self._access_token = value

    @property
    def _message_repository(self) -> MessageRepository:
        """
        Overriding AbstractOauth2Authenticator._message_repository to allow for HTTP request logs
        """
        return self.message_repository
Generates OAuth2.0 access tokens from an OAuth2.0 refresh token and client credentials based on a declarative connector configuration file. Credentials can be defined explicitly or via interpolation at runtime. The generated access token is attached to each request via the Authorization header.
Attributes:
- token_refresh_endpoint (Union[InterpolatedString, str]): The endpoint to refresh the access token
- client_id (Union[InterpolatedString, str]): The client id
- client_secret (Union[InterpolatedString, str]): Client secret (can be empty for APIs that support this)
- refresh_token (Union[InterpolatedString, str]): The token used to refresh the access token
- access_token_name (Union[InterpolatedString, str]): The field to extract access token from in the response
- expires_in_name (Union[InterpolatedString, str]): The field to extract expires_in from in the response
- config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
- scopes (Optional[List[str]]): The scopes to request
- token_expiry_date (Optional[Union[InterpolatedString, str]]): The access token expiration date
- token_expiry_date_format (str): format of the datetime; provide it if expires_in is returned as a datetime instead of seconds
- token_expiry_is_time_of_expiration (bool): set it to True if expires_in is returned as the time of expiration instead of the number of seconds until expiration
- refresh_request_body (Optional[Mapping[str, Any]]): The request body to send in the refresh request
- refresh_request_headers (Optional[Mapping[str, Any]]): The request headers to send in the refresh request
- send_refresh_request_as_query_params (bool): When True, the standard refresh args (`grant_type`, `refresh_token`, client credentials when not in an `Authorization` header, scopes, plus any `refresh_request_body` extras) are sent on the URL query string instead of in the request body, and the body is emitted empty. Use this for OAuth providers like Gong that document their refresh endpoint with refresh args on the URL query string. Defaults to False.
- grant_type: The grant_type to request for access_token. If set to refresh_token, the refresh_token parameter has to be provided
- message_repository (MessageRepository): the message repository used to emit logs on HTTP requests
- refresh_token_error_status_codes (Tuple[int, ...]): Status codes to identify refresh token errors in response
- refresh_token_error_key (str): Key to identify refresh token error in response
- refresh_token_error_values (Tuple[str, ...]): List of values to check for exception during token refresh process
If all of refresh_token_error_status_codes, refresh_token_error_key, and refresh_token_error_values are set, then http errors with such params will be wrapped in AirbyteTracedException.
Format of the datetime; provide it if expires_in is returned as the expiration datetime instead of seconds until it expires
Indicates that the Token Expiry returns the date until which the token will be valid, not the amount of time it will be valid.
def get_token_refresh_endpoint(self) -> Optional[str]:
    """
    Returns the endpoint to refresh the access token, or None when no endpoint is configured.

    Raises:
        ValueError: if the configured endpoint evaluates to an empty value.
    """
    if self._token_refresh_endpoint is None:
        return None
    endpoint: str = self._token_refresh_endpoint.eval(self.config)
    if endpoint:
        return endpoint
    raise ValueError(
        "OAuthAuthenticator was unable to evaluate token_refresh_endpoint parameter"
    )
Returns the endpoint to refresh the access token
def get_client_id_name(self) -> str:
    """Returns the client id name to authenticate, evaluated against the config."""
    evaluated_name = self._client_id_name.eval(self.config)
    return evaluated_name  # type: ignore # eval returns a string in this context
The client id name to authenticate
def get_client_id(self) -> str:
    """
    Returns the client id to authenticate.

    Raises:
        ValueError: if the client id is unset or evaluates to an empty value.
    """
    if self._client_id:
        resolved_id = self._client_id.eval(self.config)
    else:
        resolved_id = self._client_id
    if resolved_id:
        return resolved_id  # type: ignore # value will be returned as a string, or an error will be raised
    raise ValueError("OAuthAuthenticator was unable to evaluate client_id parameter")
The client id to authenticate
def get_client_secret_name(self) -> str:
    """Returns the client secret name to authenticate, evaluated against the config."""
    evaluated_name = self._client_secret_name.eval(self.config)
    return evaluated_name  # type: ignore # eval returns a string in this context
The client secret name to authenticate
def get_client_secret(self) -> str:
    """Returns the client secret to authenticate; an empty value is logged as a warning, not raised."""
    if self._client_secret:
        resolved_secret = self._client_secret.eval(self.config)
    else:
        resolved_secret = self._client_secret
    if not resolved_secret:
        # We've seen some APIs allowing empty client_secret so we will only log here
        logger.warning(
            "OAuthAuthenticator was unable to evaluate client_secret parameter hence it'll be empty"
        )
    return resolved_secret  # type: ignore # value will be returned as a string, which might be empty
The client secret to authenticate
def get_refresh_token_name(self) -> str:
    """Returns the refresh token name to authenticate, evaluated against the config."""
    evaluated_name = self._refresh_token_name.eval(self.config)
    return evaluated_name  # type: ignore # eval returns a string in this context
The refresh token name to authenticate
def get_refresh_token(self) -> Optional[str]:
    """Returns the token used to refresh the access token when it expires, or None when not configured."""
    if self._refresh_token is None:
        return None
    evaluated_token = self._refresh_token.eval(self.config)
    return str(evaluated_token)
The token used to refresh the access token when it expires
def get_access_token_name(self) -> str:
    """Returns the field to extract the access token from in the response."""
    evaluated_name = self.access_token_name.eval(self.config)
    return evaluated_name  # type: ignore # eval returns a string in this context
Field to extract access token from in the response
def get_expires_in_name(self) -> str:
    """Returns the expires_in field name, evaluated against the config."""
    evaluated_name = self.expires_in_name.eval(self.config)
    return evaluated_name  # type: ignore # eval returns a string in this context
Returns the expires_in field name
def get_grant_type_name(self) -> str:
    """Returns grant_type specified name for requesting access_token."""
    evaluated_name = self.grant_type_name.eval(self.config)
    return evaluated_name  # type: ignore # eval returns a string in this context
Returns grant_type specified name for requesting access_token
def get_grant_type(self) -> str:
    """Returns grant_type specified for requesting access_token."""
    evaluated_grant_type = self.grant_type.eval(self.config)
    return evaluated_grant_type  # type: ignore # eval returns a string in this context
Returns grant_type specified for requesting access_token
def get_refresh_request_body(self) -> Mapping[str, Any]:
    """Returns the request body to set on the refresh request, evaluated against the config."""
    evaluated_body = self._refresh_request_body.eval(self.config)
    return evaluated_body
Returns the request body to set on the refresh request
def get_refresh_request_headers(self) -> Mapping[str, Any]:
    """Returns the request headers to set on the refresh request, evaluated against the config."""
    evaluated_headers = self._refresh_request_headers.eval(self.config)
    return evaluated_headers
Returns the request headers to set on the refresh request
def should_send_refresh_request_as_query_params(self) -> bool:
    """
    Returns True if the standard refresh args should be sent on the URL
    query string instead of in the request body.
    """
    as_query_params: bool = self._send_refresh_request_as_query_params
    return as_query_params
Returns True if the standard refresh args should be sent on the URL
query string instead of in the request body.
Defaults to False so existing authenticators retain their previous
behavior (params in body, no query params on the refresh URL). Subclasses
can override this to opt into the URL-query-string shape required by OAuth
providers like Gong.
def get_token_expiry_date(self) -> AirbyteDateTime:
    """
    Returns the expiration date of the access token.

    While no access token has been initialized, the minimum datetime is returned instead.
    """
    if self._has_access_token_been_initialized():
        return self._token_expiry_date  # type: ignore # _token_expiry_date is an AirbyteDateTime. It is never None despite what mypy thinks
    return AirbyteDateTime.from_datetime(datetime.min)
Expiration date of the access token
def set_token_expiry_date(self, value: AirbyteDateTime) -> None:
    """Setter for the access token expiration date; stores `value` as given."""
    self._token_expiry_date = value
Setter for access token expiration date
def build_refresh_request_body(self) -> Mapping[str, Any]:
    """
    Returns the request body to set on the refresh request

    With the profile assertion flow the body only holds the grant type and the assertion;
    otherwise the parent implementation builds it.

    Override to define additional parameters
    """
    if not self.use_profile_assertion:
        return super().build_refresh_request_body()
    grant_type_field = self.get_grant_type_name()
    grant_type_value = self.get_grant_type()
    assertion_field = self.get_assertion_name()
    assertion_value = self.get_assertion()
    return {grant_type_field: grant_type_value, assertion_field: assertion_value}
Returns the request body to set on the refresh request
Override to define additional parameters
@property
def access_token(self) -> str:
    """
    Returns the access token

    Raises:
        ValueError: if no access token has been set yet.
    """
    current_token = self._access_token
    if current_token is not None:
        return current_token
    raise ValueError("access_token is not set")
Returns the access token
Inherited Members
@dataclass
class JwtAuthenticator(DeclarativeAuthenticator):
    """
    Generates a JSON Web Token (JWT) based on a declarative connector configuration file. The generated token is
    attached to each request via the Authorization header, unless `request_option` injects it elsewhere.

    Attributes:
        config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
        secret_key (Union[InterpolatedString, str]): The secret key used to sign the JWT
        algorithm (Union[str, JwtAlgorithm]): The algorithm used to sign the JWT
        token_duration (Optional[int]): The duration in seconds for which the token is valid
        base64_encode_secret_key (Optional[Union[InterpolatedBoolean, str, bool]]): Whether to base64 encode the secret key
        header_prefix (Optional[Union[InterpolatedString, str]]): The prefix to add to the Authorization header
        kid (Optional[Union[InterpolatedString, str]]): The key identifier to be included in the JWT header
        typ (Optional[Union[InterpolatedString, str]]): The type of the JWT.
        cty (Optional[Union[InterpolatedString, str]]): The content type of the JWT.
        iss (Optional[Union[InterpolatedString, str]]): The issuer of the JWT.
        sub (Optional[Union[InterpolatedString, str]]): The subject of the JWT.
        aud (Optional[Union[InterpolatedString, str]]): The audience of the JWT.
        additional_jwt_headers (Optional[Mapping[str, Any]]): Additional headers to include in the JWT.
        additional_jwt_payload (Optional[Mapping[str, Any]]): Additional payload to include in the JWT.
        passphrase (Optional[Union[InterpolatedString, str]]): When it evaluates to a non-empty value, the secret key
            is loaded as a PEM private key using this value as the password.
        request_option (Optional[RequestOption]): Where to inject the token into the request. Defaults to the
            `Authorization` header.
    """

    config: Mapping[str, Any]
    parameters: InitVar[Mapping[str, Any]]
    secret_key: Union[InterpolatedString, str]
    algorithm: Union[str, JwtAlgorithm]
    token_duration: Optional[int]
    base64_encode_secret_key: Optional[Union[InterpolatedBoolean, str, bool]] = False
    header_prefix: Optional[Union[InterpolatedString, str]] = None
    kid: Optional[Union[InterpolatedString, str]] = None
    typ: Optional[Union[InterpolatedString, str]] = None
    cty: Optional[Union[InterpolatedString, str]] = None
    iss: Optional[Union[InterpolatedString, str]] = None
    sub: Optional[Union[InterpolatedString, str]] = None
    aud: Optional[Union[InterpolatedString, str]] = None
    additional_jwt_headers: Optional[Mapping[str, Any]] = None
    additional_jwt_payload: Optional[Mapping[str, Any]] = None
    passphrase: Optional[Union[InterpolatedString, str]] = None
    request_option: Optional[RequestOption] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """Wrap the declared fields into interpolated components."""
        self._secret_key = InterpolatedString.create(self.secret_key, parameters=parameters)
        self._algorithm = (
            JwtAlgorithm(self.algorithm) if isinstance(self.algorithm, str) else self.algorithm
        )
        self._base64_encode_secret_key = (
            InterpolatedBoolean(self.base64_encode_secret_key, parameters=parameters)
            if isinstance(self.base64_encode_secret_key, str)
            else self.base64_encode_secret_key
        )
        self._token_duration = self.token_duration
        self._header_prefix = (
            InterpolatedString.create(self.header_prefix, parameters=parameters)
            if self.header_prefix
            else None
        )
        self._kid = InterpolatedString.create(self.kid, parameters=parameters) if self.kid else None
        self._typ = InterpolatedString.create(self.typ, parameters=parameters) if self.typ else None
        self._cty = InterpolatedString.create(self.cty, parameters=parameters) if self.cty else None
        self._iss = InterpolatedString.create(self.iss, parameters=parameters) if self.iss else None
        self._sub = InterpolatedString.create(self.sub, parameters=parameters) if self.sub else None
        self._aud = InterpolatedString.create(self.aud, parameters=parameters) if self.aud else None
        self._additional_jwt_headers = InterpolatedMapping(
            self.additional_jwt_headers or {}, parameters=parameters
        )
        self._additional_jwt_payload = InterpolatedMapping(
            self.additional_jwt_payload or {}, parameters=parameters
        )
        self._passphrase = (
            InterpolatedString.create(self.passphrase, parameters=parameters)
            if self.passphrase
            else None
        )

        # When we first implemented the JWT authenticator, we assumed that the signed token was always supposed
        # to be loaded into the request headers under the `Authorization` key. This is not always the case, but
        # this default option allows for backwards compatibility to be retained for existing connectors
        self._request_option = self.request_option or RequestOption(
            inject_into=RequestOptionType.header, field_name="Authorization", parameters=parameters
        )

    def _get_jwt_headers(self) -> dict[str, Any]:
        """
        Builds and returns the headers used when signing the JWT.

        Raises:
            ValueError: if `additional_jwt_headers` sets one of the reserved headers.
        """
        headers = self._additional_jwt_headers.eval(self.config, json_loads=json.loads)
        if any(prop in headers for prop in ["kid", "alg", "typ", "cty"]):
            raise ValueError(
                "'kid', 'alg', 'typ', 'cty' are reserved headers and should not be set as part of 'additional_jwt_headers'"
            )

        if self._kid:
            headers["kid"] = self._kid.eval(self.config, json_loads=json.loads)
        if self._typ:
            headers["typ"] = self._typ.eval(self.config, json_loads=json.loads)
        if self._cty:
            headers["cty"] = self._cty.eval(self.config, json_loads=json.loads)
        headers["alg"] = self._algorithm
        return headers

    def _get_jwt_payload(self) -> dict[str, Any]:
        """
        Builds and returns the payload used when signing the JWT.

        Raises:
            ValueError: if `additional_jwt_payload` sets one of the reserved properties.
        """
        now = int(datetime.now().timestamp())
        # Without an integer token duration the token expires at its issue time.
        exp = now + self._token_duration if isinstance(self._token_duration, int) else now
        nbf = now

        payload = self._additional_jwt_payload.eval(self.config, json_loads=json.loads)
        if any(prop in payload for prop in ["iss", "sub", "aud", "iat", "exp", "nbf"]):
            raise ValueError(
                "'iss', 'sub', 'aud', 'iat', 'exp', 'nbf' are reserved properties and should not be set as part of 'additional_jwt_payload'"
            )

        if self._iss:
            payload["iss"] = self._iss.eval(self.config, json_loads=json.loads)
        if self._sub:
            payload["sub"] = self._sub.eval(self.config, json_loads=json.loads)
        if self._aud:
            payload["aud"] = self._aud.eval(self.config, json_loads=json.loads)

        payload["iat"] = now
        payload["exp"] = exp
        payload["nbf"] = nbf
        return payload

    def _get_secret_key(self) -> JwtKeyTypes:
        """
        Returns the secret key used to sign the JWT.

        With a non-empty passphrase the key is loaded as a PEM private key; otherwise the evaluated
        string is returned, base64 encoded when `base64_encode_secret_key` is set.
        """
        secret_key: str = self._secret_key.eval(self.config, json_loads=json.loads)

        if self._passphrase:
            passphrase_value = self._passphrase.eval(self.config, json_loads=json.loads)
            if passphrase_value:
                private_key = serialization.load_pem_private_key(
                    secret_key.encode(),
                    password=passphrase_value.encode(),
                )
                return cast(JwtKeyTypes, private_key)

        return (
            base64.b64encode(secret_key.encode()).decode()
            if self._base64_encode_secret_key
            else secret_key
        )

    def _get_signed_token(self) -> Union[str, Any]:
        """
        Signs the JWT using the provided secret key and algorithm and the generated headers and payload.
        For additional information on PyJWT see: https://pyjwt.readthedocs.io/en/stable/

        Raises:
            ValueError: if building or signing the token fails for any reason.
        """
        try:
            return jwt.encode(
                payload=self._get_jwt_payload(),
                key=self._get_secret_key(),
                algorithm=self._algorithm,
                headers=self._get_jwt_headers(),
            )
        except Exception as e:
            # Chain explicitly so the underlying failure is reported as the direct cause.
            raise ValueError(f"Failed to sign token: {e}") from e

    def _get_header_prefix(self) -> Union[str, None]:
        """
        Returns the header prefix to be used when attaching the token to the request.
        """
        return (
            self._header_prefix.eval(self.config, json_loads=json.loads)
            if self._header_prefix
            else None
        )

    @property
    def auth_header(self) -> str:
        """HTTP header to set on the requests; empty when the token is not injected into a header."""
        options = self._get_request_options(RequestOptionType.header)
        return next(iter(options.keys()), "")

    @property
    def token(self) -> str:
        """The header value to set on outgoing HTTP requests: the signed token, prefixed when a prefix is set."""
        # Evaluate the prefix once instead of once for the check and once for the formatting.
        header_prefix = self._get_header_prefix()
        signed_token = self._get_signed_token()
        return f"{header_prefix} {signed_token}" if header_prefix else signed_token

    def get_request_params(self) -> Mapping[str, Any]:
        """HTTP request parameter to add to the requests"""
        return self._get_request_options(RequestOptionType.request_parameter)

    def get_request_body_data(self) -> Union[Mapping[str, Any], str]:
        """Form-encoded body data to set on the requests"""
        return self._get_request_options(RequestOptionType.body_data)

    def get_request_body_json(self) -> Mapping[str, Any]:
        """JSON body data to set on the requests"""
        return self._get_request_options(RequestOptionType.body_json)

    def _get_request_options(self, option_type: RequestOptionType) -> Mapping[str, Any]:
        """Returns the token injected for `option_type`, or an empty mapping when it is injected elsewhere."""
        options: MutableMapping[str, Any] = {}
        if self._request_option.inject_into == option_type:
            self._request_option.inject_into_request(options, self.token, self.config)
        return options
Generates a JSON Web Token (JWT) based on a declarative connector configuration file. The generated token is attached to each request via the Authorization header.
Attributes:
- config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
- secret_key (Union[InterpolatedString, str]): The secret key used to sign the JWT
- algorithm (Union[str, JwtAlgorithm]): The algorithm used to sign the JWT
- token_duration (Optional[int]): The duration in seconds for which the token is valid
- base64_encode_secret_key (Optional[Union[InterpolatedBoolean, str, bool]]): Whether to base64 encode the secret key
- header_prefix (Optional[Union[InterpolatedString, str]]): The prefix to add to the Authorization header
- kid (Optional[Union[InterpolatedString, str]]): The key identifier to be included in the JWT header
- typ (Optional[Union[InterpolatedString, str]]): The type of the JWT.
- cty (Optional[Union[InterpolatedString, str]]): The content type of the JWT.
- iss (Optional[Union[InterpolatedString, str]]): The issuer of the JWT.
- sub (Optional[Union[InterpolatedString, str]]): The subject of the JWT.
- aud (Optional[Union[InterpolatedString, str]]): The audience of the JWT.
- additional_jwt_headers (Optional[Mapping[str, Any]]): Additional headers to include in the JWT.
- additional_jwt_payload (Optional[Mapping[str, Any]]): Additional payload to include in the JWT.
@property
def auth_header(self) -> str:
    """HTTP header to set on the requests; empty when no header option is produced."""
    header_options = self._get_request_options(RequestOptionType.header)
    for header_name in header_options.keys():
        # Only the first key is relevant.
        return header_name
    return ""
HTTP header to set on the requests
@property
def token(self) -> str:
    """
    The header value to set on outgoing HTTP requests.

    Returns the signed token, preceded by the header prefix and a space when a prefix is set.
    """
    # Evaluate the prefix once instead of once for the check and once for the formatting.
    header_prefix = self._get_header_prefix()
    signed_token = self._get_signed_token()
    return f"{header_prefix} {signed_token}" if header_prefix else signed_token
The header value to set on outgoing HTTP requests
def get_request_params(self) -> Mapping[str, Any]:
    """HTTP request parameter to add to the requests"""
    query_params = self._get_request_options(RequestOptionType.request_parameter)
    return query_params
HTTP request parameter to add to the requests
def get_request_body_data(self) -> Union[Mapping[str, Any], str]:
    """Form-encoded body data to set on the requests"""
    body_data = self._get_request_options(RequestOptionType.body_data)
    return body_data
Form-encoded body data to set on the requests