airbyte_cdk.sources.declarative.auth.jwt

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4
  5import base64
  6import json
  7from dataclasses import InitVar, dataclass
  8from datetime import datetime
  9from typing import Any, Mapping, MutableMapping, Optional, Union, cast
 10
 11import jwt
 12from cryptography.hazmat.primitives import serialization
 13from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePrivateKey
 14from cryptography.hazmat.primitives.asymmetric.ed448 import Ed448PrivateKey
 15from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
 16from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
 17
 18from airbyte_cdk.sources.declarative.auth.declarative_authenticator import DeclarativeAuthenticator
 19from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
 20from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping
 21from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
 22from airbyte_cdk.sources.declarative.requesters.request_option import (
 23    RequestOption,
 24    RequestOptionType,
 25)
 26
# Type alias for the signing-key values this module hands to `jwt.encode`: a loaded
# private-key object (RSA, elliptic curve, Ed25519 or Ed448), or the key as `str`/`bytes`.
JwtKeyTypes = Union[
    RSAPrivateKey, EllipticCurvePrivateKey, Ed25519PrivateKey, Ed448PrivateKey, str, bytes
]
 31
 32
 33class JwtAlgorithm(str):
 34    """
 35    Enum for supported JWT algorithms
 36    """
 37
 38    HS256 = "HS256"
 39    HS384 = "HS384"
 40    HS512 = "HS512"
 41    ES256 = "ES256"
 42    ES256K = "ES256K"
 43    ES384 = "ES384"
 44    ES512 = "ES512"
 45    RS256 = "RS256"
 46    RS384 = "RS384"
 47    RS512 = "RS512"
 48    PS256 = "PS256"
 49    PS384 = "PS384"
 50    PS512 = "PS512"
 51    EdDSA = "EdDSA"
 52
 53
 54@dataclass
 55class JwtAuthenticator(DeclarativeAuthenticator):
 56    """
 57    Generates a JSON Web Token (JWT) based on a declarative connector configuration file. The generated token is attached to each request via the Authorization header.
 58
 59    Attributes:
 60        config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
 61        secret_key (Union[InterpolatedString, str]): The secret key used to sign the JWT
 62        algorithm (Union[str, JwtAlgorithm]): The algorithm used to sign the JWT
 63        token_duration (Optional[int]): The duration in seconds for which the token is valid
 64        base64_encode_secret_key (Optional[Union[InterpolatedBoolean, str, bool]]): Whether to base64 encode the secret key
 65        header_prefix (Optional[Union[InterpolatedString, str]]): The prefix to add to the Authorization header
 66        kid (Optional[Union[InterpolatedString, str]]): The key identifier to be included in the JWT header
 67        typ (Optional[Union[InterpolatedString, str]]): The type of the JWT.
 68        cty (Optional[Union[InterpolatedString, str]]): The content type of the JWT.
 69        iss (Optional[Union[InterpolatedString, str]]): The issuer of the JWT.
 70        sub (Optional[Union[InterpolatedString, str]]): The subject of the JWT.
 71        aud (Optional[Union[InterpolatedString, str]]): The audience of the JWT.
 72        additional_jwt_headers (Optional[Mapping[str, Any]]): Additional headers to include in the JWT.
 73        additional_jwt_payload (Optional[Mapping[str, Any]]): Additional payload to include in the JWT.
 74    """
 75
 76    config: Mapping[str, Any]
 77    parameters: InitVar[Mapping[str, Any]]
 78    secret_key: Union[InterpolatedString, str]
 79    algorithm: Union[str, JwtAlgorithm]
 80    token_duration: Optional[int]
 81    base64_encode_secret_key: Optional[Union[InterpolatedBoolean, str, bool]] = False
 82    header_prefix: Optional[Union[InterpolatedString, str]] = None
 83    kid: Optional[Union[InterpolatedString, str]] = None
 84    typ: Optional[Union[InterpolatedString, str]] = None
 85    cty: Optional[Union[InterpolatedString, str]] = None
 86    iss: Optional[Union[InterpolatedString, str]] = None
 87    sub: Optional[Union[InterpolatedString, str]] = None
 88    aud: Optional[Union[InterpolatedString, str]] = None
 89    additional_jwt_headers: Optional[Mapping[str, Any]] = None
 90    additional_jwt_payload: Optional[Mapping[str, Any]] = None
 91    passphrase: Optional[Union[InterpolatedString, str]] = None
 92    request_option: Optional[RequestOption] = None
 93
 94    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 95        self._secret_key = InterpolatedString.create(self.secret_key, parameters=parameters)
 96        self._algorithm = (
 97            JwtAlgorithm(self.algorithm) if isinstance(self.algorithm, str) else self.algorithm
 98        )
 99        self._base64_encode_secret_key = (
100            InterpolatedBoolean(self.base64_encode_secret_key, parameters=parameters)
101            if isinstance(self.base64_encode_secret_key, str)
102            else self.base64_encode_secret_key
103        )
104        self._token_duration = self.token_duration
105        self._header_prefix = (
106            InterpolatedString.create(self.header_prefix, parameters=parameters)
107            if self.header_prefix
108            else None
109        )
110        self._kid = InterpolatedString.create(self.kid, parameters=parameters) if self.kid else None
111        self._typ = InterpolatedString.create(self.typ, parameters=parameters) if self.typ else None
112        self._cty = InterpolatedString.create(self.cty, parameters=parameters) if self.cty else None
113        self._iss = InterpolatedString.create(self.iss, parameters=parameters) if self.iss else None
114        self._sub = InterpolatedString.create(self.sub, parameters=parameters) if self.sub else None
115        self._aud = InterpolatedString.create(self.aud, parameters=parameters) if self.aud else None
116        self._additional_jwt_headers = InterpolatedMapping(
117            self.additional_jwt_headers or {}, parameters=parameters
118        )
119        self._additional_jwt_payload = InterpolatedMapping(
120            self.additional_jwt_payload or {}, parameters=parameters
121        )
122        self._passphrase = (
123            InterpolatedString.create(self.passphrase, parameters=parameters)
124            if self.passphrase
125            else None
126        )
127
128        # When we first implemented the JWT authenticator, we assumed that the signed token was always supposed
129        # to be loaded into the request headers under the `Authorization` key. This is not always the case, but
130        # this default option allows for backwards compatibility to be retained for existing connectors
131        self._request_option = self.request_option or RequestOption(
132            inject_into=RequestOptionType.header, field_name="Authorization", parameters=parameters
133        )
134
135    def _get_jwt_headers(self) -> dict[str, Any]:
136        """
137        Builds and returns the headers used when signing the JWT.
138        """
139        headers = self._additional_jwt_headers.eval(self.config, json_loads=json.loads)
140        if any(prop in headers for prop in ["kid", "alg", "typ", "cty"]):
141            raise ValueError(
142                "'kid', 'alg', 'typ', 'cty' are reserved headers and should not be set as part of 'additional_jwt_headers'"
143            )
144
145        if self._kid:
146            headers["kid"] = self._kid.eval(self.config, json_loads=json.loads)
147        if self._typ:
148            headers["typ"] = self._typ.eval(self.config, json_loads=json.loads)
149        if self._cty:
150            headers["cty"] = self._cty.eval(self.config, json_loads=json.loads)
151        headers["alg"] = self._algorithm
152        return headers
153
154    def _get_jwt_payload(self) -> dict[str, Any]:
155        """
156        Builds and returns the payload used when signing the JWT.
157        """
158        now = int(datetime.now().timestamp())
159        exp = now + self._token_duration if isinstance(self._token_duration, int) else now
160        nbf = now
161
162        payload = self._additional_jwt_payload.eval(self.config, json_loads=json.loads)
163        if any(prop in payload for prop in ["iss", "sub", "aud", "iat", "exp", "nbf"]):
164            raise ValueError(
165                "'iss', 'sub', 'aud', 'iat', 'exp', 'nbf' are reserved properties and should not be set as part of 'additional_jwt_payload'"
166            )
167
168        if self._iss:
169            payload["iss"] = self._iss.eval(self.config, json_loads=json.loads)
170        if self._sub:
171            payload["sub"] = self._sub.eval(self.config, json_loads=json.loads)
172        if self._aud:
173            payload["aud"] = self._aud.eval(self.config, json_loads=json.loads)
174
175        payload["iat"] = now
176        payload["exp"] = exp
177        payload["nbf"] = nbf
178        return payload
179
180    def _get_secret_key(self) -> JwtKeyTypes:
181        """
182        Returns the secret key used to sign the JWT.
183        """
184        secret_key: str = self._secret_key.eval(self.config, json_loads=json.loads)
185
186        if self._passphrase:
187            passphrase_value = self._passphrase.eval(self.config, json_loads=json.loads)
188            if passphrase_value:
189                private_key = serialization.load_pem_private_key(
190                    secret_key.encode(),
191                    password=passphrase_value.encode(),
192                )
193                return cast(JwtKeyTypes, private_key)
194
195        return (
196            base64.b64encode(secret_key.encode()).decode()
197            if self._base64_encode_secret_key
198            else secret_key
199        )
200
201    def _get_signed_token(self) -> Union[str, Any]:
202        """
203        Signed the JWT using the provided secret key and algorithm and the generated headers and payload. For additional information on PyJWT see: https://pyjwt.readthedocs.io/en/stable/
204        """
205        try:
206            return jwt.encode(
207                payload=self._get_jwt_payload(),
208                key=self._get_secret_key(),
209                algorithm=self._algorithm,
210                headers=self._get_jwt_headers(),
211            )
212        except Exception as e:
213            raise ValueError(f"Failed to sign token: {e}")
214
215    def _get_header_prefix(self) -> Union[str, None]:
216        """
217        Returns the header prefix to be used when attaching the token to the request.
218        """
219        return (
220            self._header_prefix.eval(self.config, json_loads=json.loads)
221            if self._header_prefix
222            else None
223        )
224
225    @property
226    def auth_header(self) -> str:
227        options = self._get_request_options(RequestOptionType.header)
228        return next(iter(options.keys()), "")
229
230    @property
231    def token(self) -> str:
232        return (
233            f"{self._get_header_prefix()} {self._get_signed_token()}"
234            if self._get_header_prefix()
235            else self._get_signed_token()
236        )
237
238    def get_request_params(self) -> Mapping[str, Any]:
239        return self._get_request_options(RequestOptionType.request_parameter)
240
241    def get_request_body_data(self) -> Union[Mapping[str, Any], str]:
242        return self._get_request_options(RequestOptionType.body_data)
243
244    def get_request_body_json(self) -> Mapping[str, Any]:
245        return self._get_request_options(RequestOptionType.body_json)
246
247    def _get_request_options(self, option_type: RequestOptionType) -> Mapping[str, Any]:
248        options: MutableMapping[str, Any] = {}
249        if self._request_option.inject_into == option_type:
250            self._request_option.inject_into_request(options, self.token, self.config)
251        return options
JwtKeyTypes = typing.Union[cryptography.hazmat.primitives.asymmetric.rsa.RSAPrivateKey, cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePrivateKey, cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey, cryptography.hazmat.primitives.asymmetric.ed448.Ed448PrivateKey, str, bytes]
class JwtAlgorithm(builtins.str):
class JwtAlgorithm(str):
    """
    Names of the supported JWT signing algorithms.

    This is a plain `str` subclass carrying string constants rather than an `enum.Enum`:
    `JwtAlgorithm(value)` simply wraps `value` and performs no membership validation.
    """

    # HMAC with SHA-2
    HS256 = "HS256"
    HS384 = "HS384"
    HS512 = "HS512"

    # ECDSA
    ES256 = "ES256"
    ES256K = "ES256K"
    ES384 = "ES384"
    ES512 = "ES512"

    # RSASSA-PKCS1-v1_5
    RS256 = "RS256"
    RS384 = "RS384"
    RS512 = "RS512"

    # RSASSA-PSS
    PS256 = "PS256"
    PS384 = "PS384"
    PS512 = "PS512"

    # Edwards-curve signatures
    EdDSA = "EdDSA"

