airbyte_cdk.sources.declarative.auth.jwt

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4
  5import base64
  6import json
  7from dataclasses import InitVar, dataclass
  8from datetime import datetime
  9from typing import Any, Mapping, Optional, Union
 10
 11import jwt
 12
 13from airbyte_cdk.sources.declarative.auth.declarative_authenticator import DeclarativeAuthenticator
 14from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
 15from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping
 16from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
 17
 18
 19class JwtAlgorithm(str):
 20    """
 21    Enum for supported JWT algorithms
 22    """
 23
 24    HS256 = "HS256"
 25    HS384 = "HS384"
 26    HS512 = "HS512"
 27    ES256 = "ES256"
 28    ES256K = "ES256K"
 29    ES384 = "ES384"
 30    ES512 = "ES512"
 31    RS256 = "RS256"
 32    RS384 = "RS384"
 33    RS512 = "RS512"
 34    PS256 = "PS256"
 35    PS384 = "PS384"
 36    PS512 = "PS512"
 37    EdDSA = "EdDSA"
 38
 39
 40@dataclass
 41class JwtAuthenticator(DeclarativeAuthenticator):
 42    """
 43    Generates a JSON Web Token (JWT) based on a declarative connector configuration file. The generated token is attached to each request via the Authorization header.
 44
 45    Attributes:
 46        config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
 47        secret_key (Union[InterpolatedString, str]): The secret key used to sign the JWT
 48        algorithm (Union[str, JwtAlgorithm]): The algorithm used to sign the JWT
 49        token_duration (Optional[int]): The duration in seconds for which the token is valid
 50        base64_encode_secret_key (Optional[Union[InterpolatedBoolean, str, bool]]): Whether to base64 encode the secret key
 51        header_prefix (Optional[Union[InterpolatedString, str]]): The prefix to add to the Authorization header
 52        kid (Optional[Union[InterpolatedString, str]]): The key identifier to be included in the JWT header
 53        typ (Optional[Union[InterpolatedString, str]]): The type of the JWT.
 54        cty (Optional[Union[InterpolatedString, str]]): The content type of the JWT.
 55        iss (Optional[Union[InterpolatedString, str]]): The issuer of the JWT.
 56        sub (Optional[Union[InterpolatedString, str]]): The subject of the JWT.
 57        aud (Optional[Union[InterpolatedString, str]]): The audience of the JWT.
 58        additional_jwt_headers (Optional[Mapping[str, Any]]): Additional headers to include in the JWT.
 59        additional_jwt_payload (Optional[Mapping[str, Any]]): Additional payload to include in the JWT.
 60    """
 61
 62    config: Mapping[str, Any]
 63    parameters: InitVar[Mapping[str, Any]]
 64    secret_key: Union[InterpolatedString, str]
 65    algorithm: Union[str, JwtAlgorithm]
 66    token_duration: Optional[int]
 67    base64_encode_secret_key: Optional[Union[InterpolatedBoolean, str, bool]] = False
 68    header_prefix: Optional[Union[InterpolatedString, str]] = None
 69    kid: Optional[Union[InterpolatedString, str]] = None
 70    typ: Optional[Union[InterpolatedString, str]] = None
 71    cty: Optional[Union[InterpolatedString, str]] = None
 72    iss: Optional[Union[InterpolatedString, str]] = None
 73    sub: Optional[Union[InterpolatedString, str]] = None
 74    aud: Optional[Union[InterpolatedString, str]] = None
 75    additional_jwt_headers: Optional[Mapping[str, Any]] = None
 76    additional_jwt_payload: Optional[Mapping[str, Any]] = None
 77
 78    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 79        self._secret_key = InterpolatedString.create(self.secret_key, parameters=parameters)
 80        self._algorithm = (
 81            JwtAlgorithm(self.algorithm) if isinstance(self.algorithm, str) else self.algorithm
 82        )
 83        self._base64_encode_secret_key = (
 84            InterpolatedBoolean(self.base64_encode_secret_key, parameters=parameters)
 85            if isinstance(self.base64_encode_secret_key, str)
 86            else self.base64_encode_secret_key
 87        )
 88        self._token_duration = self.token_duration
 89        self._header_prefix = (
 90            InterpolatedString.create(self.header_prefix, parameters=parameters)
 91            if self.header_prefix
 92            else None
 93        )
 94        self._kid = InterpolatedString.create(self.kid, parameters=parameters) if self.kid else None
 95        self._typ = InterpolatedString.create(self.typ, parameters=parameters) if self.typ else None
 96        self._cty = InterpolatedString.create(self.cty, parameters=parameters) if self.cty else None
 97        self._iss = InterpolatedString.create(self.iss, parameters=parameters) if self.iss else None
 98        self._sub = InterpolatedString.create(self.sub, parameters=parameters) if self.sub else None
 99        self._aud = InterpolatedString.create(self.aud, parameters=parameters) if self.aud else None
