airbyte_cdk.sources.declarative.auth.token
1# 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 3# 4 5import base64 6import logging 7from dataclasses import InitVar, dataclass 8from typing import Any, Mapping, MutableMapping, Union 9 10import requests 11from cachetools import TTLCache, cached 12 13from airbyte_cdk.sources.declarative.auth.declarative_authenticator import DeclarativeAuthenticator 14from airbyte_cdk.sources.declarative.auth.token_provider import TokenProvider 15from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString 16from airbyte_cdk.sources.declarative.requesters.request_option import ( 17 RequestOption, 18 RequestOptionType, 19) 20from airbyte_cdk.sources.types import Config 21 22 23@dataclass 24class ApiKeyAuthenticator(DeclarativeAuthenticator): 25 """ 26 ApiKeyAuth sets a request header on the HTTP requests sent. 27 28 The header is of the form: 29 `"<header>": "<token>"` 30 31 For example, 32 `ApiKeyAuthenticator("Authorization", "Bearer hello")` 33 will result in the following header set on the HTTP request 34 `"Authorization": "Bearer hello"` 35 36 Attributes: 37 request_option (RequestOption): request option how to inject the token into the request 38 token_provider (TokenProvider): Provider of the token 39 config (Config): The user-provided configuration as specified by the source's spec 40 parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation 41 """ 42 43 request_option: RequestOption 44 token_provider: TokenProvider 45 config: Config 46 parameters: InitVar[Mapping[str, Any]] 47 48 @property 49 def auth_header(self) -> str: 50 options = self._get_request_options(RequestOptionType.header) 51 return next(iter(options.keys()), "") 52 53 @property 54 def token(self) -> str: 55 return self.token_provider.get_token() 56 57 def _get_request_options(self, option_type: RequestOptionType) -> Mapping[str, Any]: 58 options: MutableMapping[str, Any] = {} 59 if self.request_option.inject_into == option_type: 60 self.request_option.inject_into_request(options, self.token, self.config) 61 return options 62 63 def get_request_params(self) -> Mapping[str, Any]: 64 return self._get_request_options(RequestOptionType.request_parameter) 65 66 def get_request_body_data(self) -> Union[Mapping[str, Any], str]: 67 return self._get_request_options(RequestOptionType.body_data) 68 69 def get_request_body_json(self) -> Mapping[str, Any]: 70 return self._get_request_options(RequestOptionType.body_json) 71 72 73@dataclass 74class BearerAuthenticator(DeclarativeAuthenticator): 75 """ 76 Authenticator that sets the Authorization header on the HTTP requests sent. 77 78 The header is of the form: 79 `"Authorization": "Bearer <token>"` 80 81 Attributes: 82 token_provider (TokenProvider): Provider of the token 83 config (Config): The user-provided configuration as specified by the source's spec 84 parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation 85 """ 86 87 token_provider: TokenProvider 88 config: Config 89 parameters: InitVar[Mapping[str, Any]] 90 91 @property 92 def auth_header(self) -> str: 93 return "Authorization" 94 95 @property 96 def token(self) -> str: 97 return f"Bearer {self.token_provider.get_token()}" 98 99 100@dataclass 101class BasicHttpAuthenticator(DeclarativeAuthenticator): 102 """ 103 Builds auth based off the basic authentication scheme as defined by RFC 7617, which transmits credentials as USER ID/password pairs, encoded using base64 104 https://developer.mozilla.org/en-US/docs/Web/HTTP/Authentication#basic_authentication_scheme 105 106 The header is of the form 107 `"Authorization": "Basic <encoded_credentials>"` 108 109 Attributes: 110 username (Union[InterpolatedString, str]): The username 111 config (Config): The user-provided configuration as specified by the source's spec 112 password (Union[InterpolatedString, str]): The password 113 parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation 114 """ 115 116 username: Union[InterpolatedString, str] 117 config: Config 118 parameters: InitVar[Mapping[str, Any]] 119 password: Union[InterpolatedString, str] = "" 120 121 def __post_init__(self, parameters: Mapping[str, Any]) -> None: 122 self._username = InterpolatedString.create(self.username, parameters=parameters) 123 self._password = InterpolatedString.create(self.password, parameters=parameters) 124 125 @property 126 def auth_header(self) -> str: 127 return "Authorization" 128 129 @property 130 def token(self) -> str: 131 auth_string = ( 132 f"{self._username.eval(self.config)}:{self._password.eval(self.config)}".encode("utf8") 133 ) 134 b64_encoded = base64.b64encode(auth_string).decode("utf8") 135 return f"Basic {b64_encoded}" 136 137 138""" 139 maxsize - The maximum size of the cache 140 ttl - time-to-live value in seconds 141 docs https://cachetools.readthedocs.io/en/latest/ 142 maxsize=1000 - when the cache is full, in this case more than 1000, 143 i.e. by adding another item the cache would exceed its maximum size, the cache must choose which item(s) to discard 144 ttl=86400 means that cached token will live for 86400 seconds (one day) 145""" 146cacheSessionTokenAuthenticator: TTLCache[str, str] = TTLCache(maxsize=1000, ttl=86400) 147 148 149@cached(cacheSessionTokenAuthenticator) 150def get_new_session_token(api_url: str, username: str, password: str, response_key: str) -> str: 151 """ 152 This method retrieves session token from api by username and password for SessionTokenAuthenticator. 153 It's cashed to avoid a multiple calling by sync and updating session token every stream sync. 154 Args: 155 api_url: api url for getting new session token 156 username: username for auth 157 password: password for auth 158 response_key: field name in response to retrieve a session token 159 160 Returns: 161 session token 162 """ 163 response = requests.post( 164 f"{api_url}", 165 headers={"Content-Type": "application/json"}, 166 json={"username": username, "password": password}, 167 ) 168 response.raise_for_status() 169 if not response.ok: 170 raise ConnectionError( 171 f"Failed to retrieve new session token, response code {response.status_code} because {response.reason}" 172 ) 173 return str(response.json()[response_key]) 174 175 176@dataclass 177class LegacySessionTokenAuthenticator(DeclarativeAuthenticator): 178 """ 179 Builds auth based on session tokens. 180 A session token is a random value generated by a server to identify 181 a specific user for the duration of one interaction session. 182 183 The header is of the form 184 `"Specific Header": "Session Token Value"` 185 186 Attributes: 187 api_url (Union[InterpolatedString, str]): Base api url of source 188 username (Union[InterpolatedString, str]): The username 189 config (Config): The user-provided configuration as specified by the source's spec 190 password (Union[InterpolatedString, str]): The password 191 header (Union[InterpolatedString, str]): Specific header of source for providing session token 192 parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation 193 session_token (Union[InterpolatedString, str]): Session token generated by user 194 session_token_response_key (Union[InterpolatedString, str]): Key for retrieving session token from api response 195 login_url (Union[InterpolatedString, str]): Url fot getting a specific session token 196 validate_session_url (Union[InterpolatedString, str]): Url to validate passed session token 197 """ 198 199 api_url: Union[InterpolatedString, str] 200 header: Union[InterpolatedString, str] 201 session_token: Union[InterpolatedString, str] 202 session_token_response_key: Union[InterpolatedString, str] 203 username: Union[InterpolatedString, str] 204 config: Config 205 parameters: InitVar[Mapping[str, Any]] 206 login_url: Union[InterpolatedString, str] 207 validate_session_url: Union[InterpolatedString, str] 208 password: Union[InterpolatedString, str] = "" 209 210 def __post_init__(self, parameters: Mapping[str, Any]) -> None: 211 self._username = InterpolatedString.create(self.username, parameters=parameters) 212 self._password = InterpolatedString.create(self.password, parameters=parameters) 213 self._api_url = InterpolatedString.create(self.api_url, parameters=parameters) 214 self._header = InterpolatedString.create(self.header, parameters=parameters) 215 self._session_token = InterpolatedString.create(self.session_token, parameters=parameters) 216 self._session_token_response_key = InterpolatedString.create( 217 self.session_token_response_key, parameters=parameters 218 ) 219 self._login_url = InterpolatedString.create(self.login_url, parameters=parameters) 220 self._validate_session_url = InterpolatedString.create( 221 self.validate_session_url, parameters=parameters 222 ) 223 224 self.logger = logging.getLogger("airbyte") 225 226 @property 227 def auth_header(self) -> str: 228 return str(self._header.eval(self.config)) 229 230 @property 231 def token(self) -> str: 232 if self._session_token.eval(self.config): 233 if self.is_valid_session_token(): 234 return str(self._session_token.eval(self.config)) 235 if self._password.eval(self.config) and self._username.eval(self.config): 236 username = self._username.eval(self.config) 237 password = self._password.eval(self.config) 238 session_token_response_key = self._session_token_response_key.eval(self.config) 239 api_url = f"{self._api_url.eval(self.config)}{self._login_url.eval(self.config)}" 240 241 self.logger.info("Using generated session token by username and password") 242 return get_new_session_token(api_url, username, password, session_token_response_key) 243 244 raise ConnectionError( 245 "Invalid credentials: session token is not valid or provide username and password" 246 ) 247 248 def is_valid_session_token(self) -> bool: 249 try: 250 response = requests.get( 251 f"{self._api_url.eval(self.config)}{self._validate_session_url.eval(self.config)}", 252 headers={self.auth_header: self._session_token.eval(self.config)}, 253 ) 254 response.raise_for_status() 255 except requests.exceptions.HTTPError as e: 256 if e.response.status_code == requests.codes["unauthorized"]: 257 self.logger.info(f"Unable to connect by session token from config due to {str(e)}") 258 return False 259 else: 260 raise ConnectionError(f"Error while validating session token: {e}") 261 if response.ok: 262 self.logger.info("Connection check for source is successful.") 263 return True 264 else: 265 raise ConnectionError( 266 f"Failed to retrieve new session token, response code {response.status_code} because {response.reason}" 267 )
24@dataclass 25class ApiKeyAuthenticator(DeclarativeAuthenticator): 26 """ 27 ApiKeyAuth sets a request header on the HTTP requests sent. 28 29 The header is of the form: 30 `"<header>": "<token>"` 31 32 For example, 33 `ApiKeyAuthenticator("Authorization", "Bearer hello")` 34 will result in the following header set on the HTTP request 35 `"Authorization": "Bearer hello"` 36 37 Attributes: 38 request_option (RequestOption): request option how to inject the token into the request 39 token_provider (TokenProvider): Provider of the token 40 config (Config): The user-provided configuration as specified by the source's spec 41 parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation 42 """ 43 44 request_option: RequestOption 45 token_provider: TokenProvider 46 config: Config 47 parameters: InitVar[Mapping[str, Any]] 48 49 @property 50 def auth_header(self) -> str: 51 options = self._get_request_options(RequestOptionType.header) 52 return next(iter(options.keys()), "") 53 54 @property 55 def token(self) -> str: 56 return self.token_provider.get_token() 57 58 def _get_request_options(self, option_type: RequestOptionType) -> Mapping[str, Any]: 59 options: MutableMapping[str, Any] = {} 60 if self.request_option.inject_into == option_type: 61 self.request_option.inject_into_request(options, self.token, self.config) 62 return options 63 64 def get_request_params(self) -> Mapping[str, Any]: 65 return self._get_request_options(RequestOptionType.request_parameter) 66 67 def get_request_body_data(self) -> Union[Mapping[str, Any], str]: 68 return self._get_request_options(RequestOptionType.body_data) 69 70 def get_request_body_json(self) -> Mapping[str, Any]: 71 return self._get_request_options(RequestOptionType.body_json)
ApiKeyAuth sets a request header on the HTTP requests sent.
The header is of the form:
"<header>": "<token>"
For example,
ApiKeyAuthenticator("Authorization", "Bearer hello")
will result in the following header set on the HTTP request
"Authorization": "Bearer hello"
Attributes:
- request_option (RequestOption): request option how to inject the token into the request
- token_provider (TokenProvider): Provider of the token
- config (Config): The user-provided configuration as specified by the source's spec
- parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation
49 @property 50 def auth_header(self) -> str: 51 options = self._get_request_options(RequestOptionType.header) 52 return next(iter(options.keys()), "")
HTTP header to set on the requests
64 def get_request_params(self) -> Mapping[str, Any]: 65 return self._get_request_options(RequestOptionType.request_parameter)
HTTP request parameter to add to the requests
67 def get_request_body_data(self) -> Union[Mapping[str, Any], str]: 68 return self._get_request_options(RequestOptionType.body_data)
Form-encoded body data to set on the requests
74@dataclass 75class BearerAuthenticator(DeclarativeAuthenticator): 76 """ 77 Authenticator that sets the Authorization header on the HTTP requests sent. 78 79 The header is of the form: 80 `"Authorization": "Bearer <token>"` 81 82 Attributes: 83 token_provider (TokenProvider): Provider of the token 84 config (Config): The user-provided configuration as specified by the source's spec 85 parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation 86 """ 87 88 token_provider: TokenProvider 89 config: Config 90 parameters: InitVar[Mapping[str, Any]] 91 92 @property 93 def auth_header(self) -> str: 94 return "Authorization" 95 96 @property 97 def token(self) -> str: 98 return f"Bearer {self.token_provider.get_token()}"
Authenticator that sets the Authorization header on the HTTP requests sent.
The header is of the form:
"Authorization": "Bearer <token>"
Attributes:
- token_provider (TokenProvider): Provider of the token
- config (Config): The user-provided configuration as specified by the source's spec
- parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation
101@dataclass 102class BasicHttpAuthenticator(DeclarativeAuthenticator): 103 """ 104 Builds auth based off the basic authentication scheme as defined by RFC 7617, which transmits credentials as USER ID/password pairs, encoded using base64 105 https://developer.mozilla.org/en-US/docs/Web/HTTP/Authentication#basic_authentication_scheme 106 107 The header is of the form 108 `"Authorization": "Basic <encoded_credentials>"` 109 110 Attributes: 111 username (Union[InterpolatedString, str]): The username 112 config (Config): The user-provided configuration as specified by the source's spec 113 password (Union[InterpolatedString, str]): The password 114 parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation 115 """ 116 117 username: Union[InterpolatedString, str] 118 config: Config 119 parameters: InitVar[Mapping[str, Any]] 120 password: Union[InterpolatedString, str] = "" 121 122 def __post_init__(self, parameters: Mapping[str, Any]) -> None: 123 self._username = InterpolatedString.create(self.username, parameters=parameters) 124 self._password = InterpolatedString.create(self.password, parameters=parameters) 125 126 @property 127 def auth_header(self) -> str: 128 return "Authorization" 129 130 @property 131 def token(self) -> str: 132 auth_string = ( 133 f"{self._username.eval(self.config)}:{self._password.eval(self.config)}".encode("utf8") 134 ) 135 b64_encoded = base64.b64encode(auth_string).decode("utf8") 136 return f"Basic {b64_encoded}"
Builds auth based off the basic authentication scheme as defined by RFC 7617, which transmits credentials as USER ID/password pairs, encoded using base64 https://developer.mozilla.org/en-US/docs/Web/HTTP/Authentication#basic_authentication_scheme
The header is of the form
"Authorization": "Basic <encoded_credentials>"
Attributes:
- username (Union[InterpolatedString, str]): The username
- config (Config): The user-provided configuration as specified by the source's spec
- password (Union[InterpolatedString, str]): The password
- parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation
130 @property 131 def token(self) -> str: 132 auth_string = ( 133 f"{self._username.eval(self.config)}:{self._password.eval(self.config)}".encode("utf8") 134 ) 135 b64_encoded = base64.b64encode(auth_string).decode("utf8") 136 return f"Basic {b64_encoded}"
The header value to set on outgoing HTTP requests
150@cached(cacheSessionTokenAuthenticator) 151def get_new_session_token(api_url: str, username: str, password: str, response_key: str) -> str: 152 """ 153 This method retrieves session token from api by username and password for SessionTokenAuthenticator. 154 It's cashed to avoid a multiple calling by sync and updating session token every stream sync. 155 Args: 156 api_url: api url for getting new session token 157 username: username for auth 158 password: password for auth 159 response_key: field name in response to retrieve a session token 160 161 Returns: 162 session token 163 """ 164 response = requests.post( 165 f"{api_url}", 166 headers={"Content-Type": "application/json"}, 167 json={"username": username, "password": password}, 168 ) 169 response.raise_for_status() 170 if not response.ok: 171 raise ConnectionError( 172 f"Failed to retrieve new session token, response code {response.status_code} because {response.reason}" 173 ) 174 return str(response.json()[response_key])
This method retrieves session token from api by username and password for SessionTokenAuthenticator. It's cashed to avoid a multiple calling by sync and updating session token every stream sync.
Arguments:
- api_url: api url for getting new session token
- username: username for auth
- password: password for auth
- response_key: field name in response to retrieve a session token
Returns:
session token
177@dataclass 178class LegacySessionTokenAuthenticator(DeclarativeAuthenticator): 179 """ 180 Builds auth based on session tokens. 181 A session token is a random value generated by a server to identify 182 a specific user for the duration of one interaction session. 183 184 The header is of the form 185 `"Specific Header": "Session Token Value"` 186 187 Attributes: 188 api_url (Union[InterpolatedString, str]): Base api url of source 189 username (Union[InterpolatedString, str]): The username 190 config (Config): The user-provided configuration as specified by the source's spec 191 password (Union[InterpolatedString, str]): The password 192 header (Union[InterpolatedString, str]): Specific header of source for providing session token 193 parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation 194 session_token (Union[InterpolatedString, str]): Session token generated by user 195 session_token_response_key (Union[InterpolatedString, str]): Key for retrieving session token from api response 196 login_url (Union[InterpolatedString, str]): Url fot getting a specific session token 197 validate_session_url (Union[InterpolatedString, str]): Url to validate passed session token 198 """ 199 200 api_url: Union[InterpolatedString, str] 201 header: Union[InterpolatedString, str] 202 session_token: Union[InterpolatedString, str] 203 session_token_response_key: Union[InterpolatedString, str] 204 username: Union[InterpolatedString, str] 205 config: Config 206 parameters: InitVar[Mapping[str, Any]] 207 login_url: Union[InterpolatedString, str] 208 validate_session_url: Union[InterpolatedString, str] 209 password: Union[InterpolatedString, str] = "" 210 211 def __post_init__(self, parameters: Mapping[str, Any]) -> None: 212 self._username = InterpolatedString.create(self.username, parameters=parameters) 213 self._password = InterpolatedString.create(self.password, parameters=parameters) 214 self._api_url = InterpolatedString.create(self.api_url, parameters=parameters) 215 self._header = InterpolatedString.create(self.header, parameters=parameters) 216 self._session_token = InterpolatedString.create(self.session_token, parameters=parameters) 217 self._session_token_response_key = InterpolatedString.create( 218 self.session_token_response_key, parameters=parameters 219 ) 220 self._login_url = InterpolatedString.create(self.login_url, parameters=parameters) 221 self._validate_session_url = InterpolatedString.create( 222 self.validate_session_url, parameters=parameters 223 ) 224 225 self.logger = logging.getLogger("airbyte") 226 227 @property 228 def auth_header(self) -> str: 229 return str(self._header.eval(self.config)) 230 231 @property 232 def token(self) -> str: 233 if self._session_token.eval(self.config): 234 if self.is_valid_session_token(): 235 return str(self._session_token.eval(self.config)) 236 if self._password.eval(self.config) and self._username.eval(self.config): 237 username = self._username.eval(self.config) 238 password = self._password.eval(self.config) 239 session_token_response_key = self._session_token_response_key.eval(self.config) 240 api_url = f"{self._api_url.eval(self.config)}{self._login_url.eval(self.config)}" 241 242 self.logger.info("Using generated session token by username and password") 243 return get_new_session_token(api_url, username, password, session_token_response_key) 244 245 raise ConnectionError( 246 "Invalid credentials: session token is not valid or provide username and password" 247 ) 248 249 def is_valid_session_token(self) -> bool: 250 try: 251 response = requests.get( 252 f"{self._api_url.eval(self.config)}{self._validate_session_url.eval(self.config)}", 253 headers={self.auth_header: self._session_token.eval(self.config)}, 254 ) 255 response.raise_for_status() 256 except requests.exceptions.HTTPError as e: 257 if e.response.status_code == requests.codes["unauthorized"]: 258 self.logger.info(f"Unable to connect by session token from config due to {str(e)}") 259 return False 260 else: 261 raise ConnectionError(f"Error while validating session token: {e}") 262 if response.ok: 263 self.logger.info("Connection check for source is successful.") 264 return True 265 else: 266 raise ConnectionError( 267 f"Failed to retrieve new session token, response code {response.status_code} because {response.reason}" 268 )
Builds auth based on session tokens. A session token is a random value generated by a server to identify a specific user for the duration of one interaction session.
The header is of the form
"Specific Header": "Session Token Value"
Attributes:
- api_url (Union[InterpolatedString, str]): Base api url of source
- username (Union[InterpolatedString, str]): The username
- config (Config): The user-provided configuration as specified by the source's spec
- password (Union[InterpolatedString, str]): The password
- header (Union[InterpolatedString, str]): Specific header of source for providing session token
- parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation
- session_token (Union[InterpolatedString, str]): Session token generated by user
- session_token_response_key (Union[InterpolatedString, str]): Key for retrieving session token from api response
- login_url (Union[InterpolatedString, str]): Url fot getting a specific session token
- validate_session_url (Union[InterpolatedString, str]): Url to validate passed session token
231 @property 232 def token(self) -> str: 233 if self._session_token.eval(self.config): 234 if self.is_valid_session_token(): 235 return str(self._session_token.eval(self.config)) 236 if self._password.eval(self.config) and self._username.eval(self.config): 237 username = self._username.eval(self.config) 238 password = self._password.eval(self.config) 239 session_token_response_key = self._session_token_response_key.eval(self.config) 240 api_url = f"{self._api_url.eval(self.config)}{self._login_url.eval(self.config)}" 241 242 self.logger.info("Using generated session token by username and password") 243 return get_new_session_token(api_url, username, password, session_token_response_key) 244 245 raise ConnectionError( 246 "Invalid credentials: session token is not valid or provide username and password" 247 )
The header value to set on outgoing HTTP requests
249 def is_valid_session_token(self) -> bool: 250 try: 251 response = requests.get( 252 f"{self._api_url.eval(self.config)}{self._validate_session_url.eval(self.config)}", 253 headers={self.auth_header: self._session_token.eval(self.config)}, 254 ) 255 response.raise_for_status() 256 except requests.exceptions.HTTPError as e: 257 if e.response.status_code == requests.codes["unauthorized"]: 258 self.logger.info(f"Unable to connect by session token from config due to {str(e)}") 259 return False 260 else: 261 raise ConnectionError(f"Error while validating session token: {e}") 262 if response.ok: 263 self.logger.info("Connection check for source is successful.") 264 return True 265 else: 266 raise ConnectionError( 267 f"Failed to retrieve new session token, response code {response.status_code} because {response.reason}" 268 )