airbyte_cdk.sources.declarative.auth

1#
2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3#
4
5from airbyte_cdk.sources.declarative.auth.jwt import JwtAuthenticator
6from airbyte_cdk.sources.declarative.auth.oauth import DeclarativeOauth2Authenticator
7
8__all__ = ["DeclarativeOauth2Authenticator", "JwtAuthenticator"]
 27@dataclass
 28class DeclarativeOauth2Authenticator(AbstractOauth2Authenticator, DeclarativeAuthenticator):
 29    """
 30    Generates OAuth2.0 access tokens from an OAuth2.0 refresh token and client credentials based on
 31    a declarative connector configuration file. Credentials can be defined explicitly or via interpolation
 32    at runtime. The generated access token is attached to each request via the Authorization header.
 33
 34    Attributes:
 35        token_refresh_endpoint (Union[InterpolatedString, str]): The endpoint to refresh the access token
 36        client_id (Union[InterpolatedString, str]): The client id
 37        client_secret (Union[InterpolatedString, str]): Client secret (can be empty for APIs that support this)
 38        refresh_token (Union[InterpolatedString, str]): The token used to refresh the access token
 39        access_token_name (Union[InterpolatedString, str]): THe field to extract access token from in the response
 40        expires_in_name (Union[InterpolatedString, str]): The field to extract expires_in from in the response
 41        config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
 42        scopes (Optional[List[str]]): The scopes to request
 43        token_expiry_date (Optional[Union[InterpolatedString, str]]): The access token expiration date
 44        token_expiry_date_format str: format of the datetime; provide it if expires_in is returned in datetime instead of seconds
 45        token_expiry_is_time_of_expiration bool: set True it if expires_in is returned as time of expiration instead of the number seconds until expiration
 46        refresh_request_body (Optional[Mapping[str, Any]]): The request body to send in the refresh request
 47        refresh_request_headers (Optional[Mapping[str, Any]]): The request headers to send in the refresh request
 48        send_refresh_request_as_query_params (bool): When True, the standard refresh args (`grant_type`, `refresh_token`, client credentials when not in an `Authorization` header, scopes, plus any `refresh_request_body` extras) are sent on the URL query string instead of in the request body, and the body is emitted empty. Use this for OAuth providers like Gong that document their refresh endpoint with refresh args on the URL query string. Defaults to False.
 49        grant_type: The grant_type to request for access_token. If set to refresh_token, the refresh_token parameter has to be provided
 50        message_repository (MessageRepository): the message repository used to emit logs on HTTP requests
 51        refresh_token_error_status_codes (Tuple[int, ...]): Status codes to identify refresh token errors in response
 52        refresh_token_error_key (str): Key to identify refresh token error in response
 53        refresh_token_error_values (Tuple[str, ...]): List of values to check for exception during token refresh process
 54    """
 55
 56    config: Mapping[str, Any]
 57    parameters: InitVar[Mapping[str, Any]]
 58    client_id: Optional[Union[InterpolatedString, str]] = None
 59    client_secret: Optional[Union[InterpolatedString, str]] = None
 60    token_refresh_endpoint: Optional[Union[InterpolatedString, str]] = None
 61    refresh_token: Optional[Union[InterpolatedString, str]] = None
 62    scopes: Optional[List[str]] = None
 63    token_expiry_date: Optional[Union[InterpolatedString, str]] = None
 64    _token_expiry_date: Optional[AirbyteDateTime] = field(init=False, repr=False, default=None)
 65    token_expiry_date_format: Optional[str] = None
 66    token_expiry_is_time_of_expiration: bool = False
 67    access_token_name: Union[InterpolatedString, str] = "access_token"
 68    access_token_value: Optional[Union[InterpolatedString, str]] = None
 69    client_id_name: Union[InterpolatedString, str] = "client_id"
 70    client_secret_name: Union[InterpolatedString, str] = "client_secret"
 71    expires_in_name: Union[InterpolatedString, str] = "expires_in"
 72    refresh_token_name: Union[InterpolatedString, str] = "refresh_token"
 73    refresh_request_body: Optional[Mapping[str, Any]] = None
 74    refresh_request_headers: Optional[Mapping[str, Any]] = None
 75    send_refresh_request_as_query_params: bool = False
 76    grant_type_name: Union[InterpolatedString, str] = "grant_type"
 77    grant_type: Union[InterpolatedString, str] = "refresh_token"
 78    message_repository: MessageRepository = NoopMessageRepository()
 79    profile_assertion: Optional[DeclarativeAuthenticator] = None
 80    use_profile_assertion: Optional[Union[InterpolatedBoolean, str, bool]] = False
 81    refresh_token_error_status_codes: Tuple[int, ...] = ()
 82    refresh_token_error_key: str = ""
 83    refresh_token_error_values: Tuple[str, ...] = ()
 84
 85    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 86        super().__init__(
 87            refresh_token_error_status_codes=self.refresh_token_error_status_codes,
 88            refresh_token_error_key=self.refresh_token_error_key,
 89            refresh_token_error_values=self.refresh_token_error_values,
 90        )
 91        if self.token_refresh_endpoint is not None:
 92            self._token_refresh_endpoint: Optional[InterpolatedString] = InterpolatedString.create(
 93                self.token_refresh_endpoint, parameters=parameters
 94            )
 95        else:
 96            self._token_refresh_endpoint = None
 97        self._client_id_name = InterpolatedString.create(self.client_id_name, parameters=parameters)
 98        self._client_id = (
 99            InterpolatedString.create(self.client_id, parameters=parameters)
100            if self.client_id
101            else self.client_id
102        )
103        self._client_secret_name = InterpolatedString.create(
104            self.client_secret_name, parameters=parameters
105        )
106        self._client_secret = (
107            InterpolatedString.create(self.client_secret, parameters=parameters)
108            if self.client_secret
109            else self.client_secret
110        )
111        self._refresh_token_name = InterpolatedString.create(
112            self.refresh_token_name, parameters=parameters
113        )
114        if self.refresh_token is not None:
115            self._refresh_token: Optional[InterpolatedString] = InterpolatedString.create(
116                self.refresh_token, parameters=parameters
117            )
118        else:
119            self._refresh_token = None
120        self.access_token_name = InterpolatedString.create(
121            self.access_token_name, parameters=parameters
122        )
123        self.expires_in_name = InterpolatedString.create(
124            self.expires_in_name, parameters=parameters
125        )
126        self.grant_type_name = InterpolatedString.create(
127            self.grant_type_name, parameters=parameters
128        )
129        self.grant_type = InterpolatedString.create(
130            "urn:ietf:params:oauth:grant-type:jwt-bearer"
131            if self.use_profile_assertion
132            else self.grant_type,
133            parameters=parameters,
134        )
135        self._refresh_request_body = InterpolatedMapping(
136            self.refresh_request_body or {}, parameters=parameters
137        )
138        self._refresh_request_headers = InterpolatedMapping(
139            self.refresh_request_headers or {}, parameters=parameters
140        )
141        self._send_refresh_request_as_query_params = self.send_refresh_request_as_query_params
142        try:
143            if (
144                isinstance(self.token_expiry_date, (int, str))
145                and str(self.token_expiry_date).isdigit()
146            ):
147                self._token_expiry_date = ab_datetime_parse(self.token_expiry_date)
148            else:
149                self._token_expiry_date = (
150                    ab_datetime_parse(
151                        InterpolatedString.create(
152                            self.token_expiry_date, parameters=parameters
153                        ).eval(self.config)
154                    )
155                    if self.token_expiry_date
156                    else ab_datetime_now() - timedelta(days=1)
157                )
158        except ValueError as e:
159            raise ValueError(f"Invalid token expiry date format: {e}")
160        self.use_profile_assertion = (
161            InterpolatedBoolean(self.use_profile_assertion, parameters=parameters)
162            if isinstance(self.use_profile_assertion, str)
163            else self.use_profile_assertion
164        )
165        self.assertion_name = "assertion"
166
167        if self.access_token_value is not None:
168            self._access_token_value = InterpolatedString.create(
169                self.access_token_value, parameters=parameters
170            ).eval(self.config)
171        else:
172            self._access_token_value = None
173
174        self._access_token: Optional[str] = (
175            self._access_token_value if self.access_token_value else None
176        )
177
178        if not self.use_profile_assertion and any(
179            client_creds is None for client_creds in [self.client_id, self.client_secret]
180        ):
181            raise ValueError(
182                "OAuthAuthenticator configuration error: Both 'client_id' and 'client_secret' are required for the "
183                "basic OAuth flow."
184            )
185        if self.profile_assertion is None and self.use_profile_assertion:
186            raise ValueError(
187                "OAuthAuthenticator configuration error: 'profile_assertion' is required when using the profile assertion flow."
188            )
189        if self.get_grant_type() == "refresh_token" and self._refresh_token is None:
190            raise ValueError(
191                "OAuthAuthenticator configuration error: A 'refresh_token' is required when the 'grant_type' is set to 'refresh_token'."
192            )
193
194    def get_token_refresh_endpoint(self) -> Optional[str]:
195        if self._token_refresh_endpoint is not None:
196            refresh_token_endpoint: str = self._token_refresh_endpoint.eval(self.config)
197            if not refresh_token_endpoint:
198                raise ValueError(
199                    "OAuthAuthenticator was unable to evaluate token_refresh_endpoint parameter"
200                )
201            return refresh_token_endpoint
202        return None
203
204    def get_client_id_name(self) -> str:
205        return self._client_id_name.eval(self.config)  # type: ignore # eval returns a string in this context
206
207    def get_client_id(self) -> str:
208        client_id = self._client_id.eval(self.config) if self._client_id else self._client_id
209        if not client_id:
210            raise ValueError("OAuthAuthenticator was unable to evaluate client_id parameter")
211        return client_id  # type: ignore # value will be returned as a string, or an error will be raised
212
213    def get_client_secret_name(self) -> str:
214        return self._client_secret_name.eval(self.config)  # type: ignore # eval returns a string in this context
215
216    def get_client_secret(self) -> str:
217        client_secret = (
218            self._client_secret.eval(self.config) if self._client_secret else self._client_secret
219        )
220        if not client_secret:
221            # We've seen some APIs allowing empty client_secret so we will only log here
222            logger.warning(
223                "OAuthAuthenticator was unable to evaluate client_secret parameter hence it'll be empty"
224            )
225        return client_secret  # type: ignore # value will be returned as a string, which might be empty
226
227    def get_refresh_token_name(self) -> str:
228        return self._refresh_token_name.eval(self.config)  # type: ignore # eval returns a string in this context
229
230    def get_refresh_token(self) -> Optional[str]:
231        return None if self._refresh_token is None else str(self._refresh_token.eval(self.config))
232
233    def get_scopes(self) -> List[str]:
234        return self.scopes or []
235
236    def get_access_token_name(self) -> str:
237        return self.access_token_name.eval(self.config)  # type: ignore # eval returns a string in this context
238
239    def get_expires_in_name(self) -> str:
240        return self.expires_in_name.eval(self.config)  # type: ignore # eval returns a string in this context
241
242    def get_grant_type_name(self) -> str:
243        return self.grant_type_name.eval(self.config)  # type: ignore # eval returns a string in this context
244
245    def get_grant_type(self) -> str:
246        return self.grant_type.eval(self.config)  # type: ignore # eval returns a string in this context
247
248    def get_refresh_request_body(self) -> Mapping[str, Any]:
249        return self._refresh_request_body.eval(self.config)
250
251    def get_refresh_request_headers(self) -> Mapping[str, Any]:
252        return self._refresh_request_headers.eval(self.config)
253
254    def should_send_refresh_request_as_query_params(self) -> bool:
255        return self._send_refresh_request_as_query_params
256
257    def get_token_expiry_date(self) -> AirbyteDateTime:
258        if not self._has_access_token_been_initialized():
259            return AirbyteDateTime.from_datetime(datetime.min)
260        return self._token_expiry_date  # type: ignore # _token_expiry_date is an AirbyteDateTime. It is never None despite what mypy thinks
261
262    def _has_access_token_been_initialized(self) -> bool:
263        return self._access_token is not None
264
265    def set_token_expiry_date(self, value: AirbyteDateTime) -> None:
266        self._token_expiry_date = value
267
268    def get_assertion_name(self) -> str:
269        return self.assertion_name
270
271    def get_assertion(self) -> str:
272        if self.profile_assertion is None:
273            raise ValueError("profile_assertion is not set")
274        return self.profile_assertion.token
275
276    def build_refresh_request_body(self) -> Mapping[str, Any]:
277        """
278        Returns the request body to set on the refresh request
279
280        Override to define additional parameters
281        """
282        if self.use_profile_assertion:
283            return {
284                self.get_grant_type_name(): self.get_grant_type(),
285                self.get_assertion_name(): self.get_assertion(),
286            }
287        return super().build_refresh_request_body()
288
289    @property
290    def access_token(self) -> str:
291        if self._access_token is None:
292            raise ValueError("access_token is not set")
293        return self._access_token
294
295    @access_token.setter
296    def access_token(self, value: str) -> None:
297        self._access_token = value
298
299    @property
300    def _message_repository(self) -> MessageRepository:
301        """
302        Overriding AbstractOauth2Authenticator._message_repository to allow for HTTP request logs
303        """
304        return self.message_repository

