airbyte_cdk.sources.streams.http.requests_native_auth
1# 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 3# 4 5from .oauth import Oauth2Authenticator, SingleUseRefreshTokenOauth2Authenticator 6from .token import BasicHttpAuthenticator, MultipleTokenAuthenticator, TokenAuthenticator 7 8__all__ = [ 9 "Oauth2Authenticator", 10 "SingleUseRefreshTokenOauth2Authenticator", 11 "TokenAuthenticator", 12 "MultipleTokenAuthenticator", 13 "BasicHttpAuthenticator", 14]
class Oauth2Authenticator(AbstractOauth2Authenticator):
    """
    Generates OAuth2.0 access tokens from an OAuth2.0 refresh token and client credentials.
    The generated access token is attached to each request via the Authorization header.
    If a connector_config is provided, any mutation of its value in the scope of this class
    will emit an AirbyteControlConnectorConfigMessage.

    The class itself only stores its constructor arguments and hands them back through the
    accessors below; the refresh flow is expected to come from AbstractOauth2Authenticator.
    """

    def __init__(
        self,
        token_refresh_endpoint: str,
        client_id: str,
        client_secret: str,
        refresh_token: str,
        client_id_name: str = "client_id",
        client_secret_name: str = "client_secret",
        refresh_token_name: str = "refresh_token",
        scopes: List[str] | None = None,
        token_expiry_date: AirbyteDateTime | None = None,
        token_expiry_date_format: str | None = None,
        access_token_name: str = "access_token",
        expires_in_name: str = "expires_in",
        refresh_request_body: Mapping[str, Any] | None = None,
        refresh_request_headers: Mapping[str, Any] | None = None,
        send_refresh_request_as_query_params: bool = False,
        grant_type_name: str = "grant_type",
        grant_type: str = "refresh_token",
        token_expiry_is_time_of_expiration: bool = False,
        refresh_token_error_status_codes: Tuple[int, ...] = (),
        refresh_token_error_key: str = "",
        refresh_token_error_values: Tuple[str, ...] = (),
    ) -> None:
        """
        Store the refresh configuration and initialise the token state.

        When no (truthy) token_expiry_date is given, the expiry date is set to one day in
        the past. The three refresh_token_error_* arguments are forwarded positionally to
        the parent constructor.
        """
        # Refresh request: endpoint, grant and optional extras.
        self._token_refresh_endpoint = token_refresh_endpoint
        self._grant_type_name = grant_type_name
        self._grant_type = grant_type
        self._send_refresh_request_as_query_params = send_refresh_request_as_query_params
        self._refresh_request_headers = refresh_request_headers
        self._refresh_request_body = refresh_request_body
        self._scopes = scopes

        # Credentials, each stored next to the field name it is paired with.
        self._client_id_name = client_id_name
        self._client_id = client_id
        self._client_secret_name = client_secret_name
        self._client_secret = client_secret
        self._refresh_token_name = refresh_token_name
        self._refresh_token = refresh_token

        # Names of the access-token and expiry fields.
        self._access_token_name = access_token_name
        self._expires_in_name = expires_in_name

        # Token state: no access token yet, expiry defaults to yesterday.
        if token_expiry_date:
            self._token_expiry_date = token_expiry_date
        else:
            self._token_expiry_date = ab_datetime_now() - timedelta(days=1)
        self._token_expiry_date_format = token_expiry_date_format
        self._token_expiry_is_time_of_expiration = token_expiry_is_time_of_expiration
        self._access_token = None

        super().__init__(
            refresh_token_error_status_codes, refresh_token_error_key, refresh_token_error_values
        )

    def get_token_refresh_endpoint(self) -> str:
        """Return the token refresh endpoint given to the constructor."""
        return self._token_refresh_endpoint

    def get_client_id_name(self) -> str:
        """Return the configured name of the client id field."""
        return self._client_id_name

    def get_client_id(self) -> str:
        """Return the stored client id."""
        return self._client_id

    def get_client_secret_name(self) -> str:
        """Return the configured name of the client secret field."""
        return self._client_secret_name

    def get_client_secret(self) -> str:
        """Return the stored client secret."""
        return self._client_secret

    def get_refresh_token_name(self) -> str:
        """Return the configured name of the refresh token field."""
        return self._refresh_token_name

    def get_refresh_token(self) -> str:
        """Return the stored refresh token."""
        return self._refresh_token

    def get_access_token_name(self) -> str:
        """Return the configured name of the access token field."""
        return self._access_token_name

    def get_scopes(self) -> list[str]:
        """Return the configured scopes; this is None when no scopes were supplied."""
        return self._scopes  # type: ignore[return-value]

    def get_expires_in_name(self) -> str:
        """Return the configured name of the expiry field."""
        return self._expires_in_name

    def get_refresh_request_body(self) -> Mapping[str, Any]:
        """Return the extra refresh request body entries; None when none were supplied."""
        return self._refresh_request_body  # type: ignore[return-value]

    def get_refresh_request_headers(self) -> Mapping[str, Any]:
        """Return the extra refresh request headers; None when none were supplied."""
        return self._refresh_request_headers  # type: ignore[return-value]

    def should_send_refresh_request_as_query_params(self) -> bool:
        """Return the send_refresh_request_as_query_params flag given to the constructor."""
        return self._send_refresh_request_as_query_params

    def get_grant_type_name(self) -> str:
        """Return the configured name of the grant type field."""
        return self._grant_type_name

    def get_grant_type(self) -> str:
        """Return the configured grant type."""
        return self._grant_type

    def get_token_expiry_date(self) -> AirbyteDateTime:
        """Return the currently stored token expiry date."""
        return self._token_expiry_date

    def set_token_expiry_date(self, value: AirbyteDateTime) -> None:
        """Replace the stored token expiry date with `value`."""
        self._token_expiry_date = value

    @property
    def token_expiry_is_time_of_expiration(self) -> bool:
        """Flag given to the constructor under the same name."""
        return self._token_expiry_is_time_of_expiration

    @property
    def token_expiry_date_format(self) -> Optional[str]:
        """Datetime format given to the constructor, or None."""
        return self._token_expiry_date_format

    @property
    def access_token(self) -> str:
        """Currently stored access token (None until one has been set)."""
        return self._access_token  # type: ignore[return-value]

    @access_token.setter
    def access_token(self, value: str) -> None:
        """Store `value` as the current access token."""
        self._access_token = value  # type: ignore[assignment]  # Incorrect type for assignment
