
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  5import base64
  6import json
  7from dataclasses import InitVar, dataclass
  8from datetime import datetime
  9from typing import Any, Mapping, Optional, Union
 11import jwt
 13from airbyte_cdk.sources.declarative.auth.declarative_authenticator import DeclarativeAuthenticator
 14from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
 15from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping
 16from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
 19class JwtAlgorithm(str):
 20    """
 21    Enum for supported JWT algorithms
 22    """
 24    HS256 = "HS256"
 25    HS384 = "HS384"
 26    HS512 = "HS512"
 27    ES256 = "ES256"
 28    ES256K = "ES256K"
 29    ES384 = "ES384"
 30    ES512 = "ES512"
 31    RS256 = "RS256"
 32    RS384 = "RS384"
 33    RS512 = "RS512"
 34    PS256 = "PS256"
 35    PS384 = "PS384"
 36    PS512 = "PS512"
 37    EdDSA = "EdDSA"
 41class JwtAuthenticator(DeclarativeAuthenticator):
 42    """
 43    Generates a JSON Web Token (JWT) based on a declarative connector configuration file. The generated token is attached to each request via the Authorization header.
 45    Attributes:
 46        config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
 47        secret_key (Union[InterpolatedString, str]): The secret key used to sign the JWT
 48        algorithm (Union[str, JwtAlgorithm]): The algorithm used to sign the JWT
 49        token_duration (Optional[int]): The duration in seconds for which the token is valid
 50        base64_encode_secret_key (Optional[Union[InterpolatedBoolean, str, bool]]): Whether to base64 encode the secret key
 51        header_prefix (Optional[Union[InterpolatedString, str]]): The prefix to add to the Authorization header
 52        kid (Optional[Union[InterpolatedString, str]]): The key identifier to be included in the JWT header
 53        typ (Optional[Union[InterpolatedString, str]]): The type of the JWT.
 54        cty (Optional[Union[InterpolatedString, str]]): The content type of the JWT.
 55        iss (Optional[Union[InterpolatedString, str]]): The issuer of the JWT.
 56        sub (Optional[Union[InterpolatedString, str]]): The subject of the JWT.
 57        aud (Optional[Union[InterpolatedString, str]]): The audience of the JWT.
 58        additional_jwt_headers (Optional[Mapping[str, Any]]): Additional headers to include in the JWT.
 59        additional_jwt_payload (Optional[Mapping[str, Any]]): Additional payload to include in the JWT.
 60    """
 62    config: Mapping[str, Any]
 63    parameters: InitVar[Mapping[str, Any]]
 64    secret_key: Union[InterpolatedString, str]
 65    algorithm: Union[str, JwtAlgorithm]
 66    token_duration: Optional[int]
 67    base64_encode_secret_key: Optional[Union[InterpolatedBoolean, str, bool]] = False
 68    header_prefix: Optional[Union[InterpolatedString, str]] = None
 69    kid: Optional[Union[InterpolatedString, str]] = None
 70    typ: Optional[Union[InterpolatedString, str]] = None
 71    cty: Optional[Union[InterpolatedString, str]] = None
 72    iss: Optional[Union[InterpolatedString, str]] = None
 73    sub: Optional[Union[InterpolatedString, str]] = None
 74    aud: Optional[Union[InterpolatedString, str]] = None
 75    additional_jwt_headers: Optional[Mapping[str, Any]] = None
 76    additional_jwt_payload: Optional[Mapping[str, Any]] = None
 78    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 79        self._secret_key = InterpolatedString.create(self.secret_key, parameters=parameters)
 80        self._algorithm = (
 81            JwtAlgorithm(self.algorithm) if isinstance(self.algorithm, str) else self.algorithm
 82        )
 83        self._base64_encode_secret_key = (
 84            InterpolatedBoolean(self.base64_encode_secret_key, parameters=parameters)
 85            if isinstance(self.base64_encode_secret_key, str)
 86            else self.base64_encode_secret_key
 87        )
 88        self._token_duration = self.token_duration
 89        self._header_prefix = (
 90            InterpolatedString.create(self.header_prefix, parameters=parameters)
 91            if self.header_prefix
 92            else None
 93        )
 94        self._kid = InterpolatedString.create(self.kid, parameters=parameters) if self.kid else None
 95        self._typ = InterpolatedString.create(self.typ, parameters=parameters) if self.typ else None
 96        self._cty = InterpolatedString.create(self.cty, parameters=parameters) if self.cty else None
 97        self._iss = InterpolatedString.create(self.iss, parameters=parameters) if self.iss else None
 98        self._sub = InterpolatedString.create(self.sub, parameters=parameters) if self.sub else None
 99        self._aud = InterpolatedString.create(self.aud, parameters=parameters) if self.aud else None