Generates OAuth2.0 access tokens from an OAuth2.0 refresh token and client credentials based on a declarative connector configuration file. Credentials can be defined explicitly or via interpolation at runtime. The generated access token is attached to each request via the Authorization header.

Attributes:
  • token_refresh_endpoint (Union[InterpolatedString, str]): The endpoint to refresh the access token
  • client_id (Union[InterpolatedString, str]): The client id
  • client_secret (Union[InterpolatedString, str]): Client secret (can be empty for APIs that support this)
  • refresh_token (Union[InterpolatedString, str]): The token used to refresh the access token
  • access_token_name (Union[InterpolatedString, str]): THe field to extract access token from in the response
  • expires_in_name (Union[InterpolatedString, str]): The field to extract expires_in from in the response
  • config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
  • scopes (Optional[List[str]]): The scopes to request
  • token_expiry_date (Optional[Union[InterpolatedString, str]]): The access token expiration date
  • token_expiry_date_format str: format of the datetime; provide it if expires_in is returned in datetime instead of seconds
  • token_expiry_is_time_of_expiration bool: set True it if expires_in is returned as time of expiration instead of the number seconds until expiration
  • refresh_request_body (Optional[Mapping[str, Any]]): The request body to send in the refresh request
  • refresh_request_headers (Optional[Mapping[str, Any]]): The request headers to send in the refresh request
  • send_refresh_request_as_query_params (bool): When True, the standard refresh args (grant_type, refresh_token, client credentials when not in an Authorization header, scopes, plus any refresh_request_body extras) are sent on the URL query string instead of in the request body, and the body is emitted empty. Use this for OAuth providers like Gong that document their refresh endpoint with refresh args on the URL query string. Defaults to False.
  • grant_type: The grant_type to request for access_token. If set to refresh_token, the refresh_token parameter has to be provided
  • message_repository (MessageRepository): the message repository used to emit logs on HTTP requests
  • refresh_token_error_status_codes (Tuple[int, ...]): Status codes to identify refresh token errors in response
  • refresh_token_error_key (str): Key to identify refresh token error in response
  • refresh_token_error_values (Tuple[str, ...]): List of values to check for exception during token refresh process