Generates OAuth2.0 access tokens from an OAuth2.0 refresh token and client credentials. The generated access token is attached to each request via the Authorization header. If a connector_config is provided, any mutation of its value in the scope of this class will emit an AirbyteControlConnectorConfigMessage.
def __init__(
    self,
    token_refresh_endpoint: str,
    client_id: str,
    client_secret: str,
    refresh_token: str,
    client_id_name: str = "client_id",
    client_secret_name: str = "client_secret",
    refresh_token_name: str = "refresh_token",
    scopes: List[str] | None = None,
    token_expiry_date: AirbyteDateTime | None = None,
    token_expiry_date_format: str | None = None,
    access_token_name: str = "access_token",
    expires_in_name: str = "expires_in",
    refresh_request_body: Mapping[str, Any] | None = None,
    refresh_request_headers: Mapping[str, Any] | None = None,
    send_refresh_request_as_query_params: bool = False,
    grant_type_name: str = "grant_type",
    grant_type: str = "refresh_token",
    token_expiry_is_time_of_expiration: bool = False,
    refresh_token_error_status_codes: Tuple[int, ...] = (),
    refresh_token_error_key: str = "",
    refresh_token_error_values: Tuple[str, ...] = (),
) -> None:
    """
    Store the refresh configuration and initialise the token state.

    When no (truthy) token_expiry_date is given, the expiry date is set to one day in the
    past. The three refresh_token_error_* arguments are forwarded positionally to the
    parent constructor.
    """
    # Refresh request: endpoint, grant and optional extras.
    self._token_refresh_endpoint = token_refresh_endpoint
    self._grant_type_name = grant_type_name
    self._grant_type = grant_type
    self._send_refresh_request_as_query_params = send_refresh_request_as_query_params
    self._refresh_request_headers = refresh_request_headers
    self._refresh_request_body = refresh_request_body
    self._scopes = scopes

    # Credentials, each stored next to the field name it is paired with.
    self._client_id_name = client_id_name
    self._client_id = client_id
    self._client_secret_name = client_secret_name
    self._client_secret = client_secret
    self._refresh_token_name = refresh_token_name
    self._refresh_token = refresh_token

    # Names of the access-token and expiry fields.
    self._access_token_name = access_token_name
    self._expires_in_name = expires_in_name

    # Token state: no access token yet, expiry defaults to yesterday.
    if token_expiry_date:
        self._token_expiry_date = token_expiry_date
    else:
        self._token_expiry_date = ab_datetime_now() - timedelta(days=1)
    self._token_expiry_date_format = token_expiry_date_format
    self._token_expiry_is_time_of_expiration = token_expiry_is_time_of_expiration
    self._access_token = None

    super().__init__(
        refresh_token_error_status_codes, refresh_token_error_key, refresh_token_error_values
    )
If all of refresh_token_error_status_codes, refresh_token_error_key, and refresh_token_error_values are set, then http errors with such params will be wrapped in AirbyteTracedException.
def get_refresh_request_body(self) -> Mapping[str, Any]:
    """Return the extra refresh request body entries stored on the instance (may be None)."""
    extra_body = self._refresh_request_body
    return extra_body  # type: ignore[return-value]
Returns the request body to set on the refresh request
def get_refresh_request_headers(self) -> Mapping[str, Any]:
    """Return the extra refresh request headers stored on the instance (may be None)."""
    extra_headers = self._refresh_request_headers
    return extra_headers  # type: ignore[return-value]
Returns the request headers to set on the refresh request
def should_send_refresh_request_as_query_params(self) -> bool:
    """Return the stored send_refresh_request_as_query_params flag."""
    as_query_params = self._send_refresh_request_as_query_params
    return as_query_params
Returns True if the standard refresh args should be sent on the URL
query string instead of in the request body.
Defaults to False so existing authenticators retain their previous
behavior (params in body, no query params on the refresh URL). Subclasses
can override this to opt into the URL-query-string shape required by OAuth
providers like Gong.
def set_token_expiry_date(self, value: AirbyteDateTime) -> None:
    """Replace the token expiry date stored on the instance with `value`."""
    new_expiry = value
    self._token_expiry_date = new_expiry
Setter for access token expiration date
@property
def token_expiry_is_time_of_expiration(self) -> bool:
    """Stored flag of the same name, as given to the constructor."""
    is_time_of_expiration = self._token_expiry_is_time_of_expiration
    return is_time_of_expiration
Indicates that the Token Expiry returns the date until which the token will be valid, not the amount of time it will be valid.
@property
def token_expiry_date_format(self) -> Optional[str]:
    """Stored datetime format for the token expiry value, or None when not configured."""
    expiry_format = self._token_expiry_date_format
    return expiry_format