100        self._additional_jwt_headers = InterpolatedMapping(
101            self.additional_jwt_headers or {}, parameters=parameters
102        )
103        self._additional_jwt_payload = InterpolatedMapping(
104            self.additional_jwt_payload or {}, parameters=parameters
105        )
106
107    def _get_jwt_headers(self) -> dict[str, Any]:
108        """
109        Builds and returns the headers used when signing the JWT.
110        """
111        headers = self._additional_jwt_headers.eval(self.config, json_loads=json.loads)
112        if any(prop in headers for prop in ["kid", "alg", "typ", "cty"]):
113            raise ValueError(
114                "'kid', 'alg', 'typ', 'cty' are reserved headers and should not be set as part of 'additional_jwt_headers'"
115            )
116
117        if self._kid:
118            headers["kid"] = self._kid.eval(self.config, json_loads=json.loads)
119        if self._typ:
120            headers["typ"] = self._typ.eval(self.config, json_loads=json.loads)
121        if self._cty:
122            headers["cty"] = self._cty.eval(self.config, json_loads=json.loads)
123        headers["alg"] = self._algorithm
124        return headers
125
126    def _get_jwt_payload(self) -> dict[str, Any]:
127        """
128        Builds and returns the payload used when signing the JWT.
129        """
130        now = int(datetime.now().timestamp())
131        exp = now + self._token_duration if isinstance(self._token_duration, int) else now
132        nbf = now
133
134        payload = self._additional_jwt_payload.eval(self.config, json_loads=json.loads)
135        if any(prop in payload for prop in ["iss", "sub", "aud", "iat", "exp", "nbf"]):
136            raise ValueError(
137                "'iss', 'sub', 'aud', 'iat', 'exp', 'nbf' are reserved properties and should not be set as part of 'additional_jwt_payload'"
138            )
139
140        if self._iss:
141            payload["iss"] = self._iss.eval(self.config, json_loads=json.loads)
142        if self._sub:
143            payload["sub"] = self._sub.eval(self.config, json_loads=json.loads)
144        if self._aud:
145            payload["aud"] = self._aud.eval(self.config, json_loads=json.loads)
146
147        payload["iat"] = now
148        payload["exp"] = exp
149        payload["nbf"] = nbf
150        return payload
151
152    def _get_secret_key(self) -> str:
153        """
154        Returns the secret key used to sign the JWT.
155        """
156        secret_key: str = self._secret_key.eval(self.config, json_loads=json.loads)
157        return (
158            base64.b64encode(secret_key.encode()).decode()
159            if self._base64_encode_secret_key
160            else secret_key
161        )
162
163    def _get_signed_token(self) -> Union[str, Any]:
164        """
165        Signed the JWT using the provided secret key and algorithm and the generated headers and payload. For additional information on PyJWT see: https://pyjwt.readthedocs.io/en/stable/
166        """
167        try:
168            return jwt.encode(
169                payload=self._get_jwt_payload(),
170                key=self._get_secret_key(),
171                algorithm=self._algorithm,
172                headers=self._get_jwt_headers(),
173            )
174        except Exception as e:
175            raise ValueError(f"Failed to sign token: {e}")
176
177    def _get_header_prefix(self) -> Union[str, None]:
178        """
179        Returns the header prefix to be used when attaching the token to the request.
180        """
181        return (
182            self._header_prefix.eval(self.config, json_loads=json.loads)
183            if self._header_prefix
184            else None
185        )
186
187    @property
188    def auth_header(self) -> str:
189        return "Authorization"
190
191    @property
192    def token(self) -> str:
193        return (
194            f"{self._get_header_prefix()} {self._get_signed_token()}"
195            if self._get_header_prefix()
196            else self._get_signed_token()
197        )
class JwtAlgorithm(builtins.str):
class JwtAlgorithm(str):
    """
    String constants naming the supported JWT signing algorithms.

    This is a plain ``str`` subclass, not an ``enum.Enum``: each name below is an
    ordinary class attribute whose value is the algorithm identifier itself.
    """

    HS256, HS384, HS512 = "HS256", "HS384", "HS512"
    ES256, ES256K, ES384, ES512 = "ES256", "ES256K", "ES384", "ES512"
    RS256, RS384, RS512 = "RS256", "RS384", "RS512"
    PS256, PS384, PS512 = "PS256", "PS384", "PS512"
    EdDSA = "EdDSA"