DeclarativeOauth2Authenticator( config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], client_id: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, client_secret: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, token_refresh_endpoint: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, refresh_token: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, scopes: Optional[List[str]] = None, token_expiry_date: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, token_expiry_date_format: Optional[str] = None, token_expiry_is_time_of_expiration: bool = False, access_token_name: Union[airbyte_cdk.InterpolatedString, str] = 'access_token', access_token_value: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, client_id_name: Union[airbyte_cdk.InterpolatedString, str] = 'client_id', client_secret_name: Union[airbyte_cdk.InterpolatedString, str] = 'client_secret', expires_in_name: Union[airbyte_cdk.InterpolatedString, str] = 'expires_in', refresh_token_name: Union[airbyte_cdk.InterpolatedString, str] = 'refresh_token', refresh_request_body: Optional[Mapping[str, Any]] = None, refresh_request_headers: Optional[Mapping[str, Any]] = None, send_refresh_request_as_query_params: bool = False, grant_type_name: Union[airbyte_cdk.InterpolatedString, str] = 'grant_type', grant_type: Union[airbyte_cdk.InterpolatedString, str] = 'refresh_token', message_repository: airbyte_cdk.MessageRepository = <airbyte_cdk.sources.message.NoopMessageRepository object>, profile_assertion: Optional[airbyte_cdk.DeclarativeAuthenticator] = None, use_profile_assertion: Union[airbyte_cdk.InterpolatedBoolean, str, bool, NoneType] = False, refresh_token_error_status_codes: Tuple[int, ...] = (), refresh_token_error_key: str = '', refresh_token_error_values: Tuple[str, ...] = ())