Enum for supported JWT algorithms

HS256 = 'HS256'
HS384 = 'HS384'
HS512 = 'HS512'
ES256 = 'ES256'
ES256K = 'ES256K'
ES384 = 'ES384'
ES512 = 'ES512'
RS256 = 'RS256'
RS384 = 'RS384'
RS512 = 'RS512'
PS256 = 'PS256'
PS384 = 'PS384'
PS512 = 'PS512'
EdDSA = 'EdDSA'
 55@dataclass
 56class JwtAuthenticator(DeclarativeAuthenticator):
 57    """
 58    Generates a JSON Web Token (JWT) based on a declarative connector configuration file. The generated token is attached to each request via the Authorization header.
 59
 60    Attributes:
 61        config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
 62        secret_key (Union[InterpolatedString, str]): The secret key used to sign the JWT
 63        algorithm (Union[str, JwtAlgorithm]): The algorithm used to sign the JWT
 64        token_duration (Optional[int]): The duration in seconds for which the token is valid
 65        base64_encode_secret_key (Optional[Union[InterpolatedBoolean, str, bool]]): Whether to base64 encode the secret key
 66        header_prefix (Optional[Union[InterpolatedString, str]]): The prefix to add to the Authorization header
 67        kid (Optional[Union[InterpolatedString, str]]): The key identifier to be included in the JWT header
 68        typ (Optional[Union[InterpolatedString, str]]): The type of the JWT.
 69        cty (Optional[Union[InterpolatedString, str]]): The content type of the JWT.
 70        iss (Optional[Union[InterpolatedString, str]]): The issuer of the JWT.
 71        sub (Optional[Union[InterpolatedString, str]]): The subject of the JWT.
 72        aud (Optional[Union[InterpolatedString, str]]): The audience of the JWT.
 73        additional_jwt_headers (Optional[Mapping[str, Any]]): Additional headers to include in the JWT.
 74        additional_jwt_payload (Optional[Mapping[str, Any]]): Additional payload to include in the JWT.
 75    """
 76
 77    config: Mapping[str, Any]
 78    parameters: InitVar[Mapping[str, Any]]
 79    secret_key: Union[InterpolatedString, str]
 80    algorithm: Union[str, JwtAlgorithm]
 81    token_duration: Optional[int]
 82    base64_encode_secret_key: Optional[Union[InterpolatedBoolean, str, bool]] = False
 83    header_prefix: Optional[Union[InterpolatedString, str]] = None
 84    kid: Optional[Union[InterpolatedString, str]] = None
 85    typ: Optional[Union[InterpolatedString, str]] = None
 86    cty: Optional[Union[InterpolatedString, str]] = None
 87    iss: Optional[Union[InterpolatedString, str]] = None
 88    sub: Optional[Union[InterpolatedString, str]] = None
 89    aud: Optional[Union[InterpolatedString, str]] = None
 90    additional_jwt_headers: Optional[Mapping[str, Any]] = None
 91    additional_jwt_payload: Optional[Mapping[str, Any]] = None
 92    passphrase: Optional[Union[InterpolatedString, str]] = None
 93    request_option: Optional[RequestOption] = None
 94
 95    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 96        self._secret_key = InterpolatedString.create(self.secret_key, parameters=parameters)
 97        self._algorithm = (
 98            JwtAlgorithm(self.algorithm) if isinstance(self.algorithm, str) else self.algorithm
 99        )