Format of the datetime; set it if expires_in is returned as the expiration datetime instead of the number of seconds until it expires.
@property
def access_token(self) -> str:
    """Access token currently stored on the instance."""
    current_token = self._access_token
    return current_token  # type: ignore[return-value]
Returns the access token
Inherited Members
class SingleUseRefreshTokenOauth2Authenticator(Oauth2Authenticator):
    """
    Authenticator that should be used for APIs implementing single-use refresh tokens:
    when refreshing the access token, some APIs return a new refresh token that needs to be
    used in the next refresh flow.

    The access token, refresh token and token expiry date are read from and written to the
    connector configuration, and each refresh emits an Airbyte control message carrying the
    updated configuration.

    By default, this authenticator expects a connector config with a "credentials" field with
    the following nested fields: client_id, client_secret, refresh_token. The location of the
    access token, refresh token and token expiry date can be changed by passing custom config
    paths (dpath paths) as the access_token_config_path, refresh_token_config_path and
    token_expiry_date_config_path constructor arguments.
    """

    def __init__(
        self,
        connector_config: Mapping[str, Any],
        token_refresh_endpoint: str,
        scopes: List[str] | None = None,
        access_token_name: str = "access_token",
        expires_in_name: str = "expires_in",
        refresh_token_name: str = "refresh_token",
        refresh_request_body: Mapping[str, Any] | None = None,
        refresh_request_headers: Mapping[str, Any] | None = None,
        send_refresh_request_as_query_params: bool = False,
        grant_type_name: str = "grant_type",
        grant_type: str = "refresh_token",
        client_id_name: str = "client_id",
        client_id: Optional[str] = None,
        client_secret_name: str = "client_secret",
        client_secret: Optional[str] = None,
        access_token_config_path: Sequence[str] = ("credentials", "access_token"),
        refresh_token_config_path: Sequence[str] = ("credentials", "refresh_token"),
        token_expiry_date_config_path: Sequence[str] = ("credentials", "token_expiry_date"),
        token_expiry_date_format: Optional[str] = None,
        message_repository: MessageRepository = NoopMessageRepository(),
        token_expiry_is_time_of_expiration: bool = False,
        refresh_token_error_status_codes: Tuple[int, ...] = (),
        refresh_token_error_key: str = "",
        refresh_token_error_values: Tuple[str, ...] = (),
    ) -> None:
        """
        Args:
            connector_config (Mapping[str, Any]): The full connector configuration
            token_refresh_endpoint (str): Full URL to the token refresh endpoint
            scopes (List[str], optional): List of OAuth scopes to pass in the refresh token request body. Defaults to None.
            access_token_name (str, optional): Name of the access token field, used to parse the refresh token response. Defaults to "access_token".
            expires_in_name (str, optional): Name of the field that characterizes when the current access token will expire, used to parse the refresh token response. Defaults to "expires_in".
            refresh_token_name (str, optional): Name of the refresh token field, used to parse the refresh token response. Defaults to "refresh_token".
            refresh_request_body (Mapping[str, Any], optional): Custom key value pair that will be added to the refresh token request body. Defaults to None.
            refresh_request_headers (Mapping[str, Any], optional): Custom key value pair that will be added to the refresh token request headers. Defaults to None.
            send_refresh_request_as_query_params (bool, optional): When True, the standard refresh args (`grant_type`, `refresh_token`, client credentials when not in an `Authorization` header, scopes, plus any `refresh_request_body` extras) are sent on the URL query string and the request body is emitted empty. Use this for OAuth providers like Gong that document their refresh endpoint with refresh args on the URL query string. Defaults to False.
            grant_type_name (str, optional): Name of the grant type field, forwarded to Oauth2Authenticator. Defaults to "grant_type".
            grant_type (str, optional): OAuth grant type. Defaults to "refresh_token".
            client_id_name (str, optional): Name of the client id field, forwarded to Oauth2Authenticator. Defaults to "client_id".
            client_id (Optional[str]): Fallback client id. The value at credentials.client_id in the config object is used when that path exists; this argument is only used when it does not.
            client_secret_name (str, optional): Name of the client secret field, forwarded to Oauth2Authenticator. Defaults to "client_secret".
            client_secret (Optional[str]): Fallback client secret. The value at credentials.client_secret in the config object is used when that path exists; this argument is only used when it does not.
            access_token_config_path (Sequence[str]): Dpath to the access_token field in the connector configuration. Defaults to ("credentials", "access_token").
            refresh_token_config_path (Sequence[str]): Dpath to the refresh_token field in the connector configuration. Defaults to ("credentials", "refresh_token").
            token_expiry_date_config_path (Sequence[str]): Dpath to the token_expiry_date field in the connector configuration. Defaults to ("credentials", "token_expiry_date").
            token_expiry_date_format (Optional[str]): Date format of the token expiry date field (set by expires_in_name). If not specified the token expiry date is interpreted as number of seconds until expiration.
            message_repository (MessageRepository): the message repository used to emit logs on HTTP requests and control message on config update
            token_expiry_is_time_of_expiration (bool): set to True if expires_in is returned as the time of expiration instead of the number of seconds until expiration
            refresh_token_error_status_codes (Tuple[int, ...]): forwarded to Oauth2Authenticator. Defaults to ().
            refresh_token_error_key (str): forwarded to Oauth2Authenticator. Defaults to "".
            refresh_token_error_values (Tuple[str, ...]): forwarded to Oauth2Authenticator. Defaults to ().
        """
        # The config must be stored first: every lookup below goes through it.
        self._connector_config = connector_config
        self._client_id: str = self._get_config_value_by_path(
            ("credentials", "client_id"), client_id
        )
        self._client_secret: str = self._get_config_value_by_path(
            ("credentials", "client_secret"), client_secret
        )
        self._client_id_name = client_id_name
        self._client_secret_name = client_secret_name
        # The config paths must be set before super().__init__() because the arguments
        # passed to it call get_refresh_token() and get_token_expiry_date().
        self._access_token_config_path = access_token_config_path
        self._refresh_token_config_path = refresh_token_config_path
        self._token_expiry_date_config_path = token_expiry_date_config_path
        self._token_expiry_date_format = token_expiry_date_format
        self._refresh_token_name = refresh_token_name
        self._grant_type_name = grant_type_name
        self.__message_repository = message_repository
        super().__init__(
            token_refresh_endpoint=token_refresh_endpoint,
            client_id_name=self._client_id_name,
            client_id=self._client_id,
            client_secret_name=self._client_secret_name,
            client_secret=self._client_secret,
            refresh_token=self.get_refresh_token(),
            refresh_token_name=self._refresh_token_name,
            scopes=scopes,
            token_expiry_date=self.get_token_expiry_date(),
            access_token_name=access_token_name,
            expires_in_name=expires_in_name,
            refresh_request_body=refresh_request_body,
            refresh_request_headers=refresh_request_headers,
            send_refresh_request_as_query_params=send_refresh_request_as_query_params,
            grant_type_name=self._grant_type_name,
            grant_type=grant_type,
            token_expiry_date_format=token_expiry_date_format,
            token_expiry_is_time_of_expiration=token_expiry_is_time_of_expiration,
            refresh_token_error_status_codes=refresh_token_error_status_codes,
            refresh_token_error_key=refresh_token_error_key,
            refresh_token_error_values=refresh_token_error_values,
        )

    @property
    def access_token(self) -> str:
        """
        Retrieve the access token from the configuration.

        Returns:
            str: The value stored at `_access_token_config_path`, or "" when that path is absent.
        """
        return self._get_config_value_by_path(self._access_token_config_path)  # type: ignore[return-value]

    @access_token.setter
    def access_token(self, new_access_token: str) -> None:
        """
        Sets a new access token in the configuration.

        Args:
            new_access_token (str): The new access token to be set.
        """
        self._set_config_value_by_path(self._access_token_config_path, new_access_token)

    def get_refresh_token(self) -> str:
        """
        Retrieve the refresh token from the configuration.

        This method fetches the refresh token using the configuration path specified
        by `_refresh_token_config_path`.

        Returns:
            str: The refresh token, or "" when that path is absent from the configuration.
        """
        return self._get_config_value_by_path(self._refresh_token_config_path)  # type: ignore[return-value]

    def set_refresh_token(self, new_refresh_token: str) -> None:
        """
        Updates the refresh token in the configuration.

        Args:
            new_refresh_token (str): The new refresh token to be set.
        """
        self._set_config_value_by_path(self._refresh_token_config_path, new_refresh_token)

    def get_token_expiry_date(self) -> AirbyteDateTime:
        """
        Retrieves the token expiry date from the configuration.

        If the configuration holds no expiry date (the path is absent, or the value is an
        empty string or None), the current date and time minus one day is returned.
        Otherwise the stored value is converted to a string and parsed into an
        AirbyteDateTime.

        Returns:
            AirbyteDateTime: The parsed or calculated token expiry date.

        Raises:
            TypeError: If the result is not an instance of AirbyteDateTime.
        """
        expiry_date = self._get_config_value_by_path(self._token_expiry_date_config_path)
        # A missing path comes back as "" and an explicit null as None. Neither is a date,
        # so both fall back to "yesterday" instead of being handed to the parser.
        if expiry_date is None or expiry_date == "":
            result = ab_datetime_now() - timedelta(days=1)
        else:
            result = ab_datetime_parse(str(expiry_date))
        if isinstance(result, AirbyteDateTime):
            return result
        raise TypeError("Invalid datetime conversion")

    def set_token_expiry_date(self, new_token_expiry_date: AirbyteDateTime) -> None:  # type: ignore[override]
        """
        Sets the token expiry date in the configuration.

        The date is stored as its `str()` representation.

        Args:
            new_token_expiry_date (AirbyteDateTime): The new expiry date for the token.
        """
        self._set_config_value_by_path(
            self._token_expiry_date_config_path, str(new_token_expiry_date)
        )

    def token_has_expired(self) -> bool:
        """Returns True if the current time is past the configured token expiry date."""
        return ab_datetime_now() > self.get_token_expiry_date()

    def get_access_token(self) -> str:
        """Retrieve new access and refresh token if the access token has expired.

        This method uses double-checked locking to ensure thread-safe token refresh.
        This is especially critical for single-use refresh tokens where concurrent
        refresh attempts would cause failures as the refresh token is invalidated
        after first use.

        The new refresh token is persisted with the set_refresh_token function.

        Returns:
            str: The current access_token, updated if it was previously expired.
        """
        if self.token_has_expired():
            with self._token_refresh_lock:
                # Double-check after acquiring lock - another thread may have already refreshed
                if self.token_has_expired():
                    self.refresh_and_set_access_token()
        return self.access_token

    def refresh_and_set_access_token(self) -> None:
        """Force refresh the access token and update internal state.

        For single-use refresh tokens, this also persists the new refresh token
        and emits a control message to update the connector config. If the
        response omits a refresh token, the existing one is preserved.
        """
        new_access_token, access_token_expires_in, new_refresh_token = self.refresh_access_token()
        self.access_token = new_access_token
        if new_refresh_token is not None:
            self.set_refresh_token(new_refresh_token)
        self.set_token_expiry_date(access_token_expires_in)
        self._emit_control_message()

    def refresh_access_token(self) -> Tuple[str, AirbyteDateTime, Optional[str]]:  # type: ignore[override]
        """
        Refreshes the access token by making a handled request and extracting the necessary token information.

        Returns:
            A tuple of (access_token, token_expiry_date, refresh_token). The refresh token
            is `None` when the OAuth provider omits it from the response.
        """
        response_json = self._make_handled_request()
        return (
            self._extract_access_token(response_json),
            self._extract_token_expiry_date(response_json),
            self._extract_refresh_token(response_json),
        )

    def _set_config_value_by_path(self, config_path: Union[str, Sequence[str]], value: Any) -> None:
        """
        Set a value in the connector configuration at the specified path.

        Args:
            config_path (Union[str, Sequence[str]]): The path within the configuration where the value should be set.
                This can be a string representing a single key or a sequence of strings representing a nested path.
            value (Any): The value to set at the specified path in the configuration.

        Returns:
            None
        """
        dpath.new(self._connector_config, config_path, value)  # type: ignore[arg-type]

    def _get_config_value_by_path(
        self, config_path: Union[str, Sequence[str]], default: Optional[str] = None
    ) -> str | Any:
        """
        Retrieve a value from the connector configuration using a specified path.

        Args:
            config_path (Union[str, Sequence[str]]): The path to the desired configuration value. This can be a string or a sequence of strings.
            default (Optional[str], optional): The value to return if the specified path does not exist in the configuration. Defaults to None, in which case "" is returned for a missing path.

        Returns:
            Any: The value from the configuration at the specified path, or the default value (or "") if the path does not exist.
        """
        return dpath.get(
            self._connector_config,  # type: ignore[arg-type]
            config_path,
            default=default if default is not None else "",
        )

    def _emit_control_message(self) -> None:
        """
        Emits a control message based on the connector configuration.

        Control messages for config updates (like refreshed tokens) must be printed directly
        to stdout so the platform can process them immediately. The message repository is
        also used to queue the message for any additional processing.

        Note:
            The function `emit_configuration_as_airbyte_control_message` prints directly to
            stdout, which is required for the platform to detect and persist config changes.
        """
        # Always emit to stdout so the platform can process the config update immediately.
        # This is critical for single-use refresh tokens where the new token must be persisted
        # before subsequent operations try to use the old (now invalid) token.
        emit_configuration_as_airbyte_control_message(self._connector_config)  # type: ignore[arg-type]

        # Also emit to the message repository for any additional processing (e.g., logging)
        if not isinstance(self._message_repository, NoopMessageRepository):
            self._message_repository.emit_message(
                create_connector_config_control_message(self._connector_config)  # type: ignore[arg-type]
            )

    @property
    def _message_repository(self) -> MessageRepository:
        """
        Overriding AbstractOauth2Authenticator._message_repository to allow for HTTP request logs
        """
        return self.__message_repository