If all of refresh_token_error_status_codes, refresh_token_error_key, and refresh_token_error_values are set, then http errors with such params will be wrapped in AirbyteTracedException.

config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
client_id: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
client_secret: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
token_refresh_endpoint: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
refresh_token: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
scopes: Optional[List[str]] = None
token_expiry_date: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
token_expiry_date_format: Optional[str] = None

Format of the datetime; exists it if expires_in is returned as the expiration datetime instead of seconds until it expires

token_expiry_is_time_of_expiration: bool = False

Indicates that the Token Expiry returns the date until which the token will be valid, not the amount of time it will be valid.

access_token_name: Union[airbyte_cdk.InterpolatedString, str] = 'access_token'
access_token_value: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
client_id_name: Union[airbyte_cdk.InterpolatedString, str] = 'client_id'
client_secret_name: Union[airbyte_cdk.InterpolatedString, str] = 'client_secret'
expires_in_name: Union[airbyte_cdk.InterpolatedString, str] = 'expires_in'
refresh_token_name: Union[airbyte_cdk.InterpolatedString, str] = 'refresh_token'
refresh_request_body: Optional[Mapping[str, Any]] = None
refresh_request_headers: Optional[Mapping[str, Any]] = None
send_refresh_request_as_query_params: bool = False
grant_type_name: Union[airbyte_cdk.InterpolatedString, str] = 'grant_type'
grant_type: Union[airbyte_cdk.InterpolatedString, str] = 'refresh_token'
profile_assertion: Optional[airbyte_cdk.DeclarativeAuthenticator] = None
use_profile_assertion: Union[airbyte_cdk.InterpolatedBoolean, str, bool, NoneType] = False
refresh_token_error_status_codes: Tuple[int, ...] = ()
refresh_token_error_key: str = ''
refresh_token_error_values: Tuple[str, ...] = ()
def get_token_refresh_endpoint(self) -> Optional[str]:
194    def get_token_refresh_endpoint(self) -> Optional[str]:
195        if self._token_refresh_endpoint is not None:
196            refresh_token_endpoint: str = self._token_refresh_endpoint.eval(self.config)
197            if not refresh_token_endpoint:
198                raise ValueError(
199                    "OAuthAuthenticator was unable to evaluate token_refresh_endpoint parameter"
200                )
201            return refresh_token_endpoint
202        return None

