airbyte_cdk.sources.declarative.auth.token

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4
  5import base64
  6import logging
  7from dataclasses import InitVar, dataclass
  8from typing import Any, Mapping, MutableMapping, Union
  9
 10import requests
 11from cachetools import TTLCache, cached
 12
 13from airbyte_cdk.sources.declarative.auth.declarative_authenticator import DeclarativeAuthenticator
 14from airbyte_cdk.sources.declarative.auth.token_provider import TokenProvider
 15from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
 16from airbyte_cdk.sources.declarative.requesters.request_option import (
 17    RequestOption,
 18    RequestOptionType,
 19)
 20from airbyte_cdk.sources.types import Config
 21
 22
 23@dataclass
 24class ApiKeyAuthenticator(DeclarativeAuthenticator):
 25    """
 26    ApiKeyAuth sets a request header on the HTTP requests sent.
 27
 28    The header is of the form:
 29    `"<header>": "<token>"`
 30
 31    For example,
 32    `ApiKeyAuthenticator("Authorization", "Bearer hello")`
 33    will result in the following header set on the HTTP request
 34    `"Authorization": "Bearer hello"`
 35
 36    Attributes:
 37        request_option (RequestOption): request option how to inject the token into the request
 38        token_provider (TokenProvider): Provider of the token
 39        config (Config): The user-provided configuration as specified by the source's spec
 40        parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation
 41    """
 42
 43    request_option: RequestOption
 44    token_provider: TokenProvider
 45    config: Config
 46    parameters: InitVar[Mapping[str, Any]]
 47
 48    @property
 49    def auth_header(self) -> str:
 50        options = self._get_request_options(RequestOptionType.header)
 51        return next(iter(options.keys()), "")
 52
 53    @property
 54    def token(self) -> str:
 55        return self.token_provider.get_token()
 56
 57    def _get_request_options(self, option_type: RequestOptionType) -> Mapping[str, Any]:
 58        options: MutableMapping[str, Any] = {}
 59        if self.request_option.inject_into == option_type:
 60            self.request_option.inject_into_request(options, self.token, self.config)
 61        return options
 62
 63    def get_request_params(self) -> Mapping[str, Any]:
 64        return self._get_request_options(RequestOptionType.request_parameter)
 65
 66    def get_request_body_data(self) -> Union[Mapping[str, Any], str]:
 67        return self._get_request_options(RequestOptionType.body_data)
 68
 69    def get_request_body_json(self) -> Mapping[str, Any]:
 70        return self._get_request_options(RequestOptionType.body_json)
 71
 72
 73@dataclass
 74class BearerAuthenticator(DeclarativeAuthenticator):
 75    """
 76    Authenticator that sets the Authorization header on the HTTP requests sent.
 77
 78    The header is of the form:
 79    `"Authorization": "Bearer <token>"`
 80
 81    Attributes:
 82        token_provider (TokenProvider): Provider of the token
 83        config (Config): The user-provided configuration as specified by the source's spec
 84        parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation
 85    """
 86
 87    token_provider: TokenProvider
 88    config: Config
 89    parameters: InitVar[Mapping[str, Any]]
 90
 91    @property
 92    def auth_header(self) -> str:
 93        return "Authorization"
 94
 95    @property
 96    def token(self) -> str:
 97        return f"Bearer {self.token_provider.get_token()}"
 98
 99