Authenticator that should be used for APIs implementing single-use refresh tokens: when refreshing the access token, some APIs return a new refresh token that needs to be used in the next refresh flow. This authenticator updates the configuration with the new refresh token by emitting an Airbyte control message from an observed mutation. By default, this authenticator expects a connector config with a "credentials" field with the following nested fields: client_id, client_secret, refresh_token. This behavior can be changed by defining custom config paths (using dpath paths) in the access_token_config_path, refresh_token_config_path and token_expiry_date_config_path constructor arguments.
def __init__(
    self,
    connector_config: Mapping[str, Any],
    token_refresh_endpoint: str,
    scopes: List[str] | None = None,
    access_token_name: str = "access_token",
    expires_in_name: str = "expires_in",
    refresh_token_name: str = "refresh_token",
    refresh_request_body: Mapping[str, Any] | None = None,
    refresh_request_headers: Mapping[str, Any] | None = None,
    send_refresh_request_as_query_params: bool = False,
    grant_type_name: str = "grant_type",
    grant_type: str = "refresh_token",
    client_id_name: str = "client_id",
    client_id: Optional[str] = None,
    client_secret_name: str = "client_secret",
    client_secret: Optional[str] = None,
    access_token_config_path: Sequence[str] = ("credentials", "access_token"),
    refresh_token_config_path: Sequence[str] = ("credentials", "refresh_token"),
    token_expiry_date_config_path: Sequence[str] = ("credentials", "token_expiry_date"),
    token_expiry_date_format: Optional[str] = None,
    message_repository: MessageRepository = NoopMessageRepository(),
    token_expiry_is_time_of_expiration: bool = False,
    refresh_token_error_status_codes: Tuple[int, ...] = (),
    refresh_token_error_key: str = "",
    refresh_token_error_values: Tuple[str, ...] = (),
) -> None:
    """
    Args:
        connector_config (Mapping[str, Any]): The full connector configuration
        token_refresh_endpoint (str): Full URL to the token refresh endpoint
        scopes (List[str], optional): List of OAuth scopes to pass in the refresh token request body. Defaults to None.
        access_token_name (str, optional): Name of the access token field, used to parse the refresh token response. Defaults to "access_token".
        expires_in_name (str, optional): Name of the field that characterizes when the current access token will expire, used to parse the refresh token response. Defaults to "expires_in".
        refresh_token_name (str, optional): Name of the refresh token field, used to parse the refresh token response. Defaults to "refresh_token".
        refresh_request_body (Mapping[str, Any], optional): Custom key value pair that will be added to the refresh token request body. Defaults to None.
        refresh_request_headers (Mapping[str, Any], optional): Custom key value pair that will be added to the refresh token request headers. Defaults to None.
        send_refresh_request_as_query_params (bool, optional): When True, the standard refresh args (`grant_type`, `refresh_token`, client credentials when not in an `Authorization` header, scopes, plus any `refresh_request_body` extras) are sent on the URL query string and the request body is emitted empty. Use this for OAuth providers like Gong that document their refresh endpoint with refresh args on the URL query string. Defaults to False.
        grant_type_name (str, optional): Name of the grant type field, forwarded to the parent constructor. Defaults to "grant_type".
        grant_type (str, optional): OAuth grant type. Defaults to "refresh_token".
        client_id_name (str, optional): Name of the client id field, forwarded to the parent constructor. Defaults to "client_id".
        client_id (Optional[str]): Fallback client id. The value at credentials.client_id in the config object is used when that path exists; this argument is only used when it does not.
        client_secret_name (str, optional): Name of the client secret field, forwarded to the parent constructor. Defaults to "client_secret".
        client_secret (Optional[str]): Fallback client secret. The value at credentials.client_secret in the config object is used when that path exists; this argument is only used when it does not.
        access_token_config_path (Sequence[str]): Dpath to the access_token field in the connector configuration. Defaults to ("credentials", "access_token").
        refresh_token_config_path (Sequence[str]): Dpath to the refresh_token field in the connector configuration. Defaults to ("credentials", "refresh_token").
        token_expiry_date_config_path (Sequence[str]): Dpath to the token_expiry_date field in the connector configuration. Defaults to ("credentials", "token_expiry_date").
        token_expiry_date_format (Optional[str]): Date format of the token expiry date field (set by expires_in_name). If not specified the token expiry date is interpreted as number of seconds until expiration.
        message_repository (MessageRepository): the message repository used to emit logs on HTTP requests and control message on config update
        token_expiry_is_time_of_expiration (bool): set to True if expires_in is returned as the time of expiration instead of the number of seconds until expiration
        refresh_token_error_status_codes (Tuple[int, ...]): forwarded to the parent constructor. Defaults to ().
        refresh_token_error_key (str): forwarded to the parent constructor. Defaults to "".
        refresh_token_error_values (Tuple[str, ...]): forwarded to the parent constructor. Defaults to ().
    """
    # The config must be stored first: every lookup below goes through it.
    self._connector_config = connector_config
    self._client_id: str = self._get_config_value_by_path(
        ("credentials", "client_id"), client_id
    )
    self._client_secret: str = self._get_config_value_by_path(
        ("credentials", "client_secret"), client_secret
    )
    self._client_id_name = client_id_name
    self._client_secret_name = client_secret_name
    # The config paths must be set before super().__init__() because the arguments
    # passed to it call get_refresh_token() and get_token_expiry_date().
    self._access_token_config_path = access_token_config_path
    self._refresh_token_config_path = refresh_token_config_path
    self._token_expiry_date_config_path = token_expiry_date_config_path
    self._token_expiry_date_format = token_expiry_date_format
    self._refresh_token_name = refresh_token_name
    self._grant_type_name = grant_type_name
    self.__message_repository = message_repository
    super().__init__(
        token_refresh_endpoint=token_refresh_endpoint,
        client_id_name=self._client_id_name,
        client_id=self._client_id,
        client_secret_name=self._client_secret_name,
        client_secret=self._client_secret,
        refresh_token=self.get_refresh_token(),
        refresh_token_name=self._refresh_token_name,
        scopes=scopes,
        token_expiry_date=self.get_token_expiry_date(),
        access_token_name=access_token_name,
        expires_in_name=expires_in_name,
        refresh_request_body=refresh_request_body,
        refresh_request_headers=refresh_request_headers,
        send_refresh_request_as_query_params=send_refresh_request_as_query_params,
        grant_type_name=self._grant_type_name,
        grant_type=grant_type,
        token_expiry_date_format=token_expiry_date_format,
        token_expiry_is_time_of_expiration=token_expiry_is_time_of_expiration,
        refresh_token_error_status_codes=refresh_token_error_status_codes,
        refresh_token_error_key=refresh_token_error_key,
        refresh_token_error_values=refresh_token_error_values,
    )