Returns the endpoint to refresh the access token

def get_client_id_name(self) -> str:
204    def get_client_id_name(self) -> str:
205        return self._client_id_name.eval(self.config)  # type: ignore # eval returns a string in this context

The client id name to authenticate

def get_client_id(self) -> str:
207    def get_client_id(self) -> str:
208        client_id = self._client_id.eval(self.config) if self._client_id else self._client_id
209        if not client_id:
210            raise ValueError("OAuthAuthenticator was unable to evaluate client_id parameter")
211        return client_id  # type: ignore # value will be returned as a string, or an error will be raised

The client id to authenticate

def get_client_secret_name(self) -> str:
213    def get_client_secret_name(self) -> str:
214        return self._client_secret_name.eval(self.config)  # type: ignore # eval returns a string in this context

The client secret name to authenticate

def get_client_secret(self) -> str:
216    def get_client_secret(self) -> str:
217        client_secret = (
218            self._client_secret.eval(self.config) if self._client_secret else self._client_secret
219        )
220        if not client_secret:
221            # We've seen some APIs allowing empty client_secret so we will only log here
222            logger.warning(
223                "OAuthAuthenticator was unable to evaluate client_secret parameter hence it'll be empty"
224            )
225        return client_secret  # type: ignore # value will be returned as a string, which might be empty

The client secret to authenticate

def get_refresh_token_name(self) -> str:
227    def get_refresh_token_name(self) -> str:
228        return self._refresh_token_name.eval(self.config)  # type: ignore # eval returns a string in this context

The refresh token name to authenticate

def get_refresh_token(self) -> Optional[str]:
230    def get_refresh_token(self) -> Optional[str]:
231        return None if self._refresh_token is None else str(self._refresh_token.eval(self.config))

The token used to refresh the access token when it expires

def get_scopes(self) -> List[str]:
233    def get_scopes(self) -> List[str]:
234        return self.scopes or []

List of requested scopes

def get_access_token_name(self) -> str:
236    def get_access_token_name(self) -> str:
237        return self.access_token_name.eval(self.config)  # type: ignore # eval returns a string in this context

Field to extract access token from in the response

def get_expires_in_name(self) -> str:
239    def get_expires_in_name(self) -> str:
240        return self.expires_in_name.eval(self.config)  # type: ignore # eval returns a string in this context

Returns the expires_in field name

def get_grant_type_name(self) -> str:
242    def get_grant_type_name(self) -> str:
243        return self.grant_type_name.eval(self.config)  # type: ignore # eval returns a string in this context

Returns grant_type specified name for requesting access_token

def get_grant_type(self) -> str:
245    def get_grant_type(self) -> str:
246        return self.grant_type.eval(self.config)  # type: ignore # eval returns a string in this context

Returns grant_type specified for requesting access_token

def get_refresh_request_body(self) -> Mapping[str, Any]:
248    def get_refresh_request_body(self) -> Mapping[str, Any]:
249        return self._refresh_request_body.eval(self.config)

Returns the request body to set on the refresh request

def get_refresh_request_headers(self) -> Mapping[str, Any]:
251    def get_refresh_request_headers(self) -> Mapping[str, Any]:
252        return self._refresh_request_headers.eval(self.config)

Returns the request headers to set on the refresh request

def should_send_refresh_request_as_query_params(self) -> bool:
254    def should_send_refresh_request_as_query_params(self) -> bool:
255        return self._send_refresh_request_as_query_params

Returns True if the standard refresh args should be sent on the URL query string instead of in the request body.

Defaults to False so existing authenticators retain their previous behavior (params in body, no query params on the refresh URL). Subclasses can override this to opt into the URL-query-string shape required by OAuth providers like Gong.

def get_token_expiry_date(self) -> airbyte_cdk.utils.datetime_helpers.AirbyteDateTime:
257    def get_token_expiry_date(self) -> AirbyteDateTime:
258        if not self._has_access_token_been_initialized():
259            return AirbyteDateTime.from_datetime(datetime.min)
260        return self._token_expiry_date  # type: ignore # _token_expiry_date is an AirbyteDateTime. It is never None despite what mypy thinks

Expiration date of the access token