String constants for the supported JWT signing algorithms (a plain `str` subclass, not an `enum.Enum`)

HS256 = 'HS256'
HS384 = 'HS384'
HS512 = 'HS512'
ES256 = 'ES256'
ES256K = 'ES256K'
ES384 = 'ES384'
ES512 = 'ES512'
RS256 = 'RS256'
RS384 = 'RS384'
RS512 = 'RS512'
PS256 = 'PS256'
PS384 = 'PS384'
PS512 = 'PS512'
EdDSA = 'EdDSA'
 41@dataclass
 42class JwtAuthenticator(DeclarativeAuthenticator):
 43    """
 44    Generates a JSON Web Token (JWT) based on a declarative connector configuration file. The generated token is attached to each request via the Authorization header.
 45
 46    Attributes:
 47        config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
 48        secret_key (Union[InterpolatedString, str]): The secret key used to sign the JWT
 49        algorithm (Union[str, JwtAlgorithm]): The algorithm used to sign the JWT
 50        token_duration (Optional[int]): The duration in seconds for which the token is valid
 51        base64_encode_secret_key (Optional[Union[InterpolatedBoolean, str, bool]]): Whether to base64 encode the secret key
 52        header_prefix (Optional[Union[InterpolatedString, str]]): The prefix to add to the Authorization header
 53        kid (Optional[Union[InterpolatedString, str]]): The key identifier to be included in the JWT header
 54        typ (Optional[Union[InterpolatedString, str]]): The type of the JWT.
 55        cty (Optional[Union[InterpolatedString, str]]): The content type of the JWT.
 56        iss (Optional[Union[InterpolatedString, str]]): The issuer of the JWT.
 57        sub (Optional[Union[InterpolatedString, str]]): The subject of the JWT.
 58        aud (Optional[Union[InterpolatedString, str]]): The audience of the JWT.
 59        additional_jwt_headers (Optional[Mapping[str, Any]]): Additional headers to include in the JWT.
 60        additional_jwt_payload (Optional[Mapping[str, Any]]): Additional payload to include in the JWT.
 61    """
 62
 63    config: Mapping[str, Any]
 64    parameters: InitVar[Mapping[str, Any]]
 65    secret_key: Union[InterpolatedString, str]
 66    algorithm: Union[str, JwtAlgorithm]
 67    token_duration: Optional[int]
 68    base64_encode_secret_key: Optional[Union[InterpolatedBoolean, str, bool]] = False
 69    header_prefix: Optional[Union[InterpolatedString, str]] = None
 70    kid: Optional[Union[InterpolatedString, str]] = None
 71    typ: Optional[Union[InterpolatedString, str]] = None
 72    cty: Optional[Union[InterpolatedString, str]] = None
 73    iss: Optional[Union[InterpolatedString, str]] = None
 74    sub: Optional[Union[InterpolatedString, str]] = None
 75    aud: Optional[Union[InterpolatedString, str]] = None
 76    additional_jwt_headers: Optional[Mapping[str, Any]] = None
 77    additional_jwt_payload: Optional[Mapping[str, Any]] = None
 78
 79    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 80        self._secret_key = InterpolatedString.create(self.secret_key, parameters=parameters)
 81        self._algorithm = (
 82            JwtAlgorithm(self.algorithm) if isinstance(self.algorithm, str) else self.algorithm
 83        )
 84        self._base64_encode_secret_key = (
 85            InterpolatedBoolean(self.base64_encode_secret_key, parameters=parameters)
 86            if isinstance(self.base64_encode_secret_key, str)
 87            else self.base64_encode_secret_key
 88        )
 89        self._token_duration = self.token_duration
 90        self._header_prefix = (
 91            InterpolatedString.create(self.header_prefix, parameters=parameters)
 92            if self.header_prefix
 93            else None
 94        )
 95        self._kid = InterpolatedString.create(self.kid, parameters=parameters) if self.kid else None
 96        self._typ = InterpolatedString.create(self.typ, parameters=parameters) if self.typ else None
 97        self._cty = InterpolatedString.create(self.cty, parameters=parameters) if self.cty else None
 98        self._iss = InterpolatedString.create(self.iss, parameters=parameters) if self.iss else None
 99        self._sub = InterpolatedString.create(self.sub, parameters=parameters) if self.sub else None