Arguments:
- connector_config (Mapping[str, Any]): The full connector configuration
- token_refresh_endpoint (str): Full URL to the token refresh endpoint
- scopes (List[str], optional): List of OAuth scopes to pass in the refresh token request body. Defaults to None.
- access_token_name (str, optional): Name of the access token field, used to parse the refresh token response. Defaults to "access_token".
- expires_in_name (str, optional): Name of the field that characterizes when the current access token will expire, used to parse the refresh token response. Defaults to "expires_in".
- refresh_token_name (str, optional): Name of the refresh token field, used to parse the refresh token response. Defaults to "refresh_token".
- refresh_request_body (Mapping[str, Any], optional): Custom key value pair that will be added to the refresh token request body. Defaults to None.
- refresh_request_headers (Mapping[str, Any], optional): Custom key value pair that will be added to the refresh token request headers. Defaults to None.
- send_refresh_request_as_query_params (bool, optional): When True, the standard refresh args (`grant_type`, `refresh_token`, client credentials when not in an `Authorization` header, scopes, plus any `refresh_request_body` extras) are sent on the URL query string and the request body is emitted empty. Use this for OAuth providers like Gong that document their refresh endpoint with refresh args on the URL query string. Defaults to False.
- grant_type (str, optional): OAuth grant type. Defaults to "refresh_token".
- client_id (Optional[str]): The client id to authenticate. If not specified, defaults to credentials.client_id in the config object.
- client_secret (Optional[str]): The client secret to authenticate. If not specified, defaults to credentials.client_secret in the config object.
- access_token_config_path (Sequence[str]): Dpath to the access_token field in the connector configuration. Defaults to ("credentials", "access_token").
- refresh_token_config_path (Sequence[str]): Dpath to the refresh_token field in the connector configuration. Defaults to ("credentials", "refresh_token").
- token_expiry_date_config_path (Sequence[str]): Dpath to the token_expiry_date field in the connector configuration. Defaults to ("credentials", "token_expiry_date").
- token_expiry_date_format (Optional[str]): Date format of the token expiry date field (set by expires_in_name). If not specified the token expiry date is interpreted as number of seconds until expiration.
- token_expiry_is_time_of_expiration (bool): Set to True if expires_in is returned as the time of expiration instead of the number of seconds until expiration.
- message_repository (MessageRepository): the message repository used to emit logs on HTTP requests and control message on config update
@property
def access_token(self) -> str:
    """
    Access token currently stored in the connector configuration.

    Returns:
        str: The value read from the configuration at `_access_token_config_path`.
    """
    config_path = self._access_token_config_path
    return self._get_config_value_by_path(config_path)  # type: ignore[return-value]