def set_token_expiry_date(self, value: airbyte_cdk.utils.datetime_helpers.AirbyteDateTime) -> None:
265    def set_token_expiry_date(self, value: AirbyteDateTime) -> None:
266        self._token_expiry_date = value

Setter for access token expiration date

def get_assertion_name(self) -> str:
268    def get_assertion_name(self) -> str:
269        return self.assertion_name
def get_assertion(self) -> str:
271    def get_assertion(self) -> str:
272        if self.profile_assertion is None:
273            raise ValueError("profile_assertion is not set")
274        return self.profile_assertion.token
def build_refresh_request_body(self) -> Mapping[str, Any]:
276    def build_refresh_request_body(self) -> Mapping[str, Any]:
277        """
278        Returns the request body to set on the refresh request
279
280        Override to define additional parameters
281        """
282        if self.use_profile_assertion:
283            return {
284                self.get_grant_type_name(): self.get_grant_type(),
285                self.get_assertion_name(): self.get_assertion(),
286            }
287        return super().build_refresh_request_body()

Returns the request body to set on the refresh request

Override to define additional parameters

access_token: str
289    @property
290    def access_token(self) -> str:
291        if self._access_token is None:
292            raise ValueError("access_token is not set")
293        return self._access_token

Returns the access token

 55@dataclass
 56class JwtAuthenticator(DeclarativeAuthenticator):
 57    """
 58    Generates a JSON Web Token (JWT) based on a declarative connector configuration file. The generated token is attached to each request via the Authorization header.
 59
 60    Attributes:
 61        config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
 62        secret_key (Union[InterpolatedString, str]): The secret key used to sign the JWT
 63        algorithm (Union[str, JwtAlgorithm]): The algorithm used to sign the JWT
 64        token_duration (Optional[int]): The duration in seconds for which the token is valid
 65        base64_encode_secret_key (Optional[Union[InterpolatedBoolean, str, bool]]): Whether to base64 encode the secret key
 66        header_prefix (Optional[Union[InterpolatedString, str]]): The prefix to add to the Authorization header
 67        kid (Optional[Union[InterpolatedString, str]]): The key identifier to be included in the JWT header
 68        typ (Optional[Union[InterpolatedString, str]]): The type of the JWT.
 69        cty (Optional[Union[InterpolatedString, str]]): The content type of the JWT.
 70        iss (Optional[Union[InterpolatedString, str]]): The issuer of the JWT.
 71        sub (Optional[Union[InterpolatedString, str]]): The subject of the JWT.
 72        aud (Optional[Union[InterpolatedString, str]]): The audience of the JWT.
 73        additional_jwt_headers (Optional[Mapping[str, Any]]): Additional headers to include in the JWT.
 74        additional_jwt_payload (Optional[Mapping[str, Any]]): Additional payload to include in the JWT.
 75    """
 76
 77    config: Mapping[str, Any]
 78    parameters: InitVar[Mapping[str, Any]]
 79    secret_key: Union[InterpolatedString, str]
 80    algorithm: Union[str, JwtAlgorithm]
 81    token_duration: Optional[int]
 82    base64_encode_secret_key: Optional[Union[InterpolatedBoolean, str, bool]] = False
 83    header_prefix: Optional[Union[InterpolatedString, str]] = None
 84    kid: Optional[Union[InterpolatedString, str]] = None
 85    typ: Optional[Union[InterpolatedString, str]] = None
 86    cty: Optional[Union[InterpolatedString, str]] = None
 87    iss: Optional[Union[InterpolatedString, str]] = None
 88    sub: Optional[Union[InterpolatedString, str]] = None
 89    aud: Optional[Union[InterpolatedString, str]] = None
 90    additional_jwt_headers: Optional[Mapping[str, Any]] = None
 91    additional_jwt_payload: Optional[Mapping[str, Any]] = None
 92    passphrase: Optional[Union[InterpolatedString, str]] = None
 93    request_option: Optional[RequestOption] = None
 94
 95    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 96        self._secret_key = InterpolatedString.create(self.secret_key, parameters=parameters)
 97        self._algorithm = (
 98            JwtAlgorithm(self.algorithm) if isinstance(self.algorithm, str) else self.algorithm
 99        )