100        self._aud = InterpolatedString.create(self.aud, parameters=parameters) if self.aud else None
101        self._additional_jwt_headers = InterpolatedMapping(
102            self.additional_jwt_headers or {}, parameters=parameters
103        )
104        self._additional_jwt_payload = InterpolatedMapping(
105            self.additional_jwt_payload or {}, parameters=parameters
106        )
107
108    def _get_jwt_headers(self) -> dict[str, Any]:
109        """
110        Builds and returns the headers used when signing the JWT.
111        """
112        headers = self._additional_jwt_headers.eval(self.config, json_loads=json.loads)
113        if any(prop in headers for prop in ["kid", "alg", "typ", "cty"]):
114            raise ValueError(
115                "'kid', 'alg', 'typ', 'cty' are reserved headers and should not be set as part of 'additional_jwt_headers'"
116            )
117
118        if self._kid:
119            headers["kid"] = self._kid.eval(self.config, json_loads=json.loads)
120        if self._typ:
121            headers["typ"] = self._typ.eval(self.config, json_loads=json.loads)
122        if self._cty:
123            headers["cty"] = self._cty.eval(self.config, json_loads=json.loads)
124        headers["alg"] = self._algorithm
125        return headers
126
127    def _get_jwt_payload(self) -> dict[str, Any]:
128        """
129        Builds and returns the payload used when signing the JWT.
130        """
131        now = int(datetime.now().timestamp())
132        exp = now + self._token_duration if isinstance(self._token_duration, int) else now
133        nbf = now
134
135        payload = self._additional_jwt_payload.eval(self.config, json_loads=json.loads)
136        if any(prop in payload for prop in ["iss", "sub", "aud", "iat", "exp", "nbf"]):
137            raise ValueError(
138                "'iss', 'sub', 'aud', 'iat', 'exp', 'nbf' are reserved properties and should not be set as part of 'additional_jwt_payload'"
139            )
140
141        if self._iss:
142            payload["iss"] = self._iss.eval(self.config, json_loads=json.loads)
143        if self._sub:
144            payload["sub"] = self._sub.eval(self.config, json_loads=json.loads)
145        if self._aud:
146            payload["aud"] = self._aud.eval(self.config, json_loads=json.loads)
147
148        payload["iat"] = now
149        payload["exp"] = exp
150        payload["nbf"] = nbf
151        return payload
152
153    def _get_secret_key(self) -> str:
154        """
155        Returns the secret key used to sign the JWT.
156        """
157        secret_key: str = self._secret_key.eval(self.config, json_loads=json.loads)
158        return (
159            base64.b64encode(secret_key.encode()).decode()
160            if self._base64_encode_secret_key
161            else secret_key
162        )
163
164    def _get_signed_token(self) -> Union[str, Any]:
165        """
166        Signed the JWT using the provided secret key and algorithm and the generated headers and payload. For additional information on PyJWT see: https://pyjwt.readthedocs.io/en/stable/
167        """
168        try:
169            return jwt.encode(
170                payload=self._get_jwt_payload(),
171                key=self._get_secret_key(),
172                algorithm=self._algorithm,
173                headers=self._get_jwt_headers(),
174            )
175        except Exception as e:
176            raise ValueError(f"Failed to sign token: {e}")
177
178    def _get_header_prefix(self) -> Union[str, None]:
179        """
180        Returns the header prefix to be used when attaching the token to the request.
181        """
182        return (
183            self._header_prefix.eval(self.config, json_loads=json.loads)
184            if self._header_prefix
185            else None
186        )
187
188    @property
189    def auth_header(self) -> str:
190        return "Authorization"
191
192    @property
193    def token(self) -> str:
194        return (
195            f"{self._get_header_prefix()} {self._get_signed_token()}"
196            if self._get_header_prefix()
197            else self._get_signed_token()
198        )