Retrieve the access token from the configuration.
Returns:
str: The access token.
def get_refresh_token(self) -> str:
    """
    Read the refresh token from the connector configuration.

    The lookup uses the path held in `_refresh_token_config_path`.

    Returns:
        str: The refresh token found at that path.
    """
    config_path = self._refresh_token_config_path
    return self._get_config_value_by_path(config_path)  # type: ignore[return-value]
Retrieve the refresh token from the configuration.
This method fetches the refresh token using the configuration path specified
by _refresh_token_config_path.
Returns:
str: The refresh token as a string.
def set_refresh_token(self, new_refresh_token: str) -> None:
    """
    Write a new refresh token into the connector configuration.

    Args:
        new_refresh_token (str): Value to store at `_refresh_token_config_path`.
    """
    config_path = self._refresh_token_config_path
    self._set_config_value_by_path(config_path, new_refresh_token)
Updates the refresh token in the configuration.
Arguments:
- new_refresh_token (str): The new refresh token to be set.
def get_token_expiry_date(self) -> AirbyteDateTime:
    """
    Read the token expiry date from the connector configuration.

    The raw value is looked up at `_token_expiry_date_config_path`. An empty
    string is replaced by "now minus one day"; any other value is converted
    to a string and parsed with `ab_datetime_parse`.

    Returns:
        AirbyteDateTime: The parsed or computed expiry date.

    Raises:
        TypeError: If the resulting value is not an `AirbyteDateTime`.
    """
    raw_expiry = self._get_config_value_by_path(self._token_expiry_date_config_path)
    if raw_expiry == "":
        # No stored expiry: use a date in the past.
        expiry = ab_datetime_now() - timedelta(days=1)
    else:
        expiry = ab_datetime_parse(str(raw_expiry))
    if not isinstance(expiry, AirbyteDateTime):
        raise TypeError("Invalid datetime conversion")
    return expiry