@dataclass
class BasicHttpAuthenticator(DeclarativeAuthenticator):
    """
    Authenticator implementing the basic authentication scheme (RFC 7617): the
    credentials are sent as a base64-encoded `username:password` pair.
    https://developer.mozilla.org/en-US/docs/Web/HTTP/Authentication#basic_authentication_scheme

    Every request gets a header of the form
    `"Authorization": "Basic <encoded_credentials>"`

    Attributes:
        username (Union[InterpolatedString, str]): The username
        config (Config): The user-provided configuration as specified by the source's spec
        password (Union[InterpolatedString, str]): The password (defaults to an empty string)
        parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation
    """

    username: Union[InterpolatedString, str]
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    password: Union[InterpolatedString, str] = ""

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """Wrap username and password so they can be interpolated against the config later."""
        self._username = InterpolatedString.create(self.username, parameters=parameters)
        self._password = InterpolatedString.create(self.password, parameters=parameters)

    @property
    def auth_header(self) -> str:
        """Name of the HTTP header that carries the credentials."""
        return "Authorization"

    @property
    def token(self) -> str:
        """Header value: `Basic ` followed by base64(`<username>:<password>`)."""
        # Both values are evaluated against the config each time the token is read.
        resolved_username = self._username.eval(self.config)
        resolved_password = self._password.eval(self.config)
        credentials = f"{resolved_username}:{resolved_password}"
        encoded_credentials = base64.b64encode(credentials.encode("utf8")).decode("utf8")
        return f"Basic {encoded_credentials}"
136
137
138"""
139    maxsize - The maximum size of the cache
140    ttl - time-to-live value in seconds
141    docs https://cachetools.readthedocs.io/en/latest/
142    maxsize=1000 - when the cache is full, in this case more than 1000,
143    i.e. by adding another item the cache would exceed its maximum size, the cache must choose which item(s) to discard
144    ttl=86400 means that cached token will live for 86400 seconds (one day)
145"""
# Shared cache of session tokens: holds at most 1000 entries, each kept for
# 86400 seconds (one day) before it expires.
cacheSessionTokenAuthenticator: TTLCache[str, str] = TTLCache(maxsize=1000, ttl=86400)


@cached(cacheSessionTokenAuthenticator)
def get_new_session_token(api_url: str, username: str, password: str, response_key: str) -> str:
    """
    Retrieve a session token from the API using a username and password.

    The result is cached in `cacheSessionTokenAuthenticator`, so the login request
    is not repeated (and the session token is not regenerated) for every stream
    during a sync.

    Args:
        api_url: api url for getting new session token
        username: username for auth
        password: password for auth
        response_key: field name in response to retrieve a session token

    Returns:
        session token

    Raises:
        requests.exceptions.HTTPError: if the API responds with a 4xx/5xx status code
    """
    # NOTE(review): no timeout is passed, so this call can wait indefinitely on an
    # unresponsive server — consider adding one.
    response = requests.post(
        f"{api_url}",
        headers={"Content-Type": "application/json"},
        json={"username": username, "password": password},
    )
    # raise_for_status() raises on every 4xx/5xx answer, so a response that gets past
    # this line is always `ok`; no additional status check is needed.
    response.raise_for_status()
    return str(response.json()[response_key])