100        self._base64_encode_secret_key = (
101            InterpolatedBoolean(self.base64_encode_secret_key, parameters=parameters)
102            if isinstance(self.base64_encode_secret_key, str)
103            else self.base64_encode_secret_key
104        )
105        self._token_duration = self.token_duration
106        self._header_prefix = (
107            InterpolatedString.create(self.header_prefix, parameters=parameters)
108            if self.header_prefix
109            else None
110        )
111        self._kid = InterpolatedString.create(self.kid, parameters=parameters) if self.kid else None
112        self._typ = InterpolatedString.create(self.typ, parameters=parameters) if self.typ else None
113        self._cty = InterpolatedString.create(self.cty, parameters=parameters) if self.cty else None
114        self._iss = InterpolatedString.create(self.iss, parameters=parameters) if self.iss else None
115        self._sub = InterpolatedString.create(self.sub, parameters=parameters) if self.sub else None
116        self._aud = InterpolatedString.create(self.aud, parameters=parameters) if self.aud else None
117        self._additional_jwt_headers = InterpolatedMapping(
118            self.additional_jwt_headers or {}, parameters=parameters
119        )
120        self._additional_jwt_payload = InterpolatedMapping(
121            self.additional_jwt_payload or {}, parameters=parameters
122        )
123        self._passphrase = (
124            InterpolatedString.create(self.passphrase, parameters=parameters)
125            if self.passphrase
126            else None
127        )
128
129        # When we first implemented the JWT authenticator, we assumed that the signed token was always supposed
130        # to be loaded into the request headers under the `Authorization` key. This is not always the case, but
131        # this default option allows for backwards compatibility to be retained for existing connectors
132        self._request_option = self.request_option or RequestOption(
133            inject_into=RequestOptionType.header, field_name="Authorization", parameters=parameters
134        )
135
136    def _get_jwt_headers(self) -> dict[str, Any]:
137        """
138        Builds and returns the headers used when signing the JWT.
139        """
140        headers = self._additional_jwt_headers.eval(self.config, json_loads=json.loads)
141        if any(prop in headers for prop in ["kid", "alg", "typ", "cty"]):
142            raise ValueError(
143                "'kid', 'alg', 'typ', 'cty' are reserved headers and should not be set as part of 'additional_jwt_headers'"
144            )
145
146        if self._kid:
147            headers["kid"] = self._kid.eval(self.config, json_loads=json.loads)
148        if self._typ:
149            headers["typ"] = self._typ.eval(self.config, json_loads=json.loads)
150        if self._cty:
151            headers["cty"] = self._cty.eval(self.config, json_loads=json.loads)
152        headers["alg"] = self._algorithm
153        return headers
154
155    def _get_jwt_payload(self) -> dict[str, Any]:
156        """
157        Builds and returns the payload used when signing the JWT.
158        """
159        now = int(datetime.now().timestamp())
160        exp = now + self._token_duration if isinstance(self._token_duration, int) else now
161        nbf = now
162
163        payload = self._additional_jwt_payload.eval(self.config, json_loads=json.loads)
164        if any(prop in payload for prop in ["iss", "sub", "aud", "iat", "exp", "nbf"]):
165            raise ValueError(
166                "'iss', 'sub', 'aud', 'iat', 'exp', 'nbf' are reserved properties and should not be set as part of 'additional_jwt_payload'"
167            )
168
169        if self._iss:
170            payload["iss"] = self._iss.eval(self.config, json_loads=json.loads)
171        if self._sub:
172            payload["sub"] = self._sub.eval(self.config, json_loads=json.loads)
173        if self._aud:
174            payload["aud"] = self._aud.eval(self.config, json_loads=json.loads)
175
176        payload["iat"] = now
177        payload["exp"] = exp
178        payload["nbf"] = nbf
179        return payload
180
181    def _get_secret_key(self) -> JwtKeyTypes:
182        """
183        Returns the secret key used to sign the JWT.
184        """
185        secret_key: str = self._secret_key.eval(self.config, json_loads=json.loads)
186
187        if self._passphrase:
188            passphrase_value = self._passphrase.eval(self.config, json_loads=json.loads)
189            if passphrase_value:
190                private_key = serialization.load_pem_private_key(
191                    secret_key.encode(),
192                    password=passphrase_value.encode(),
193                )
194                return cast(JwtKeyTypes, private_key)
195
196        return (
197            base64.b64encode(secret_key.encode()).decode()
198            if self._base64_encode_secret_key
199            else secret_key
200        )
201
202    def _get_signed_token(self) -> Union[str, Any]:
203        """
204        Signed the JWT using the provided secret key and algorithm and the generated headers and payload. For additional information on PyJWT see: https://pyjwt.readthedocs.io/en/stable/
205        """
206        try:
207            return jwt.encode(
208                payload=self._get_jwt_payload(),
209                key=self._get_secret_key(),
210                algorithm=self._algorithm,
211                headers=self._get_jwt_headers(),
212            )
213        except Exception as e:
214            raise ValueError(f"Failed to sign token: {e}")
215
216    def _get_header_prefix(self) -> Union[str, None]:
217        """
218        Returns the header prefix to be used when attaching the token to the request.
219        """
220        return (
221            self._header_prefix.eval(self.config, json_loads=json.loads)
222            if self._header_prefix
223            else None
224        )
225
226    @property
227    def auth_header(self) -> str:
228        options = self._get_request_options(RequestOptionType.header)
229        return next(iter(options.keys()), "")
230
231    @property
232    def token(self) -> str:
233        return (
234            f"{self._get_header_prefix()} {self._get_signed_token()}"
235            if self._get_header_prefix()
236            else self._get_signed_token()
237        )
238
239    def get_request_params(self) -> Mapping[str, Any]:
240        return self._get_request_options(RequestOptionType.request_parameter)
241
242    def get_request_body_data(self) -> Union[Mapping[str, Any], str]:
243        return self._get_request_options(RequestOptionType.body_data)
244
245    def get_request_body_json(self) -> Mapping[str, Any]:
246        return self._get_request_options(RequestOptionType.body_json)
247
248    def _get_request_options(self, option_type: RequestOptionType) -> Mapping[str, Any]:
249        options: MutableMapping[str, Any] = {}
250        if self._request_option.inject_into == option_type:
251            self._request_option.inject_into_request(options, self.token, self.config)
252        return options