100        self._additional_jwt_headers = InterpolatedMapping(
101            self.additional_jwt_headers or {}, parameters=parameters
102        )
103        self._additional_jwt_payload = InterpolatedMapping(
104            self.additional_jwt_payload or {}, parameters=parameters
105        )
107    def _get_jwt_headers(self) -> dict[str, Any]:
108        """
109        Builds and returns the headers used when signing the JWT.
110        """
111        headers = self._additional_jwt_headers.eval(self.config, json_loads=json.loads)
112        if any(prop in headers for prop in ["kid", "alg", "typ", "cty"]):
113            raise ValueError(
114                "'kid', 'alg', 'typ', 'cty' are reserved headers and should not be set as part of 'additional_jwt_headers'"
115            )
117        if self._kid:
118            headers["kid"] = self._kid.eval(self.config, json_loads=json.loads)
119        if self._typ:
120            headers["typ"] = self._typ.eval(self.config, json_loads=json.loads)
121        if self._cty:
122            headers["cty"] = self._cty.eval(self.config, json_loads=json.loads)
123        headers["alg"] = self._algorithm
124        return headers
126    def _get_jwt_payload(self) -> dict[str, Any]:
127        """
128        Builds and returns the payload used when signing the JWT.
129        """
130        now = int(
131        exp = now + self._token_duration if isinstance(self._token_duration, int) else now
132        nbf = now
134        payload = self._additional_jwt_payload.eval(self.config, json_loads=json.loads)
135        if any(prop in payload for prop in ["iss", "sub", "aud", "iat", "exp", "nbf"]):
136            raise ValueError(
137                "'iss', 'sub', 'aud', 'iat', 'exp', 'nbf' are reserved properties and should not be set as part of 'additional_jwt_payload'"
138            )
140        if self._iss:
141            payload["iss"] = self._iss.eval(self.config, json_loads=json.loads)
142        if self._sub:
143            payload["sub"] = self._sub.eval(self.config, json_loads=json.loads)
144        if self._aud:
145            payload["aud"] = self._aud.eval(self.config, json_loads=json.loads)
147        payload["iat"] = now
148        payload["exp"] = exp
149        payload["nbf"] = nbf
150        return payload
152    def _get_secret_key(self) -> str:
153        """
154        Returns the secret key used to sign the JWT.
155        """
156        secret_key: str = self._secret_key.eval(self.config, json_loads=json.loads)
157        return (
158            base64.b64encode(secret_key.encode()).decode()
159            if self._base64_encode_secret_key
160            else secret_key
161        )
163    def _get_signed_token(self) -> Union[str, Any]:
164        """
165        Signed the JWT using the provided secret key and algorithm and the generated headers and payload. For additional information on PyJWT see:
166        """
167        try:
168            return jwt.encode(
169                payload=self._get_jwt_payload(),
170                key=self._get_secret_key(),
171                algorithm=self._algorithm,
172                headers=self._get_jwt_headers(),
173            )
174        except Exception as e:
175            raise ValueError(f"Failed to sign token: {e}")
177    def _get_header_prefix(self) -> Union[str, None]:
178        """
179        Returns the header prefix to be used when attaching the token to the request.
180        """
181        return (
182            self._header_prefix.eval(self.config, json_loads=json.loads)
183            if self._header_prefix
184            else None
185        )
187    @property
188    def auth_header(self) -> str:
189        return "Authorization"
191    @property
192    def token(self) -> str:
193        return (
194            f"{self._get_header_prefix()} {self._get_signed_token()}"
195            if self._get_header_prefix()
196            else self._get_signed_token()
197        )
class JwtAlgorithm(builtins.str):
20class JwtAlgorithm(str):
21    """
22    Enum for supported JWT algorithms
23    """
25    HS256 = "HS256"
26    HS384 = "HS384"
27    HS512 = "HS512"
28    ES256 = "ES256"
29    ES256K = "ES256K"
30    ES384 = "ES384"
31    ES512 = "ES512"
32    RS256 = "RS256"
33    RS384 = "RS384"
34    RS512 = "RS512"
35    PS256 = "PS256"
36    PS384 = "PS384"
37    PS512 = "PS512"
38    EdDSA = "EdDSA"

Enum for supported JWT algorithms

HS256 = 'HS256'
HS384 = 'HS384'
HS512 = 'HS512'
ES256 = 'ES256'
ES256K = 'ES256K'
ES384 = 'ES384'
ES512 = 'ES512'
RS256 = 'RS256'
RS384 = 'RS384'
RS512 = 'RS512'
PS256 = 'PS256'
PS384 = 'PS384'
PS512 = 'PS512'
 42class JwtAuthenticator(DeclarativeAuthenticator):
 43    """
 44    Generates a JSON Web Token (JWT) based on a declarative connector configuration file. The generated token is attached to each request via the Authorization header.
 46    Attributes:
 47        config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
 48        secret_key (Union[InterpolatedString, str]): The secret key used to sign the JWT
 49        algorithm (Union[str, JwtAlgorithm]): The algorithm used to sign the JWT
 50        token_duration (Optional[int]): The duration in seconds for which the token is valid
 51        base64_encode_secret_key (Optional[Union[InterpolatedBoolean, str, bool]]): Whether to base64 encode the secret key
 52        header_prefix (Optional[Union[InterpolatedString, str]]): The prefix to add to the Authorization header
 53        kid (Optional[Union[InterpolatedString, str]]): The key identifier to be included in the JWT header
 54        typ (Optional[Union[InterpolatedString, str]]): The type of the JWT.
 55        cty (Optional[Union[InterpolatedString, str]]): The content type of the JWT.
 56        iss (Optional[Union[InterpolatedString, str]]): The issuer of the JWT.
 57        sub (Optional[Union[InterpolatedString, str]]): The subject of the JWT.
 58        aud (Optional[Union[InterpolatedString, str]]): The audience of the JWT.
 59        additional_jwt_headers (Optional[Mapping[str, Any]]): Additional headers to include in the JWT.
 60        additional_jwt_payload (Optional[Mapping[str, Any]]): Additional payload to include in the JWT.
 61    """
 63    config: Mapping[str, Any]
 64    parameters: InitVar[Mapping[str, Any]]
 65    secret_key: Union[InterpolatedString, str]
 66    algorithm: Union[str, JwtAlgorithm]
 67    token_duration: Optional[int]
 68    base64_encode_secret_key: Optional[Union[InterpolatedBoolean, str, bool]] = False
 69    header_prefix: Optional[Union[InterpolatedString, str]] = None
 70    kid: Optional[Union[InterpolatedString, str]] = None
 71    typ: Optional[Union[InterpolatedString, str]] = None
 72    cty: Optional[Union[InterpolatedString, str]] = None
 73    iss: Optional[Union[InterpolatedString, str]] = None
 74    sub: Optional[Union[InterpolatedString, str]] = None
 75    aud: Optional[Union[InterpolatedString, str]] = None
 76    additional_jwt_headers: Optional[Mapping[str, Any]] = None
 77    additional_jwt_payload: Optional[Mapping[str, Any]] = None
 79    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 80        self._secret_key = InterpolatedString.create(self.secret_key, parameters=parameters)
 81        self._algorithm = (
 82            JwtAlgorithm(self.algorithm) if isinstance(self.algorithm, str) else self.algorithm
 83        )
 84        self._base64_encode_secret_key = (
 85            InterpolatedBoolean(self.base64_encode_secret_key, parameters=parameters)
 86            if isinstance(self.base64_encode_secret_key, str)
 87            else self.base64_encode_secret_key
 88        )
 89        self._token_duration = self.token_duration
 90        self._header_prefix = (
 91            InterpolatedString.create(self.header_prefix, parameters=parameters)
 92            if self.header_prefix
 93            else None
 94        )
 95        self._kid = InterpolatedString.create(self.kid, parameters=parameters) if self.kid else None
 96        self._typ = InterpolatedString.create(self.typ, parameters=parameters) if self.typ else None
 97        self._cty = InterpolatedString.create(self.cty, parameters=parameters) if self.cty else None
 98        self._iss = InterpolatedString.create(self.iss, parameters=parameters) if self.iss else None
 99        self._sub = InterpolatedString.create(self.sub, parameters=parameters) if self.sub else None
100        self._aud = InterpolatedString.create(self.aud, parameters=parameters) if self.aud else None
101        self._additional_jwt_headers = InterpolatedMapping(
102            self.additional_jwt_headers or {}, parameters=parameters
103        )
104        self._additional_jwt_payload = InterpolatedMapping(
105            self.additional_jwt_payload or {}, parameters=parameters
106        )
108    def _get_jwt_headers(self) -> dict[str, Any]:
109        """
110        Builds and returns the headers used when signing the JWT.
111        """
112        headers = self._additional_jwt_headers.eval(self.config, json_loads=json.loads)
113        if any(prop in headers for prop in ["kid", "alg", "typ", "cty"]):
114            raise ValueError(
115                "'kid', 'alg', 'typ', 'cty' are reserved headers and should not be set as part of 'additional_jwt_headers'"
116            )
118        if self._kid:
119            headers["kid"] = self._kid.eval(self.config, json_loads=json.loads)
120        if self._typ:
121            headers["typ"] = self._typ.eval(self.config, json_loads=json.loads)
122        if self._cty:
123            headers["cty"] = self._cty.eval(self.config, json_loads=json.loads)
124        headers["alg"] = self._algorithm
125        return headers
127    def _get_jwt_payload(self) -> dict[str, Any]:
128        """
129        Builds and returns the payload used when signing the JWT.
130        """
131        now = int(
132        exp = now + self._token_duration if isinstance(self._token_duration, int) else now
133        nbf = now
135        payload = self._additional_jwt_payload.eval(self.config, json_loads=json.loads)
136        if any(prop in payload for prop in ["iss", "sub", "aud", "iat", "exp", "nbf"]):
137            raise ValueError(
138                "'iss', 'sub', 'aud', 'iat', 'exp', 'nbf' are reserved properties and should not be set as part of 'additional_jwt_payload'"
139            )
141        if self._iss:
142            payload["iss"] = self._iss.eval(self.config, json_loads=json.loads)
143        if self._sub:
144            payload["sub"] = self._sub.eval(self.config, json_loads=json.loads)
145        if self._aud:
146            payload["aud"] = self._aud.eval(self.config, json_loads=json.loads)
148        payload["iat"] = now
149        payload["exp"] = exp
150        payload["nbf"] = nbf
151        return payload
153    def _get_secret_key(self) -> str:
154        """
155        Returns the secret key used to sign the JWT.
156        """
157        secret_key: str = self._secret_key.eval(self.config, json_loads=json.loads)
158        return (
159            base64.b64encode(secret_key.encode()).decode()
160            if self._base64_encode_secret_key
161            else secret_key
162        )
164    def _get_signed_token(self) -> Union[str, Any]:
165        """
166        Signed the JWT using the provided secret key and algorithm and the generated headers and payload. For additional information on PyJWT see:
167        """
168        try:
169            return jwt.encode(
170                payload=self._get_jwt_payload(),
171                key=self._get_secret_key(),
172                algorithm=self._algorithm,
173                headers=self._get_jwt_headers(),
174            )
175        except Exception as e:
176            raise ValueError(f"Failed to sign token: {e}")
178    def _get_header_prefix(self) -> Union[str, None]:
179        """
180        Returns the header prefix to be used when attaching the token to the request.
181        """
182        return (
183            self._header_prefix.eval(self.config, json_loads=json.loads)
184            if self._header_prefix
185            else None
186        )
188    @property
189    def auth_header(self) -> str:
190        return "Authorization"
192    @property
193    def token(self) -> str:
194        return (
195            f"{self._get_header_prefix()} {self._get_signed_token()}"
196            if self._get_header_prefix()
197            else self._get_signed_token()
198        )

Generates a JSON Web Token (JWT) based on a declarative connector configuration file. The generated token is attached to each request via the Authorization header.

  • config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
  • secret_key (Union[InterpolatedString, str]): The secret key used to sign the JWT
  • algorithm (Union[str, JwtAlgorithm]): The algorithm used to sign the JWT
  • token_duration (Optional[int]): The duration in seconds for which the token is valid
  • base64_encode_secret_key (Optional[Union[InterpolatedBoolean, str, bool]]): Whether to base64 encode the secret key
  • header_prefix (Optional[Union[InterpolatedString, str]]): The prefix to add to the Authorization header
  • kid (Optional[Union[InterpolatedString, str]]): The key identifier to be included in the JWT header
  • typ (Optional[Union[InterpolatedString, str]]): The type of the JWT.
  • cty (Optional[Union[InterpolatedString, str]]): The content type of the JWT.
  • iss (Optional[Union[InterpolatedString, str]]): The issuer of the JWT.
  • sub (Optional[Union[InterpolatedString, str]]): The subject of the JWT.
  • aud (Optional[Union[InterpolatedString, str]]): The audience of the JWT.
  • additional_jwt_headers (Optional[Mapping[str, Any]]): Additional headers to include in the JWT.
  • additional_jwt_payload (Optional[Mapping[str, Any]]): Additional payload to include in the JWT.
JwtAuthenticator( config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], secret_key: Union[airbyte_cdk.InterpolatedString, str], algorithm: Union[str, JwtAlgorithm], token_duration: Optional[int], base64_encode_secret_key: Union[airbyte_cdk.InterpolatedBoolean, str, bool, NoneType] = False, header_prefix: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, kid: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, typ: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, cty: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, iss: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, sub: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, aud: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, additional_jwt_headers: Optional[Mapping[str, Any]] = None, additional_jwt_payload: Optional[Mapping[str, Any]] = None)
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
secret_key: Union[airbyte_cdk.InterpolatedString, str]
algorithm: Union[str, JwtAlgorithm]
token_duration: Optional[int]
base64_encode_secret_key: Union[airbyte_cdk.InterpolatedBoolean, str, bool, NoneType] = False
header_prefix: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
kid: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
typ: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
cty: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
iss: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
sub: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
aud: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
additional_jwt_headers: Optional[Mapping[str, Any]] = None
additional_jwt_payload: Optional[Mapping[str, Any]] = None
auth_header: str
188    @property
189    def auth_header(self) -> str:
190        return "Authorization"

HTTP header to set on the requests

token: str
192    @property
193    def token(self) -> str:
194        return (
195            f"{self._get_header_prefix()} {self._get_signed_token()}"
196            if self._get_header_prefix()
197            else self._get_signed_token()
198        )

The header value to set on outgoing HTTP requests