174
175
@dataclass
class LegacySessionTokenAuthenticator(DeclarativeAuthenticator):
    """
    Builds auth based on session tokens.
    A session token is a random value generated by a server to identify
    a specific user for the duration of one interaction session.

    The header is of the form
    `"Specific Header": "Session Token Value"`

    The token is resolved as follows: a session token supplied in the config is used
    if the validation endpoint accepts it; otherwise a new token is requested with the
    username and password; if neither is possible a ConnectionError is raised.

    Attributes:
        api_url (Union[InterpolatedString, str]): Base api url of source
        username (Union[InterpolatedString, str]): The username
        config (Config): The user-provided configuration as specified by the source's spec
        password (Union[InterpolatedString, str]): The password
        header (Union[InterpolatedString, str]): Specific header of source for providing session token
        parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation
        session_token (Union[InterpolatedString, str]): Session token generated by user
        session_token_response_key (Union[InterpolatedString, str]): Key for retrieving session token from api response
        login_url (Union[InterpolatedString, str]): Url for getting a specific session token
        validate_session_url (Union[InterpolatedString, str]): Url to validate passed session token
    """

    api_url: Union[InterpolatedString, str]
    header: Union[InterpolatedString, str]
    session_token: Union[InterpolatedString, str]
    session_token_response_key: Union[InterpolatedString, str]
    username: Union[InterpolatedString, str]
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    login_url: Union[InterpolatedString, str]
    validate_session_url: Union[InterpolatedString, str]
    password: Union[InterpolatedString, str] = ""

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """Wrap every configurable field so it can be interpolated against the config later."""
        self._username = InterpolatedString.create(self.username, parameters=parameters)
        self._password = InterpolatedString.create(self.password, parameters=parameters)
        self._api_url = InterpolatedString.create(self.api_url, parameters=parameters)
        self._header = InterpolatedString.create(self.header, parameters=parameters)
        self._session_token = InterpolatedString.create(self.session_token, parameters=parameters)
        self._session_token_response_key = InterpolatedString.create(
            self.session_token_response_key, parameters=parameters
        )
        self._login_url = InterpolatedString.create(self.login_url, parameters=parameters)
        self._validate_session_url = InterpolatedString.create(
            self.validate_session_url, parameters=parameters
        )

        self.logger = logging.getLogger("airbyte")

    @property
    def auth_header(self) -> str:
        """Name of the HTTP header that carries the session token."""
        return str(self._header.eval(self.config))

    @property
    def token(self) -> str:
        """
        Session token to send with requests.

        Returns:
            The configured session token when it passes validation, otherwise a token
            obtained by logging in with the username and password.

        Raises:
            ConnectionError: if no valid session token is configured and the username
                or the password is missing (validation itself may also raise it).
        """
        # Prefer the user-supplied session token, but only if the API still accepts it.
        if self._session_token.eval(self.config) and self.is_valid_session_token():
            return str(self._session_token.eval(self.config))

        # Otherwise fall back to logging in with username and password.
        if self._password.eval(self.config) and self._username.eval(self.config):
            username = self._username.eval(self.config)
            password = self._password.eval(self.config)
            session_token_response_key = self._session_token_response_key.eval(self.config)
            api_url = f"{self._api_url.eval(self.config)}{self._login_url.eval(self.config)}"

            self.logger.info("Using generated session token by username and password")
            return get_new_session_token(api_url, username, password, session_token_response_key)

        raise ConnectionError(
            "Invalid credentials: session token is not valid or provide username and password"
        )

    def is_valid_session_token(self) -> bool:
        """
        Check the configured session token against the validation endpoint.

        Returns:
            True if the endpoint answers with a successful status, False if it answers
            with 401 Unauthorized.

        Raises:
            ConnectionError: if the endpoint answers with any other error status.
        """
        try:
            response = requests.get(
                f"{self._api_url.eval(self.config)}{self._validate_session_url.eval(self.config)}",
                headers={self.auth_header: self._session_token.eval(self.config)},
            )
            response.raise_for_status()
        except requests.exceptions.HTTPError as e:
            # A 401 only means the token is rejected; the caller may still log in with credentials.
            if (
                e.response is not None
                and e.response.status_code == requests.codes["unauthorized"]
            ):
                self.logger.info(f"Unable to connect by session token from config due to {str(e)}")
                return False
            # Keep the original HTTP error attached as the cause for easier debugging.
            raise ConnectionError(f"Error while validating session token: {e}") from e

        # raise_for_status() did not raise, so the response status is a success.
        self.logger.info("Connection check for source is successful.")
        return True