100        self._base64_encode_secret_key = (
101            InterpolatedBoolean(self.base64_encode_secret_key, parameters=parameters)
102            if isinstance(self.base64_encode_secret_key, str)
103            else self.base64_encode_secret_key
104        )
105        self._token_duration = self.token_duration
106        self._header_prefix = (
107            InterpolatedString.create(self.header_prefix, parameters=parameters)
108            if self.header_prefix
109            else None
110        )
111        self._kid = InterpolatedString.create(self.kid, parameters=parameters) if self.kid else None
112        self._typ = InterpolatedString.create(self.typ, parameters=parameters) if self.typ else None
113        self._cty = InterpolatedString.create(self.cty, parameters=parameters) if self.cty else None
114        self._iss = InterpolatedString.create(self.iss, parameters=parameters) if self.iss else None
115        self._sub = InterpolatedString.create(self.sub, parameters=parameters) if self.sub else None
116        self._aud = InterpolatedString.create(self.aud, parameters=parameters) if self.aud else None
117        self._additional_jwt_headers = InterpolatedMapping(
118            self.additional_jwt_headers or {}, parameters=parameters
119        )
120        self._additional_jwt_payload = InterpolatedMapping(
121            self.additional_jwt_payload or {}, parameters=parameters
122        )
123        self._passphrase = (
124            InterpolatedString.create(self.passphrase, parameters=parameters)
125            if self.passphrase
126            else None
127        )
128
129        # When we first implemented the JWT authenticator, we assumed that the signed token was always supposed
130        # to be loaded into the request headers under the `Authorization` key. This is not always the case, but
131        # this default option allows for backwards compatibility to be retained for existing connectors
132        self._request_option = self.request_option or RequestOption(
133            inject_into=RequestOptionType.header, field_name="Authorization", parameters=parameters
134        )
135
136    def _get_jwt_headers(self) -> dict[str, Any]:
137        """
138        Builds and returns the headers used when signing the JWT.
139        """
140        headers = self._additional_jwt_headers.eval(self.config, json_loads=json.loads)
141        if any(prop in headers for prop in ["kid", "alg", "typ", "cty"]):
142            raise ValueError(
143                "'kid', 'alg', 'typ', 'cty' are reserved headers and should not be set as part of 'additional_jwt_headers'"
144            )
145
146        if self._kid:
147            headers["kid"] = self._kid.eval(self.config, json_loads=json.loads)
148        if self._typ:
149            headers["typ"] = self._typ.eval(self.config, json_loads=json.loads)
150        if self._cty:
151            headers["cty"] = self._cty.eval(self.config, json_loads=json.loads)
152        headers["alg"] = self._algorithm
153        return headers
154
155    def _get_jwt_payload(self) -> dict[str, Any]:
156        """
157        Builds and returns the payload used when signing the JWT.
158        """
159        now = int(datetime.now().timestamp())
160        exp = now + self._token_duration if isinstance(self._token_duration, int) else now
161        nbf = now
162
163        payload = self._additional_jwt_payload.eval(self.config, json_loads=json.loads)
164        if any(prop in payload for prop in ["iss", "sub", "aud", "iat", "exp", "nbf"]):
165            raise ValueError(
166                "'iss', 'sub', 'aud', 'iat', 'exp', 'nbf' are reserved properties and should not be set as part of 'additional_jwt_payload'"
167            )
168
169        if self._iss:
170            payload["iss"] = self._iss.eval(self.config, json_loads=json.loads)
171        if self._sub:
172            payload["sub"] = self._sub.eval(self.config, json_loads=json.loads)
173        if self._aud:
174            payload["aud"] = self._aud.eval(self.config, json_loads=json.loads)
175
176        payload["iat"] = now
177        payload["exp"] = exp
178        payload["nbf"] = nbf
179        return payload
180
181    def _get_secret_key(self) -> JwtKeyTypes:
182        """
183        Returns the secret key used to sign the JWT.
184        """
185        secret_key: str = self._secret_key.eval(self.config, json_loads=json.loads)
186
187        if self._passphrase:
188            passphrase_value = self._passphrase.eval(self.config, json_loads=json.loads)
189            if passphrase_value:
190                private_key = serialization.load_pem_private_key(
191                    secret_key.encode(),
192                    password=passphrase_value.encode(),
193                )
194                return cast(JwtKeyTypes, private_key)
195
196        return (
197            base64.b64encode(secret_key.encode()).decode()
198            if self._base64_encode_secret_key
199            else secret_key
200        )
201
202    def _get_signed_token(self) -> Union[str, Any]:
203        """
204        Signed the JWT using the provided secret key and algorithm and the generated headers and payload. For additional information on PyJWT see: https://pyjwt.readthedocs.io/en/stable/
205        """
206        try:
207            return jwt.encode(
208                payload=self._get_jwt_payload(),
209                key=self._get_secret_key(),
210                algorithm=self._algorithm,
211                headers=self._get_jwt_headers(),
212            )
213        except Exception as e:
214            raise ValueError(f"Failed to sign token: {e}")
215
216    def _get_header_prefix(self) -> Union[str, None]:
217        """
218        Returns the header prefix to be used when attaching the token to the request.
219        """
220        return (
221            self._header_prefix.eval(self.config, json_loads=json.loads)
222            if self._header_prefix
223            else None
224        )
225
226    @property
227    def auth_header(self) -> str:
228        options = self._get_request_options(RequestOptionType.header)
229        return next(iter(options.keys()), "")
230
231    @property
232    def token(self) -> str:
233        return (
234            f"{self._get_header_prefix()} {self._get_signed_token()}"
235            if self._get_header_prefix()
236            else self._get_signed_token()
237        )
238
239    def get_request_params(self) -> Mapping[str, Any]:
240        return self._get_request_options(RequestOptionType.request_parameter)
241
242    def get_request_body_data(self) -> Union[Mapping[str, Any], str]:
243        return self._get_request_options(RequestOptionType.body_data)
244
245    def get_request_body_json(self) -> Mapping[str, Any]:
246        return self._get_request_options(RequestOptionType.body_json)
247
248    def _get_request_options(self, option_type: RequestOptionType) -> Mapping[str, Any]:
249        options: MutableMapping[str, Any] = {}
250        if self._request_option.inject_into == option_type:
251            self._request_option.inject_into_request(options, self.token, self.config)
252        return options