Retrieves the token expiry date from the configuration.
This method fetches the token expiry date from the configuration using the specified path. If the expiry date is an empty string, it returns the current date and time minus one day. Otherwise, it parses the expiry date string into an AirbyteDateTime object.
Returns:
AirbyteDateTime: The parsed or calculated token expiry date.
Raises:
- TypeError: If the result is not an instance of AirbyteDateTime.
def set_token_expiry_date(self, new_token_expiry_date: AirbyteDateTime) -> None:  # type: ignore[override]
    """
    Store a new token expiry date in the connector configuration.

    The date is written as its `str()` representation at
    `_token_expiry_date_config_path`.

    Args:
        new_token_expiry_date (AirbyteDateTime): Expiry date to persist.
    """
    serialized_expiry = str(new_token_expiry_date)
    self._set_config_value_by_path(self._token_expiry_date_config_path, serialized_expiry)
Sets the token expiry date in the configuration.
Arguments:
- new_token_expiry_date (AirbyteDateTime): The new expiry date for the token.
def token_has_expired(self) -> bool:
    """Return True when the current time is later than the stored token expiry date."""
    current_time = ab_datetime_now()
    expires_at = self.get_token_expiry_date()
    return current_time > expires_at
Returns True if the token is expired
def get_access_token(self) -> str:
    """Return the access token, refreshing it first if it has expired.

    Uses double-checked locking: expiry is tested once without the lock and
    again after acquiring `_token_refresh_lock`, so that only one caller
    performs the refresh. This matters for single-use refresh tokens, where a
    second concurrent refresh would fail because the refresh token has
    already been consumed.

    Returns:
        str: The current access token, updated if it was previously expired.
    """
    if not self.token_has_expired():
        return self.access_token

    with self._token_refresh_lock:
        # Re-check under the lock: another thread may have refreshed already.
        if self.token_has_expired():
            self.refresh_and_set_access_token()
    return self.access_token
Retrieve new access and refresh token if the access token has expired.
This method uses double-checked locking to ensure thread-safe token refresh. This is especially critical for single-use refresh tokens where concurrent refresh attempts would cause failures as the refresh token is invalidated after first use.
The new refresh token is persisted with the set_refresh_token function.
Returns:
str: The current access_token, updated if it was previously expired.
def refresh_and_set_access_token(self) -> None:
    """Force a token refresh and store the results.

    Calls `refresh_access_token()`, then in order: stores the new access
    token, stores the new refresh token if the response contained one
    (otherwise the existing one is left untouched), stores the new expiry
    date, and finally calls `_emit_control_message()`.
    """
    refreshed = self.refresh_access_token()
    fresh_access_token, fresh_expiry_date, rotated_refresh_token = refreshed

    self.access_token = fresh_access_token
    if rotated_refresh_token is not None:
        self.set_refresh_token(rotated_refresh_token)
    self.set_token_expiry_date(fresh_expiry_date)
    self._emit_control_message()