24@dataclass
25class ApiKeyAuthenticator(DeclarativeAuthenticator):
26    """
27    ApiKeyAuth sets a request header on the HTTP requests sent.
28
29    The header is of the form:
30    `"<header>": "<token>"`
31
32    For example,
33    `ApiKeyAuthenticator("Authorization", "Bearer hello")`
34    will result in the following header set on the HTTP request
35    `"Authorization": "Bearer hello"`
36
37    Attributes:
38        request_option (RequestOption): request option how to inject the token into the request
39        token_provider (TokenProvider): Provider of the token
40        config (Config): The user-provided configuration as specified by the source's spec
41        parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation
42    """
43
44    request_option: RequestOption
45    token_provider: TokenProvider
46    config: Config
47    parameters: InitVar[Mapping[str, Any]]
48
49    @property
50    def auth_header(self) -> str:
51        options = self._get_request_options(RequestOptionType.header)
52        return next(iter(options.keys()), "")
53
54    @property
55    def token(self) -> str:
56        return self.token_provider.get_token()
57
58    def _get_request_options(self, option_type: RequestOptionType) -> Mapping[str, Any]:
59        options: MutableMapping[str, Any] = {}
60        if self.request_option.inject_into == option_type:
61            self.request_option.inject_into_request(options, self.token, self.config)
62        return options
63
64    def get_request_params(self) -> Mapping[str, Any]:
65        return self._get_request_options(RequestOptionType.request_parameter)
66
67    def get_request_body_data(self) -> Union[Mapping[str, Any], str]:
68        return self._get_request_options(RequestOptionType.body_data)
69
70    def get_request_body_json(self) -> Mapping[str, Any]:
71        return self._get_request_options(RequestOptionType.body_json)

ApiKeyAuth sets a request header on the HTTP requests sent.

The header is of the form: "<header>": "<token>"

For example, ApiKeyAuthenticator("Authorization", "Bearer hello") will result in the following header set on the HTTP request "Authorization": "Bearer hello"

Attributes:
  • request_option (RequestOption): request option how to inject the token into the request
  • token_provider (TokenProvider): Provider of the token
  • config (Config): The user-provided configuration as specified by the source's spec
  • parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation
ApiKeyAuthenticator( request_option: airbyte_cdk.RequestOption, token_provider: airbyte_cdk.sources.declarative.auth.token_provider.TokenProvider, config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]])
request_option: airbyte_cdk.RequestOption
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
auth_header: str
49    @property
50    def auth_header(self) -> str:
51        options = self._get_request_options(RequestOptionType.header)
52        return next(iter(options.keys()), "")

HTTP header to set on the requests

token: str
54    @property
55    def token(self) -> str:
56        return self.token_provider.get_token()

The header value to set on outgoing HTTP requests

def get_request_params(self) -> Mapping[str, Any]:
64    def get_request_params(self) -> Mapping[str, Any]:
65        return self._get_request_options(RequestOptionType.request_parameter)

HTTP request parameter to add to the requests

def get_request_body_data(self) -> Union[Mapping[str, Any], str]:
67    def get_request_body_data(self) -> Union[Mapping[str, Any], str]:
68        return self._get_request_options(RequestOptionType.body_data)

Form-encoded body data to set on the requests

def get_request_body_json(self) -> Mapping[str, Any]:
70    def get_request_body_json(self) -> Mapping[str, Any]:
71        return self._get_request_options(RequestOptionType.body_json)

JSON-encoded body data to set on the requests

74@dataclass
75class BearerAuthenticator(DeclarativeAuthenticator):
76    """
77    Authenticator that sets the Authorization header on the HTTP requests sent.
78
79    The header is of the form:
80    `"Authorization": "Bearer <token>"`
81
82    Attributes:
83        token_provider (TokenProvider): Provider of the token
84        config (Config): The user-provided configuration as specified by the source's spec
85        parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation
86    """
87
88    token_provider: TokenProvider
89    config: Config
90    parameters: InitVar[Mapping[str, Any]]
91
92    @property
93    def auth_header(self) -> str:
94        return "Authorization"
95
96    @property
97    def token(self) -> str:
98        return f"Bearer {self.token_provider.get_token()}"

Authenticator that sets the Authorization header on the HTTP requests sent.

The header is of the form: "Authorization": "Bearer <token>"

Attributes:
  • token_provider (TokenProvider): Provider of the token
  • config (Config): The user-provided configuration as specified by the source's spec
  • parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation
BearerAuthenticator( token_provider: airbyte_cdk.sources.declarative.auth.token_provider.TokenProvider, config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]])
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
auth_header: str
92    @property
93    def auth_header(self) -> str:
94        return "Authorization"

HTTP header to set on the requests

token: str
96    @property
97    def token(self) -> str:
98        return f"Bearer {self.token_provider.get_token()}"