Generates a JSON Web Token (JWT) based on a declarative connector configuration file. The generated token is attached to each request via the Authorization header by default; a different injection target can be supplied through request_option.

Attributes:
  • config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
  • secret_key (Union[InterpolatedString, str]): The secret key used to sign the JWT
  • algorithm (Union[str, JwtAlgorithm]): The algorithm used to sign the JWT
  • token_duration (Optional[int]): The duration in seconds for which the token is valid
  • base64_encode_secret_key (Optional[Union[InterpolatedBoolean, str, bool]]): Whether to base64 encode the secret key
  • header_prefix (Optional[Union[InterpolatedString, str]]): The prefix to add to the Authorization header
  • kid (Optional[Union[InterpolatedString, str]]): The key identifier to be included in the JWT header
  • typ (Optional[Union[InterpolatedString, str]]): The type of the JWT.
  • cty (Optional[Union[InterpolatedString, str]]): The content type of the JWT.
  • iss (Optional[Union[InterpolatedString, str]]): The issuer of the JWT.
  • sub (Optional[Union[InterpolatedString, str]]): The subject of the JWT.
  • aud (Optional[Union[InterpolatedString, str]]): The audience of the JWT.
  • additional_jwt_headers (Optional[Mapping[str, Any]]): Additional headers to include in the JWT.
  • additional_jwt_payload (Optional[Mapping[str, Any]]): Additional payload to include in the JWT.