Force refresh the access token and update internal state.
For single-use refresh tokens, this also persists the new refresh token and emits a control message to update the connector config. If the response omits a refresh token, the existing one is preserved.
def refresh_access_token(self) -> Tuple[str, AirbyteDateTime, Optional[str]]:  # type: ignore[override]
    """
    Request a token refresh and pull the token fields out of the response.

    Returns:
        A tuple of (access_token, token_expiry_date, refresh_token). The refresh
        token is `None` when the OAuth provider omits it from the response.
    """
    payload = self._make_handled_request()
    new_access_token = self._extract_access_token(payload)
    new_expiry_date = self._extract_token_expiry_date(payload)
    new_refresh_token = self._extract_refresh_token(payload)
    return new_access_token, new_expiry_date, new_refresh_token
Refreshes the access token by making a handled request and extracting the necessary token information.
Returns:
A tuple of (access_token, token_expiry_date, refresh_token). The refresh token is
`None` when the OAuth provider omits it from the response.
Inherited Members
- Oauth2Authenticator
- get_token_refresh_endpoint
- get_client_id_name
- get_client_id
- get_client_secret_name
- get_client_secret
- get_refresh_token_name
- get_access_token_name
- get_scopes
- get_expires_in_name
- get_refresh_request_body
- get_refresh_request_headers
- should_send_refresh_request_as_query_params
- get_grant_type_name
- get_grant_type
- token_expiry_is_time_of_expiration
- token_expiry_date_format
class TokenAuthenticator(AbstractHeaderAuthenticator):
    """
    Header authenticator built from a single, fixed token.

    `auth_header` gives the header name and `token` gives the header value,
    formatted as "<auth_method> <token>". The token is attached to each
    request via the `auth_header` header.
    """

    def __init__(self, token: str, auth_method: str = "Bearer", auth_header: str = "Authorization"):
        self._token = token
        self._auth_method = auth_method
        self._auth_header = auth_header

    @property
    def auth_header(self) -> str:
        """Name of the HTTP header that carries the credentials."""
        return self._auth_header

    @property
    def token(self) -> str:
        """Header value: the auth method and the token separated by one space."""
        return "{} {}".format(self._auth_method, self._token)
Builds auth header, based on the token provided.
The token is attached to each request via the auth_header header.
class MultipleTokenAuthenticator(AbstractHeaderAuthenticator):
    """
    Header authenticator that rotates through a list of tokens.

    Every read of `token` advances an endless cycle over `tokens`, so
    consecutive header values use consecutive tokens and wrap around at the
    end of the list. The token is attached to each request via the
    `auth_header` header.

    NOTE: with an empty `tokens` list, reading `token` raises `StopIteration`.
    """

    def __init__(
        self, tokens: List[str], auth_method: str = "Bearer", auth_header: str = "Authorization"
    ):
        self._tokens = tokens
        self._tokens_iter = cycle(self._tokens)
        self._auth_header = auth_header
        self._auth_method = auth_method

    @property
    def auth_header(self) -> str:
        """Name of the HTTP header that carries the credentials."""
        return self._auth_header

    @property
    def token(self) -> str:
        """Header value built from the auth method and the next token in the cycle."""
        upcoming_token = next(self._tokens_iter)
        return "{} {}".format(self._auth_method, upcoming_token)
Builds auth header, based on the list of tokens provided.
Auth header is changed per each get_auth_header call, using each token in cycle.
The token is attached to each request via the auth_header header.
class BasicHttpAuthenticator(AbstractHeaderAuthenticator):
    """
    Builds auth based off the basic authentication scheme as defined by RFC 7617,
    which transmits credentials as user ID/password pairs, encoded using base64.
    https://developer.mozilla.org/en-US/docs/Web/HTTP/Authentication#basic_authentication_scheme
    """

    def __init__(
        self,
        username: str,
        password: str = "",
        auth_method: str = "Basic",
        auth_header: str = "Authorization",
    ):
        # "username:password" is UTF-8 encoded, base64'd, and stored as text.
        credentials = "{}:{}".format(username, password).encode("utf8")
        encoded_credentials = base64.b64encode(credentials).decode("utf8")
        self._token = encoded_credentials
        self._auth_method = auth_method
        self._auth_header = auth_header

    @property
    def auth_header(self) -> str:
        """Name of the HTTP header that carries the credentials."""
        return self._auth_header

    @property
    def token(self) -> str:
        """Header value: the auth method and the base64 credentials separated by one space."""
        return "{} {}".format(self._auth_method, self._token)
Builds auth based off the basic authentication scheme as defined by RFC 7617, which transmits credentials as USER ID/password pairs, encoded using base64. See https://developer.mozilla.org/en-US/docs/Web/HTTP/Authentication#basic_authentication_scheme
def __init__(
    self,
    username: str,
    password: str = "",
    auth_method: str = "Basic",
    auth_header: str = "Authorization",
):
    """
    Precompute the base64 credentials used for the basic-auth header.

    Args:
        username (str): User ID placed before the colon.
        password (str): Password placed after the colon. Defaults to "".
        auth_method (str): Scheme name stored for the header value. Defaults to "Basic".
        auth_header (str): Name of the header to use. Defaults to "Authorization".
    """
    raw_pair = "{}:{}".format(username, password)
    token_bytes = base64.b64encode(raw_pair.encode("utf8"))
    self._auth_header = auth_header
    self._auth_method = auth_method
    self._token = token_bytes.decode("utf8")