The header value to set on outgoing HTTP requests

101@dataclass
102class BasicHttpAuthenticator(DeclarativeAuthenticator):
103    """
104    Builds auth based off the basic authentication scheme as defined by RFC 7617, which transmits credentials as USER ID/password pairs, encoded using base64
105    https://developer.mozilla.org/en-US/docs/Web/HTTP/Authentication#basic_authentication_scheme
106
107    The header is of the form
108    `"Authorization": "Basic <encoded_credentials>"`
109
110    Attributes:
111        username (Union[InterpolatedString, str]): The username
112        config (Config): The user-provided configuration as specified by the source's spec
113        password (Union[InterpolatedString, str]): The password
114        parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation
115    """
116
117    username: Union[InterpolatedString, str]
118    config: Config
119    parameters: InitVar[Mapping[str, Any]]
120    password: Union[InterpolatedString, str] = ""
121
122    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
123        self._username = InterpolatedString.create(self.username, parameters=parameters)
124        self._password = InterpolatedString.create(self.password, parameters=parameters)
125
126    @property
127    def auth_header(self) -> str:
128        return "Authorization"
129
130    @property
131    def token(self) -> str:
132        auth_string = (
133            f"{self._username.eval(self.config)}:{self._password.eval(self.config)}".encode("utf8")
134        )
135        b64_encoded = base64.b64encode(auth_string).decode("utf8")
136        return f"Basic {b64_encoded}"

Builds auth based off the basic authentication scheme as defined by RFC 7617, which transmits credentials as USER ID/password pairs, encoded using base64 https://developer.mozilla.org/en-US/docs/Web/HTTP/Authentication#basic_authentication_scheme

The header is of the form "Authorization": "Basic <encoded_credentials>"

Attributes:
  • username (Union[InterpolatedString, str]): The username
  • config (Config): The user-provided configuration as specified by the source's spec
  • password (Union[InterpolatedString, str]): The password
  • parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation
BasicHttpAuthenticator( username: Union[airbyte_cdk.InterpolatedString, str], config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], password: Union[airbyte_cdk.InterpolatedString, str] = '')
username: Union[airbyte_cdk.InterpolatedString, str]
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
password: Union[airbyte_cdk.InterpolatedString, str] = ''
auth_header: str
126    @property
127    def auth_header(self) -> str:
128        return "Authorization"

HTTP header to set on the requests

token: str
130    @property
131    def token(self) -> str:
132        auth_string = (
133            f"{self._username.eval(self.config)}:{self._password.eval(self.config)}".encode("utf8")
134        )
135        b64_encoded = base64.b64encode(auth_string).decode("utf8")
136        return f"Basic {b64_encoded}"

The header value to set on outgoing HTTP requests

cacheSessionTokenAuthenticator: cachetools.TTLCache[str, str] = TTLCache({}, maxsize=1000, currsize=0)
@cached(cacheSessionTokenAuthenticator)
def get_new_session_token(api_url: str, username: str, password: str, response_key: str) -> str:
150@cached(cacheSessionTokenAuthenticator)
151def get_new_session_token(api_url: str, username: str, password: str, response_key: str) -> str:
152    """
153    This method retrieves session token from api by username and password for SessionTokenAuthenticator.
154    It's cashed to avoid a multiple calling by sync and updating session token every stream sync.
155    Args:
156        api_url: api url for getting new session token
157        username: username for auth
158        password: password for auth
159        response_key: field name in response to retrieve a session token
160
161    Returns:
162        session token
163    """
164    response = requests.post(
165        f"{api_url}",
166        headers={"Content-Type": "application/json"},
167        json={"username": username, "password": password},
168    )
169    response.raise_for_status()
170    if not response.ok:
171        raise ConnectionError(
172            f"Failed to retrieve new session token, response code {response.status_code} because {response.reason}"
173        )
174    return str(response.json()[response_key])

This method retrieves a session token from the API by username and password for SessionTokenAuthenticator. It's cached to avoid calling the API multiple times per sync and regenerating the session token on every stream sync.

