airbyte_cdk.sources.declarative.auth.jwt
1# 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 3# 4 5import base64 6import json 7from dataclasses import InitVar, dataclass 8from datetime import datetime 9from typing import Any, Mapping, Optional, Union 10 11import jwt 12 13from airbyte_cdk.sources.declarative.auth.declarative_authenticator import DeclarativeAuthenticator 14from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean 15from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping 16from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString 17 18 19class JwtAlgorithm(str): 20 """ 21 Enum for supported JWT algorithms 22 """ 23 24 HS256 = "HS256" 25 HS384 = "HS384" 26 HS512 = "HS512" 27 ES256 = "ES256" 28 ES256K = "ES256K" 29 ES384 = "ES384" 30 ES512 = "ES512" 31 RS256 = "RS256" 32 RS384 = "RS384" 33 RS512 = "RS512" 34 PS256 = "PS256" 35 PS384 = "PS384" 36 PS512 = "PS512" 37 EdDSA = "EdDSA" 38 39 40@dataclass 41class JwtAuthenticator(DeclarativeAuthenticator): 42 """ 43 Generates a JSON Web Token (JWT) based on a declarative connector configuration file. The generated token is attached to each request via the Authorization header. 44 45 Attributes: 46 config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec 47 secret_key (Union[InterpolatedString, str]): The secret key used to sign the JWT 48 algorithm (Union[str, JwtAlgorithm]): The algorithm used to sign the JWT 49 token_duration (Optional[int]): The duration in seconds for which the token is valid 50 base64_encode_secret_key (Optional[Union[InterpolatedBoolean, str, bool]]): Whether to base64 encode the secret key 51 header_prefix (Optional[Union[InterpolatedString, str]]): The prefix to add to the Authorization header 52 kid (Optional[Union[InterpolatedString, str]]): The key identifier to be included in the JWT header 53 typ (Optional[Union[InterpolatedString, str]]): The type of the JWT. 54 cty (Optional[Union[InterpolatedString, str]]): The content type of the JWT. 55 iss (Optional[Union[InterpolatedString, str]]): The issuer of the JWT. 56 sub (Optional[Union[InterpolatedString, str]]): The subject of the JWT. 57 aud (Optional[Union[InterpolatedString, str]]): The audience of the JWT. 58 additional_jwt_headers (Optional[Mapping[str, Any]]): Additional headers to include in the JWT. 59 additional_jwt_payload (Optional[Mapping[str, Any]]): Additional payload to include in the JWT. 60 """ 61 62 config: Mapping[str, Any] 63 parameters: InitVar[Mapping[str, Any]] 64 secret_key: Union[InterpolatedString, str] 65 algorithm: Union[str, JwtAlgorithm] 66 token_duration: Optional[int] 67 base64_encode_secret_key: Optional[Union[InterpolatedBoolean, str, bool]] = False 68 header_prefix: Optional[Union[InterpolatedString, str]] = None 69 kid: Optional[Union[InterpolatedString, str]] = None 70 typ: Optional[Union[InterpolatedString, str]] = None 71 cty: Optional[Union[InterpolatedString, str]] = None 72 iss: Optional[Union[InterpolatedString, str]] = None 73 sub: Optional[Union[InterpolatedString, str]] = None 74 aud: Optional[Union[InterpolatedString, str]] = None 75 additional_jwt_headers: Optional[Mapping[str, Any]] = None 76 additional_jwt_payload: Optional[Mapping[str, Any]] = None 77 78 def __post_init__(self, parameters: Mapping[str, Any]) -> None: 79 self._secret_key = InterpolatedString.create(self.secret_key, parameters=parameters) 80 self._algorithm = ( 81 JwtAlgorithm(self.algorithm) if isinstance(self.algorithm, str) else self.algorithm 82 ) 83 self._base64_encode_secret_key = ( 84 InterpolatedBoolean(self.base64_encode_secret_key, parameters=parameters) 85 if isinstance(self.base64_encode_secret_key, str) 86 else self.base64_encode_secret_key 87 ) 88 self._token_duration = self.token_duration 89 self._header_prefix = ( 90 InterpolatedString.create(self.header_prefix, parameters=parameters) 91 if self.header_prefix 92 else None 93 ) 94 self._kid = InterpolatedString.create(self.kid, parameters=parameters) if self.kid else None 95 self._typ = InterpolatedString.create(self.typ, parameters=parameters) if self.typ else None 96 self._cty = InterpolatedString.create(self.cty, parameters=parameters) if self.cty else None 97 self._iss = InterpolatedString.create(self.iss, parameters=parameters) if self.iss else None 98 self._sub = InterpolatedString.create(self.sub, parameters=parameters) if self.sub else None 99 self._aud = InterpolatedString.create(self.aud, parameters=parameters) if self.aud else None 100 self._additional_jwt_headers = InterpolatedMapping( 101 self.additional_jwt_headers or {}, parameters=parameters 102 ) 103 self._additional_jwt_payload = InterpolatedMapping( 104 self.additional_jwt_payload or {}, parameters=parameters 105 ) 106 107 def _get_jwt_headers(self) -> dict[str, Any]: 108 """ 109 Builds and returns the headers used when signing the JWT. 110 """ 111 headers = self._additional_jwt_headers.eval(self.config, json_loads=json.loads) 112 if any(prop in headers for prop in ["kid", "alg", "typ", "cty"]): 113 raise ValueError( 114 "'kid', 'alg', 'typ', 'cty' are reserved headers and should not be set as part of 'additional_jwt_headers'" 115 ) 116 117 if self._kid: 118 headers["kid"] = self._kid.eval(self.config, json_loads=json.loads) 119 if self._typ: 120 headers["typ"] = self._typ.eval(self.config, json_loads=json.loads) 121 if self._cty: 122 headers["cty"] = self._cty.eval(self.config, json_loads=json.loads) 123 headers["alg"] = self._algorithm 124 return headers 125 126 def _get_jwt_payload(self) -> dict[str, Any]: 127 """ 128 Builds and returns the payload used when signing the JWT. 129 """ 130 now = int(datetime.now().timestamp()) 131 exp = now + self._token_duration if isinstance(self._token_duration, int) else now 132 nbf = now 133 134 payload = self._additional_jwt_payload.eval(self.config, json_loads=json.loads) 135 if any(prop in payload for prop in ["iss", "sub", "aud", "iat", "exp", "nbf"]): 136 raise ValueError( 137 "'iss', 'sub', 'aud', 'iat', 'exp', 'nbf' are reserved properties and should not be set as part of 'additional_jwt_payload'" 138 ) 139 140 if self._iss: 141 payload["iss"] = self._iss.eval(self.config, json_loads=json.loads) 142 if self._sub: 143 payload["sub"] = self._sub.eval(self.config, json_loads=json.loads) 144 if self._aud: 145 payload["aud"] = self._aud.eval(self.config, json_loads=json.loads) 146 147 payload["iat"] = now 148 payload["exp"] = exp 149 payload["nbf"] = nbf 150 return payload 151 152 def _get_secret_key(self) -> str: 153 """ 154 Returns the secret key used to sign the JWT. 155 """ 156 secret_key: str = self._secret_key.eval(self.config, json_loads=json.loads) 157 return ( 158 base64.b64encode(secret_key.encode()).decode() 159 if self._base64_encode_secret_key 160 else secret_key 161 ) 162 163 def _get_signed_token(self) -> Union[str, Any]: 164 """ 165 Signed the JWT using the provided secret key and algorithm and the generated headers and payload. For additional information on PyJWT see: https://pyjwt.readthedocs.io/en/stable/ 166 """ 167 try: 168 return jwt.encode( 169 payload=self._get_jwt_payload(), 170 key=self._get_secret_key(), 171 algorithm=self._algorithm, 172 headers=self._get_jwt_headers(), 173 ) 174 except Exception as e: 175 raise ValueError(f"Failed to sign token: {e}") 176 177 def _get_header_prefix(self) -> Union[str, None]: 178 """ 179 Returns the header prefix to be used when attaching the token to the request. 180 """ 181 return ( 182 self._header_prefix.eval(self.config, json_loads=json.loads) 183 if self._header_prefix 184 else None 185 ) 186 187 @property 188 def auth_header(self) -> str: 189 return "Authorization" 190 191 @property 192 def token(self) -> str: 193 return ( 194 f"{self._get_header_prefix()} {self._get_signed_token()}" 195 if self._get_header_prefix() 196 else self._get_signed_token() 197 )
class
JwtAlgorithm(builtins.str):
20class JwtAlgorithm(str): 21 """ 22 Enum for supported JWT algorithms 23 """ 24 25 HS256 = "HS256" 26 HS384 = "HS384" 27 HS512 = "HS512" 28 ES256 = "ES256" 29 ES256K = "ES256K" 30 ES384 = "ES384" 31 ES512 = "ES512" 32 RS256 = "RS256" 33 RS384 = "RS384" 34 RS512 = "RS512" 35 PS256 = "PS256" 36 PS384 = "PS384" 37 PS512 = "PS512" 38 EdDSA = "EdDSA"
Enum for supported JWT algorithms
@dataclass
class
JwtAuthenticator41@dataclass 42class JwtAuthenticator(DeclarativeAuthenticator): 43 """ 44 Generates a JSON Web Token (JWT) based on a declarative connector configuration file. The generated token is attached to each request via the Authorization header. 45 46 Attributes: 47 config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec 48 secret_key (Union[InterpolatedString, str]): The secret key used to sign the JWT 49 algorithm (Union[str, JwtAlgorithm]): The algorithm used to sign the JWT 50 token_duration (Optional[int]): The duration in seconds for which the token is valid 51 base64_encode_secret_key (Optional[Union[InterpolatedBoolean, str, bool]]): Whether to base64 encode the secret key 52 header_prefix (Optional[Union[InterpolatedString, str]]): The prefix to add to the Authorization header 53 kid (Optional[Union[InterpolatedString, str]]): The key identifier to be included in the JWT header 54 typ (Optional[Union[InterpolatedString, str]]): The type of the JWT. 55 cty (Optional[Union[InterpolatedString, str]]): The content type of the JWT. 56 iss (Optional[Union[InterpolatedString, str]]): The issuer of the JWT. 57 sub (Optional[Union[InterpolatedString, str]]): The subject of the JWT. 58 aud (Optional[Union[InterpolatedString, str]]): The audience of the JWT. 59 additional_jwt_headers (Optional[Mapping[str, Any]]): Additional headers to include in the JWT. 60 additional_jwt_payload (Optional[Mapping[str, Any]]): Additional payload to include in the JWT. 61 """ 62 63 config: Mapping[str, Any] 64 parameters: InitVar[Mapping[str, Any]] 65 secret_key: Union[InterpolatedString, str] 66 algorithm: Union[str, JwtAlgorithm] 67 token_duration: Optional[int] 68 base64_encode_secret_key: Optional[Union[InterpolatedBoolean, str, bool]] = False 69 header_prefix: Optional[Union[InterpolatedString, str]] = None 70 kid: Optional[Union[InterpolatedString, str]] = None 71 typ: Optional[Union[InterpolatedString, str]] = None 72 cty: Optional[Union[InterpolatedString, str]] = None 73 iss: Optional[Union[InterpolatedString, str]] = None 74 sub: Optional[Union[InterpolatedString, str]] = None 75 aud: Optional[Union[InterpolatedString, str]] = None 76 additional_jwt_headers: Optional[Mapping[str, Any]] = None 77 additional_jwt_payload: Optional[Mapping[str, Any]] = None 78 79 def __post_init__(self, parameters: Mapping[str, Any]) -> None: 80 self._secret_key = InterpolatedString.create(self.secret_key, parameters=parameters) 81 self._algorithm = ( 82 JwtAlgorithm(self.algorithm) if isinstance(self.algorithm, str) else self.algorithm 83 ) 84 self._base64_encode_secret_key = ( 85 InterpolatedBoolean(self.base64_encode_secret_key, parameters=parameters) 86 if isinstance(self.base64_encode_secret_key, str) 87 else self.base64_encode_secret_key 88 ) 89 self._token_duration = self.token_duration 90 self._header_prefix = ( 91 InterpolatedString.create(self.header_prefix, parameters=parameters) 92 if self.header_prefix 93 else None 94 ) 95 self._kid = InterpolatedString.create(self.kid, parameters=parameters) if self.kid else None 96 self._typ = InterpolatedString.create(self.typ, parameters=parameters) if self.typ else None 97 self._cty = InterpolatedString.create(self.cty, parameters=parameters) if self.cty else None 98 self._iss = InterpolatedString.create(self.iss, parameters=parameters) if self.iss else None 99 self._sub = InterpolatedString.create(self.sub, parameters=parameters) if self.sub else None 100 self._aud = InterpolatedString.create(self.aud, parameters=parameters) if self.aud else None 101 self._additional_jwt_headers = InterpolatedMapping( 102 self.additional_jwt_headers or {}, parameters=parameters 103 ) 104 self._additional_jwt_payload = InterpolatedMapping( 105 self.additional_jwt_payload or {}, parameters=parameters 106 ) 107 108 def _get_jwt_headers(self) -> dict[str, Any]: 109 """ 110 Builds and returns the headers used when signing the JWT. 111 """ 112 headers = self._additional_jwt_headers.eval(self.config, json_loads=json.loads) 113 if any(prop in headers for prop in ["kid", "alg", "typ", "cty"]): 114 raise ValueError( 115 "'kid', 'alg', 'typ', 'cty' are reserved headers and should not be set as part of 'additional_jwt_headers'" 116 ) 117 118 if self._kid: 119 headers["kid"] = self._kid.eval(self.config, json_loads=json.loads) 120 if self._typ: 121 headers["typ"] = self._typ.eval(self.config, json_loads=json.loads) 122 if self._cty: 123 headers["cty"] = self._cty.eval(self.config, json_loads=json.loads) 124 headers["alg"] = self._algorithm 125 return headers 126 127 def _get_jwt_payload(self) -> dict[str, Any]: 128 """ 129 Builds and returns the payload used when signing the JWT. 130 """ 131 now = int(datetime.now().timestamp()) 132 exp = now + self._token_duration if isinstance(self._token_duration, int) else now 133 nbf = now 134 135 payload = self._additional_jwt_payload.eval(self.config, json_loads=json.loads) 136 if any(prop in payload for prop in ["iss", "sub", "aud", "iat", "exp", "nbf"]): 137 raise ValueError( 138 "'iss', 'sub', 'aud', 'iat', 'exp', 'nbf' are reserved properties and should not be set as part of 'additional_jwt_payload'" 139 ) 140 141 if self._iss: 142 payload["iss"] = self._iss.eval(self.config, json_loads=json.loads) 143 if self._sub: 144 payload["sub"] = self._sub.eval(self.config, json_loads=json.loads) 145 if self._aud: 146 payload["aud"] = self._aud.eval(self.config, json_loads=json.loads) 147 148 payload["iat"] = now 149 payload["exp"] = exp 150 payload["nbf"] = nbf 151 return payload 152 153 def _get_secret_key(self) -> str: 154 """ 155 Returns the secret key used to sign the JWT. 156 """ 157 secret_key: str = self._secret_key.eval(self.config, json_loads=json.loads) 158 return ( 159 base64.b64encode(secret_key.encode()).decode() 160 if self._base64_encode_secret_key 161 else secret_key 162 ) 163 164 def _get_signed_token(self) -> Union[str, Any]: 165 """ 166 Signed the JWT using the provided secret key and algorithm and the generated headers and payload. For additional information on PyJWT see: https://pyjwt.readthedocs.io/en/stable/ 167 """ 168 try: 169 return jwt.encode( 170 payload=self._get_jwt_payload(), 171 key=self._get_secret_key(), 172 algorithm=self._algorithm, 173 headers=self._get_jwt_headers(), 174 ) 175 except Exception as e: 176 raise ValueError(f"Failed to sign token: {e}") 177 178 def _get_header_prefix(self) -> Union[str, None]: 179 """ 180 Returns the header prefix to be used when attaching the token to the request. 181 """ 182 return ( 183 self._header_prefix.eval(self.config, json_loads=json.loads) 184 if self._header_prefix 185 else None 186 ) 187 188 @property 189 def auth_header(self) -> str: 190 return "Authorization" 191 192 @property 193 def token(self) -> str: 194 return ( 195 f"{self._get_header_prefix()} {self._get_signed_token()}" 196 if self._get_header_prefix() 197 else self._get_signed_token() 198 )
Generates a JSON Web Token (JWT) based on a declarative connector configuration file. The generated token is attached to each request via the Authorization header.
Attributes:
- config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
- secret_key (Union[InterpolatedString, str]): The secret key used to sign the JWT
- algorithm (Union[str, JwtAlgorithm]): The algorithm used to sign the JWT
- token_duration (Optional[int]): The duration in seconds for which the token is valid
- base64_encode_secret_key (Optional[Union[InterpolatedBoolean, str, bool]]): Whether to base64 encode the secret key
- header_prefix (Optional[Union[InterpolatedString, str]]): The prefix to add to the Authorization header
- kid (Optional[Union[InterpolatedString, str]]): The key identifier to be included in the JWT header
- typ (Optional[Union[InterpolatedString, str]]): The type of the JWT.
- cty (Optional[Union[InterpolatedString, str]]): The content type of the JWT.
- iss (Optional[Union[InterpolatedString, str]]): The issuer of the JWT.
- sub (Optional[Union[InterpolatedString, str]]): The subject of the JWT.
- aud (Optional[Union[InterpolatedString, str]]): The audience of the JWT.
- additional_jwt_headers (Optional[Mapping[str, Any]]): Additional headers to include in the JWT.
- additional_jwt_payload (Optional[Mapping[str, Any]]): Additional payload to include in the JWT.
JwtAuthenticator( config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], secret_key: Union[airbyte_cdk.InterpolatedString, str], algorithm: Union[str, JwtAlgorithm], token_duration: Optional[int], base64_encode_secret_key: Union[airbyte_cdk.InterpolatedBoolean, str, bool, NoneType] = False, header_prefix: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, kid: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, typ: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, cty: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, iss: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, sub: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, aud: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, additional_jwt_headers: Optional[Mapping[str, Any]] = None, additional_jwt_payload: Optional[Mapping[str, Any]] = None)
secret_key: Union[airbyte_cdk.InterpolatedString, str]
algorithm: Union[str, JwtAlgorithm]