Generates a JSON Web Token (JWT) based on a declarative connector configuration file. The generated token is attached to each request via the Authorization header.

Attributes:
  • config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
  • secret_key (Union[InterpolatedString, str]): The secret key used to sign the JWT
  • algorithm (Union[str, JwtAlgorithm]): The algorithm used to sign the JWT
  • token_duration (Optional[int]): The duration in seconds for which the token is valid
  • base64_encode_secret_key (Optional[Union[InterpolatedBoolean, str, bool]]): Whether to base64 encode the secret key
  • header_prefix (Optional[Union[InterpolatedString, str]]): The prefix to add to the Authorization header
  • kid (Optional[Union[InterpolatedString, str]]): The key identifier to be included in the JWT header
  • typ (Optional[Union[InterpolatedString, str]]): The type of the JWT.
  • cty (Optional[Union[InterpolatedString, str]]): The content type of the JWT.
  • iss (Optional[Union[InterpolatedString, str]]): The issuer of the JWT.
  • sub (Optional[Union[InterpolatedString, str]]): The subject of the JWT.
  • aud (Optional[Union[InterpolatedString, str]]): The audience of the JWT.
  • additional_jwt_headers (Optional[Mapping[str, Any]]): Additional headers to include in the JWT.
  • additional_jwt_payload (Optional[Mapping[str, Any]]): Additional payload to include in the JWT.
JwtAuthenticator( config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], secret_key: Union[airbyte_cdk.InterpolatedString, str], algorithm: Union[str, JwtAlgorithm], token_duration: Optional[int], base64_encode_secret_key: Union[airbyte_cdk.InterpolatedBoolean, str, bool, NoneType] = False, header_prefix: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, kid: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, typ: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, cty: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, iss: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, sub: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, aud: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, additional_jwt_headers: Optional[Mapping[str, Any]] = None, additional_jwt_payload: Optional[Mapping[str, Any]] = None)
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
secret_key: Union[airbyte_cdk.InterpolatedString, str]
algorithm: Union[str, JwtAlgorithm]
token_duration: Optional[int]
base64_encode_secret_key: Union[airbyte_cdk.InterpolatedBoolean, str, bool, NoneType] = False
header_prefix: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
kid: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
typ: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
cty: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
iss: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
sub: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
aud: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
additional_jwt_headers: Optional[Mapping[str, Any]] = None
additional_jwt_payload: Optional[Mapping[str, Any]] = None
auth_header: str
188    @property
189    def auth_header(self) -> str:
190        return "Authorization"

HTTP header to set on the requests

token: str
192    @property
193    def token(self) -> str:
194        return (
195            f"{self._get_header_prefix()} {self._get_signed_token()}"
196            if self._get_header_prefix()
197            else self._get_signed_token()
198        )

The header value to set on outgoing HTTP requests