Arguments:
  • api_url: api url for getting new session token
  • username: username for auth
  • password: password for auth
  • response_key: field name in response to retrieve a session token
Returns:

session token

@dataclass
class LegacySessionTokenAuthenticator(airbyte_cdk.sources.declarative.auth.declarative_authenticator.DeclarativeAuthenticator):
177@dataclass
178class LegacySessionTokenAuthenticator(DeclarativeAuthenticator):
179    """
180    Builds auth based on session tokens.
181    A session token is a random value generated by a server to identify
182    a specific user for the duration of one interaction session.
183
184    The header is of the form
185    `"Specific Header": "Session Token Value"`
186
187    Attributes:
188        api_url (Union[InterpolatedString, str]): Base api url of source
189        username (Union[InterpolatedString, str]): The username
190        config (Config): The user-provided configuration as specified by the source's spec
191        password (Union[InterpolatedString, str]): The password
192        header (Union[InterpolatedString, str]): Specific header of source for providing session token
193        parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation
194        session_token (Union[InterpolatedString, str]): Session token generated by user
195        session_token_response_key (Union[InterpolatedString, str]): Key for retrieving session token from api response
196        login_url (Union[InterpolatedString, str]): Url fot getting a specific session token
197        validate_session_url (Union[InterpolatedString, str]): Url to validate passed session token
198    """
199
200    api_url: Union[InterpolatedString, str]
201    header: Union[InterpolatedString, str]
202    session_token: Union[InterpolatedString, str]
203    session_token_response_key: Union[InterpolatedString, str]
204    username: Union[InterpolatedString, str]
205    config: Config
206    parameters: InitVar[Mapping[str, Any]]
207    login_url: Union[InterpolatedString, str]
208    validate_session_url: Union[InterpolatedString, str]
209    password: Union[InterpolatedString, str] = ""
210
211    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
212        self._username = InterpolatedString.create(self.username, parameters=parameters)
213        self._password = InterpolatedString.create(self.password, parameters=parameters)
214        self._api_url = InterpolatedString.create(self.api_url, parameters=parameters)
215        self._header = InterpolatedString.create(self.header, parameters=parameters)
216        self._session_token = InterpolatedString.create(self.session_token, parameters=parameters)
217        self._session_token_response_key = InterpolatedString.create(
218            self.session_token_response_key, parameters=parameters
219        )
220        self._login_url = InterpolatedString.create(self.login_url, parameters=parameters)
221        self._validate_session_url = InterpolatedString.create(
222            self.validate_session_url, parameters=parameters
223        )
224
225        self.logger = logging.getLogger("airbyte")
226
227    @property
228    def auth_header(self) -> str:
229        return str(self._header.eval(self.config))
230
231    @property
232    def token(self) -> str:
233        if self._session_token.eval(self.config):
234            if self.is_valid_session_token():
235                return str(self._session_token.eval(self.config))
236        if self._password.eval(self.config) and self._username.eval(self.config):
237            username = self._username.eval(self.config)
238            password = self._password.eval(self.config)
239            session_token_response_key = self._session_token_response_key.eval(self.config)
240            api_url = f"{self._api_url.eval(self.config)}{self._login_url.eval(self.config)}"
241
242            self.logger.info("Using generated session token by username and password")
243            return get_new_session_token(api_url, username, password, session_token_response_key)
244
245        raise ConnectionError(
246            "Invalid credentials: session token is not valid or provide username and password"
247        )
248
249    def is_valid_session_token(self) -> bool:
250        try:
251            response = requests.get(
252                f"{self._api_url.eval(self.config)}{self._validate_session_url.eval(self.config)}",
253                headers={self.auth_header: self._session_token.eval(self.config)},
254            )
255            response.raise_for_status()
256        except requests.exceptions.HTTPError as e:
257            if e.response.status_code == requests.codes["unauthorized"]:
258                self.logger.info(f"Unable to connect by session token from config due to {str(e)}")
259                return False
260            else:
261                raise ConnectionError(f"Error while validating session token: {e}")
262        if response.ok:
263            self.logger.info("Connection check for source is successful.")
264            return True
265        else:
266            raise ConnectionError(
267                f"Failed to retrieve new session token, response code {response.status_code} because {response.reason}"
268            )