JwtAuthenticator( config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], secret_key: Union[airbyte_cdk.InterpolatedString, str], algorithm: Union[str, JwtAlgorithm], token_duration: Optional[int], base64_encode_secret_key: Union[airbyte_cdk.InterpolatedBoolean, str, bool, NoneType] = False, header_prefix: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, kid: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, typ: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, cty: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, iss: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, sub: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, aud: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, additional_jwt_headers: Optional[Mapping[str, Any]] = None, additional_jwt_payload: Optional[Mapping[str, Any]] = None, passphrase: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, request_option: Optional[airbyte_cdk.RequestOption] = None)
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
secret_key: Union[airbyte_cdk.InterpolatedString, str]
algorithm: Union[str, JwtAlgorithm]
token_duration: Optional[int]
base64_encode_secret_key: Union[airbyte_cdk.InterpolatedBoolean, str, bool, NoneType] = False
header_prefix: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
kid: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
typ: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
cty: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
iss: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
sub: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
aud: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
additional_jwt_headers: Optional[Mapping[str, Any]] = None
additional_jwt_payload: Optional[Mapping[str, Any]] = None
passphrase: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
request_option: Optional[airbyte_cdk.RequestOption] = None
auth_header: str
226    @property
227    def auth_header(self) -> str:
228        options = self._get_request_options(RequestOptionType.header)
229        return next(iter(options.keys()), "")

HTTP header to set on the requests

token: str
231    @property
232    def token(self) -> str:
233        return (
234            f"{self._get_header_prefix()} {self._get_signed_token()}"
235            if self._get_header_prefix()
236            else self._get_signed_token()
237        )

The header value to set on outgoing HTTP requests

def get_request_params(self) -> Mapping[str, Any]:
239    def get_request_params(self) -> Mapping[str, Any]:
240        return self._get_request_options(RequestOptionType.request_parameter)

HTTP request parameter to add to the requests

def get_request_body_data(self) -> Union[Mapping[str, Any], str]:
242    def get_request_body_data(self) -> Union[Mapping[str, Any], str]:
243        return self._get_request_options(RequestOptionType.body_data)

Form-encoded body data to set on the requests

def get_request_body_json(self) -> Mapping[str, Any]:
245    def get_request_body_json(self) -> Mapping[str, Any]:
246        return self._get_request_options(RequestOptionType.body_json)

JSON-encoded body data to set on the requests