Generates a JSON Web Token (JWT) based on a declarative connector configuration file. The generated token is attached to each request via the Authorization header.

Attributes:
  • config (Mapping[str, Any]): The user-provided configuration as specified by the source's spec
  • secret_key (Union[InterpolatedString, str]): The secret key used to sign the JWT
  • algorithm (Union[str, JwtAlgorithm]): The algorithm used to sign the JWT
  • token_duration (Optional[int]): The duration in seconds for which the token is valid
  • base64_encode_secret_key (Optional[Union[InterpolatedBoolean, str, bool]]): Whether to base64 encode the secret key
  • header_prefix (Optional[Union[InterpolatedString, str]]): The prefix to add to the Authorization header
  • kid (Optional[Union[InterpolatedString, str]]): The key identifier to be included in the JWT header
  • typ (Optional[Union[InterpolatedString, str]]): The type of the JWT.
  • cty (Optional[Union[InterpolatedString, str]]): The content type of the JWT.
  • iss (Optional[Union[InterpolatedString, str]]): The issuer of the JWT.
  • sub (Optional[Union[InterpolatedString, str]]): The subject of the JWT.
  • aud (Optional[Union[InterpolatedString, str]]): The audience of the JWT.
  • additional_jwt_headers (Optional[Mapping[str, Any]]): Additional headers to include in the JWT.
  • additional_jwt_payload (Optional[Mapping[str, Any]]): Additional payload to include in the JWT.
JwtAuthenticator( config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], secret_key: Union[airbyte_cdk.InterpolatedString, str], algorithm: Union[str, airbyte_cdk.sources.declarative.auth.jwt.JwtAlgorithm], token_duration: Optional[int], base64_encode_secret_key: Union[airbyte_cdk.InterpolatedBoolean, str, bool, NoneType] = False, header_prefix: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, kid: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, typ: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, cty: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, iss: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, sub: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, aud: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, additional_jwt_headers: Optional[Mapping[str, Any]] = None, additional_jwt_payload: Optional[Mapping[str, Any]] = None, passphrase: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None, request_option: Optional[airbyte_cdk.RequestOption] = None)
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
secret_key: Union[airbyte_cdk.InterpolatedString, str]
token_duration: Optional[int]
base64_encode_secret_key: Union[airbyte_cdk.InterpolatedBoolean, str, bool, NoneType] = False
header_prefix: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
kid: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
typ: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
cty: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
iss: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
sub: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
aud: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
additional_jwt_headers: Optional[Mapping[str, Any]] = None
additional_jwt_payload: Optional[Mapping[str, Any]] = None
passphrase: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
request_option: Optional[airbyte_cdk.RequestOption] = None
auth_header: str
226    @property
227    def auth_header(self) -> str:
228        options = self._get_request_options(RequestOptionType.header)
229        return next(iter(options.keys()), "")

HTTP header to set on the requests

token: str
231    @property
232    def token(self) -> str:
233        return (
234            f"{self._get_header_prefix()} {self._get_signed_token()}"
235            if self._get_header_prefix()
236            else self._get_signed_token()
237        )

The header value to set on outgoing HTTP requests

def get_request_params(self) -> Mapping[str, Any]:
239    def get_request_params(self) -> Mapping[str, Any]:
240        return self._get_request_options(RequestOptionType.request_parameter)

HTTP request parameter to add to the requests

def get_request_body_data(self) -> Union[Mapping[str, Any], str]:
242    def get_request_body_data(self) -> Union[Mapping[str, Any], str]:
243        return self._get_request_options(RequestOptionType.body_data)

Form-encoded body data to set on the requests

def get_request_body_json(self) -> Mapping[str, Any]:
245    def get_request_body_json(self) -> Mapping[str, Any]:
246        return self._get_request_options(RequestOptionType.body_json)

JSON-encoded body data to set on the requests