Builds auth based on session tokens. A session token is a random value generated by a server to identify a specific user for the duration of one interaction session.

The header is of the form "Specific Header": "Session Token Value"

Attributes:
  • api_url (Union[InterpolatedString, str]): Base api url of source
  • username (Union[InterpolatedString, str]): The username
  • config (Config): The user-provided configuration as specified by the source's spec
  • password (Union[InterpolatedString, str]): The password
  • header (Union[InterpolatedString, str]): Specific header of source for providing session token
  • parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation
  • session_token (Union[InterpolatedString, str]): Session token generated by user
  • session_token_response_key (Union[InterpolatedString, str]): Key for retrieving session token from api response
  • login_url (Union[InterpolatedString, str]): Url for getting a specific session token
  • validate_session_url (Union[InterpolatedString, str]): Url to validate passed session token
LegacySessionTokenAuthenticator( api_url: Union[airbyte_cdk.InterpolatedString, str], header: Union[airbyte_cdk.InterpolatedString, str], session_token: Union[airbyte_cdk.InterpolatedString, str], session_token_response_key: Union[airbyte_cdk.InterpolatedString, str], username: Union[airbyte_cdk.InterpolatedString, str], config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], login_url: Union[airbyte_cdk.InterpolatedString, str], validate_session_url: Union[airbyte_cdk.InterpolatedString, str], password: Union[airbyte_cdk.InterpolatedString, str] = '')
api_url: Union[airbyte_cdk.InterpolatedString, str]
header: Union[airbyte_cdk.InterpolatedString, str]
session_token: Union[airbyte_cdk.InterpolatedString, str]
session_token_response_key: Union[airbyte_cdk.InterpolatedString, str]
username: Union[airbyte_cdk.InterpolatedString, str]
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
login_url: Union[airbyte_cdk.InterpolatedString, str]
validate_session_url: Union[airbyte_cdk.InterpolatedString, str]
password: Union[airbyte_cdk.InterpolatedString, str] = ''
auth_header: str
227    @property
228    def auth_header(self) -> str:
229        return str(self._header.eval(self.config))

HTTP header to set on the requests

token: str
231    @property
232    def token(self) -> str:
233        if self._session_token.eval(self.config):
234            if self.is_valid_session_token():
235                return str(self._session_token.eval(self.config))
236        if self._password.eval(self.config) and self._username.eval(self.config):
237            username = self._username.eval(self.config)
238            password = self._password.eval(self.config)
239            session_token_response_key = self._session_token_response_key.eval(self.config)
240            api_url = f"{self._api_url.eval(self.config)}{self._login_url.eval(self.config)}"
241
242            self.logger.info("Using generated session token by username and password")
243            return get_new_session_token(api_url, username, password, session_token_response_key)
244
245        raise ConnectionError(
246            "Invalid credentials: session token is not valid or provide username and password"
247        )

The header value to set on outgoing HTTP requests

def is_valid_session_token(self) -> bool:
249    def is_valid_session_token(self) -> bool:
250        try:
251            response = requests.get(
252                f"{self._api_url.eval(self.config)}{self._validate_session_url.eval(self.config)}",
253                headers={self.auth_header: self._session_token.eval(self.config)},
254            )
255            response.raise_for_status()
256        except requests.exceptions.HTTPError as e:
257            if e.response.status_code == requests.codes["unauthorized"]:
258                self.logger.info(f"Unable to connect by session token from config due to {str(e)}")
259                return False
260            else:
261                raise ConnectionError(f"Error while validating session token: {e}")
262        if response.ok:
263            self.logger.info("Connection check for source is successful.")
264            return True
265        else:
266            raise ConnectionError(
267                f"Failed to retrieve new session token, response code {response.status_code} because {response.reason}"
268            )