airbyte_cdk.sources.streams.http.requests_native_auth
1# 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 3# 4 5from .oauth import Oauth2Authenticator, SingleUseRefreshTokenOauth2Authenticator 6from .token import BasicHttpAuthenticator, MultipleTokenAuthenticator, TokenAuthenticator 7 8__all__ = [ 9 "Oauth2Authenticator", 10 "SingleUseRefreshTokenOauth2Authenticator", 11 "TokenAuthenticator", 12 "MultipleTokenAuthenticator", 13 "BasicHttpAuthenticator", 14]
26class Oauth2Authenticator(AbstractOauth2Authenticator): 27 """ 28 Generates OAuth2.0 access tokens from an OAuth2.0 refresh token and client credentials. 29 The generated access token is attached to each request via the Authorization header. 30 If a connector_config is provided any mutation of it's value in the scope of this class will emit AirbyteControlConnectorConfigMessage. 31 """ 32 33 def __init__( 34 self, 35 token_refresh_endpoint: str, 36 client_id: str, 37 client_secret: str, 38 refresh_token: str, 39 client_id_name: str = "client_id", 40 client_secret_name: str = "client_secret", 41 refresh_token_name: str = "refresh_token", 42 scopes: List[str] | None = None, 43 token_expiry_date: AirbyteDateTime | None = None, 44 token_expiry_date_format: str | None = None, 45 access_token_name: str = "access_token", 46 expires_in_name: str = "expires_in", 47 refresh_request_body: Mapping[str, Any] | None = None, 48 refresh_request_headers: Mapping[str, Any] | None = None, 49 grant_type_name: str = "grant_type", 50 grant_type: str = "refresh_token", 51 token_expiry_is_time_of_expiration: bool = False, 52 refresh_token_error_status_codes: Tuple[int, ...] = (), 53 refresh_token_error_key: str = "", 54 refresh_token_error_values: Tuple[str, ...] = (), 55 ) -> None: 56 self._token_refresh_endpoint = token_refresh_endpoint 57 self._client_secret_name = client_secret_name 58 self._client_secret = client_secret 59 self._client_id_name = client_id_name 60 self._client_id = client_id 61 self._refresh_token_name = refresh_token_name 62 self._refresh_token = refresh_token 63 self._scopes = scopes 64 self._access_token_name = access_token_name 65 self._expires_in_name = expires_in_name 66 self._refresh_request_body = refresh_request_body 67 self._refresh_request_headers = refresh_request_headers 68 self._grant_type_name = grant_type_name 69 self._grant_type = grant_type 70 71 self._token_expiry_date = token_expiry_date or (ab_datetime_now() - timedelta(days=1)) 72 self._token_expiry_date_format = token_expiry_date_format 73 self._token_expiry_is_time_of_expiration = token_expiry_is_time_of_expiration 74 self._access_token = None 75 super().__init__( 76 refresh_token_error_status_codes, refresh_token_error_key, refresh_token_error_values 77 ) 78 79 def get_token_refresh_endpoint(self) -> str: 80 return self._token_refresh_endpoint 81 82 def get_client_id_name(self) -> str: 83 return self._client_id_name 84 85 def get_client_id(self) -> str: 86 return self._client_id 87 88 def get_client_secret_name(self) -> str: 89 return self._client_secret_name 90 91 def get_client_secret(self) -> str: 92 return self._client_secret 93 94 def get_refresh_token_name(self) -> str: 95 return self._refresh_token_name 96 97 def get_refresh_token(self) -> str: 98 return self._refresh_token 99 100 def get_access_token_name(self) -> str: 101 return self._access_token_name 102 103 def get_scopes(self) -> list[str]: 104 return self._scopes # type: ignore[return-value] 105 106 def get_expires_in_name(self) -> str: 107 return self._expires_in_name 108 109 def get_refresh_request_body(self) -> Mapping[str, Any]: 110 return self._refresh_request_body # type: ignore[return-value] 111 112 def get_refresh_request_headers(self) -> Mapping[str, Any]: 113 return self._refresh_request_headers # type: ignore[return-value] 114 115 def get_grant_type_name(self) -> str: 116 return self._grant_type_name 117 118 def get_grant_type(self) -> str: 119 return self._grant_type 120 121 def get_token_expiry_date(self) -> AirbyteDateTime: 122 return self._token_expiry_date 123 124 def set_token_expiry_date(self, value: AirbyteDateTime) -> None: 125 self._token_expiry_date = value 126 127 @property 128 def token_expiry_is_time_of_expiration(self) -> bool: 129 return self._token_expiry_is_time_of_expiration 130 131 @property 132 def token_expiry_date_format(self) -> Optional[str]: 133 return self._token_expiry_date_format 134 135 @property 136 def access_token(self) -> str: 137 return self._access_token # type: ignore[return-value] 138 139 @access_token.setter 140 def access_token(self, value: str) -> None: 141 self._access_token = value # type: ignore[assignment] # Incorrect type for assignment
Generates OAuth2.0 access tokens from an OAuth2.0 refresh token and client credentials. The generated access token is attached to each request via the Authorization header. If a connector_config is provided any mutation of it's value in the scope of this class will emit AirbyteControlConnectorConfigMessage.
33 def __init__( 34 self, 35 token_refresh_endpoint: str, 36 client_id: str, 37 client_secret: str, 38 refresh_token: str, 39 client_id_name: str = "client_id", 40 client_secret_name: str = "client_secret", 41 refresh_token_name: str = "refresh_token", 42 scopes: List[str] | None = None, 43 token_expiry_date: AirbyteDateTime | None = None, 44 token_expiry_date_format: str | None = None, 45 access_token_name: str = "access_token", 46 expires_in_name: str = "expires_in", 47 refresh_request_body: Mapping[str, Any] | None = None, 48 refresh_request_headers: Mapping[str, Any] | None = None, 49 grant_type_name: str = "grant_type", 50 grant_type: str = "refresh_token", 51 token_expiry_is_time_of_expiration: bool = False, 52 refresh_token_error_status_codes: Tuple[int, ...] = (), 53 refresh_token_error_key: str = "", 54 refresh_token_error_values: Tuple[str, ...] = (), 55 ) -> None: 56 self._token_refresh_endpoint = token_refresh_endpoint 57 self._client_secret_name = client_secret_name 58 self._client_secret = client_secret 59 self._client_id_name = client_id_name 60 self._client_id = client_id 61 self._refresh_token_name = refresh_token_name 62 self._refresh_token = refresh_token 63 self._scopes = scopes 64 self._access_token_name = access_token_name 65 self._expires_in_name = expires_in_name 66 self._refresh_request_body = refresh_request_body 67 self._refresh_request_headers = refresh_request_headers 68 self._grant_type_name = grant_type_name 69 self._grant_type = grant_type 70 71 self._token_expiry_date = token_expiry_date or (ab_datetime_now() - timedelta(days=1)) 72 self._token_expiry_date_format = token_expiry_date_format 73 self._token_expiry_is_time_of_expiration = token_expiry_is_time_of_expiration 74 self._access_token = None 75 super().__init__( 76 refresh_token_error_status_codes, refresh_token_error_key, refresh_token_error_values 77 )
If all of refresh_token_error_status_codes, refresh_token_error_key, and refresh_token_error_values are set, then http errors with such params will be wrapped in AirbyteTracedException.
109 def get_refresh_request_body(self) -> Mapping[str, Any]: 110 return self._refresh_request_body # type: ignore[return-value]
Returns the request body to set on the refresh request
112 def get_refresh_request_headers(self) -> Mapping[str, Any]: 113 return self._refresh_request_headers # type: ignore[return-value]
Returns the request headers to set on the refresh request
124 def set_token_expiry_date(self, value: AirbyteDateTime) -> None: 125 self._token_expiry_date = value
Setter for access token expiration date
127 @property 128 def token_expiry_is_time_of_expiration(self) -> bool: 129 return self._token_expiry_is_time_of_expiration
Indicates that the Token Expiry returns the date until which the token will be valid, not the amount of time it will be valid.
131 @property 132 def token_expiry_date_format(self) -> Optional[str]: 133 return self._token_expiry_date_format
Format of the datetime; exists it if expires_in is returned as the expiration datetime instead of seconds until it expires
144class SingleUseRefreshTokenOauth2Authenticator(Oauth2Authenticator): 145 """ 146 Authenticator that should be used for API implementing single use refresh tokens: 147 when refreshing access token some API returns a new refresh token that needs to used in the next refresh flow. 148 This authenticator updates the configuration with new refresh token by emitting Airbyte control message from an observed mutation. 149 By default, this authenticator expects a connector config with a "credentials" field with the following nested fields: client_id, 150 client_secret, refresh_token. This behavior can be changed by defining custom config path (using dpath paths) in client_id_config_path, 151 client_secret_config_path, refresh_token_config_path constructor arguments. 152 """ 153 154 def __init__( 155 self, 156 connector_config: Mapping[str, Any], 157 token_refresh_endpoint: str, 158 scopes: List[str] | None = None, 159 access_token_name: str = "access_token", 160 expires_in_name: str = "expires_in", 161 refresh_token_name: str = "refresh_token", 162 refresh_request_body: Mapping[str, Any] | None = None, 163 refresh_request_headers: Mapping[str, Any] | None = None, 164 grant_type_name: str = "grant_type", 165 grant_type: str = "refresh_token", 166 client_id_name: str = "client_id", 167 client_id: Optional[str] = None, 168 client_secret_name: str = "client_secret", 169 client_secret: Optional[str] = None, 170 access_token_config_path: Sequence[str] = ("credentials", "access_token"), 171 refresh_token_config_path: Sequence[str] = ("credentials", "refresh_token"), 172 token_expiry_date_config_path: Sequence[str] = ("credentials", "token_expiry_date"), 173 token_expiry_date_format: Optional[str] = None, 174 message_repository: MessageRepository = NoopMessageRepository(), 175 token_expiry_is_time_of_expiration: bool = False, 176 refresh_token_error_status_codes: Tuple[int, ...] = (), 177 refresh_token_error_key: str = "", 178 refresh_token_error_values: Tuple[str, ...] = (), 179 ) -> None: 180 """ 181 Args: 182 connector_config (Mapping[str, Any]): The full connector configuration 183 token_refresh_endpoint (str): Full URL to the token refresh endpoint 184 scopes (List[str], optional): List of OAuth scopes to pass in the refresh token request body. Defaults to None. 185 access_token_name (str, optional): Name of the access token field, used to parse the refresh token response. Defaults to "access_token". 186 expires_in_name (str, optional): Name of the name of the field that characterizes when the current access token will expire, used to parse the refresh token response. Defaults to "expires_in". 187 refresh_token_name (str, optional): Name of the name of the refresh token field, used to parse the refresh token response. Defaults to "refresh_token". 188 refresh_request_body (Mapping[str, Any], optional): Custom key value pair that will be added to the refresh token request body. Defaults to None. 189 refresh_request_headers (Mapping[str, Any], optional): Custom key value pair that will be added to the refresh token request headers. Defaults to None. 190 grant_type (str, optional): OAuth grant type. Defaults to "refresh_token". 191 client_id (Optional[str]): The client id to authenticate. If not specified, defaults to credentials.client_id in the config object. 192 client_secret (Optional[str]): The client secret to authenticate. If not specified, defaults to credentials.client_secret in the config object. 193 access_token_config_path (Sequence[str]): Dpath to the access_token field in the connector configuration. Defaults to ("credentials", "access_token"). 194 refresh_token_config_path (Sequence[str]): Dpath to the refresh_token field in the connector configuration. Defaults to ("credentials", "refresh_token"). 195 token_expiry_date_config_path (Sequence[str]): Dpath to the token_expiry_date field in the connector configuration. Defaults to ("credentials", "token_expiry_date"). 196 token_expiry_date_format (Optional[str]): Date format of the token expiry date field (set by expires_in_name). If not specified the token expiry date is interpreted as number of seconds until expiration. 197 token_expiry_is_time_of_expiration bool: set True it if expires_in is returned as time of expiration instead of the number seconds until expiration 198 message_repository (MessageRepository): the message repository used to emit logs on HTTP requests and control message on config update 199 """ 200 self._connector_config = connector_config 201 self._client_id: str = self._get_config_value_by_path( 202 ("credentials", "client_id"), client_id 203 ) 204 self._client_secret: str = self._get_config_value_by_path( 205 ("credentials", "client_secret"), client_secret 206 ) 207 self._client_id_name = client_id_name 208 self._client_secret_name = client_secret_name 209 self._access_token_config_path = access_token_config_path 210 self._refresh_token_config_path = refresh_token_config_path 211 self._token_expiry_date_config_path = token_expiry_date_config_path 212 self._token_expiry_date_format = token_expiry_date_format 213 self._refresh_token_name = refresh_token_name 214 self._grant_type_name = grant_type_name 215 self._connector_config = connector_config 216 self.__message_repository = message_repository 217 super().__init__( 218 token_refresh_endpoint=token_refresh_endpoint, 219 client_id_name=self._client_id_name, 220 client_id=self._client_id, 221 client_secret_name=self._client_secret_name, 222 client_secret=self._client_secret, 223 refresh_token=self.get_refresh_token(), 224 refresh_token_name=self._refresh_token_name, 225 scopes=scopes, 226 token_expiry_date=self.get_token_expiry_date(), 227 access_token_name=access_token_name, 228 expires_in_name=expires_in_name, 229 refresh_request_body=refresh_request_body, 230 refresh_request_headers=refresh_request_headers, 231 grant_type_name=self._grant_type_name, 232 grant_type=grant_type, 233 token_expiry_date_format=token_expiry_date_format, 234 token_expiry_is_time_of_expiration=token_expiry_is_time_of_expiration, 235 refresh_token_error_status_codes=refresh_token_error_status_codes, 236 refresh_token_error_key=refresh_token_error_key, 237 refresh_token_error_values=refresh_token_error_values, 238 ) 239 240 @property 241 def access_token(self) -> str: 242 """ 243 Retrieve the access token from the configuration. 244 245 Returns: 246 str: The access token. 247 """ 248 return self._get_config_value_by_path(self._access_token_config_path) # type: ignore[return-value] 249 250 @access_token.setter 251 def access_token(self, new_access_token: str) -> None: 252 """ 253 Sets a new access token. 254 255 Args: 256 new_access_token (str): The new access token to be set. 257 """ 258 self._set_config_value_by_path(self._access_token_config_path, new_access_token) 259 260 def get_refresh_token(self) -> str: 261 """ 262 Retrieve the refresh token from the configuration. 263 264 This method fetches the refresh token using the configuration path specified 265 by `_refresh_token_config_path`. 266 267 Returns: 268 str: The refresh token as a string. 269 """ 270 return self._get_config_value_by_path(self._refresh_token_config_path) # type: ignore[return-value] 271 272 def set_refresh_token(self, new_refresh_token: str) -> None: 273 """ 274 Updates the refresh token in the configuration. 275 276 Args: 277 new_refresh_token (str): The new refresh token to be set. 278 """ 279 self._set_config_value_by_path(self._refresh_token_config_path, new_refresh_token) 280 281 def get_token_expiry_date(self) -> AirbyteDateTime: 282 """ 283 Retrieves the token expiry date from the configuration. 284 285 This method fetches the token expiry date from the configuration using the specified path. 286 If the expiry date is an empty string, it returns the current date and time minus one day. 287 Otherwise, it parses the expiry date string into an AirbyteDateTime object. 288 289 Returns: 290 AirbyteDateTime: The parsed or calculated token expiry date. 291 292 Raises: 293 TypeError: If the result is not an instance of AirbyteDateTime. 294 """ 295 expiry_date = self._get_config_value_by_path(self._token_expiry_date_config_path) 296 result = ( 297 ab_datetime_now() - timedelta(days=1) 298 if expiry_date == "" 299 else ab_datetime_parse(str(expiry_date)) 300 ) 301 if isinstance(result, AirbyteDateTime): 302 return result 303 raise TypeError("Invalid datetime conversion") 304 305 def set_token_expiry_date(self, new_token_expiry_date: AirbyteDateTime) -> None: # type: ignore[override] 306 """ 307 Sets the token expiry date in the configuration. 308 309 Args: 310 new_token_expiry_date (AirbyteDateTime): The new expiry date for the token. 311 """ 312 self._set_config_value_by_path( 313 self._token_expiry_date_config_path, str(new_token_expiry_date) 314 ) 315 316 def token_has_expired(self) -> bool: 317 """Returns True if the token is expired""" 318 return ab_datetime_now() > self.get_token_expiry_date() 319 320 def get_access_token(self) -> str: 321 """Retrieve new access and refresh token if the access token has expired. 322 323 This method uses double-checked locking to ensure thread-safe token refresh. 324 This is especially critical for single-use refresh tokens where concurrent 325 refresh attempts would cause failures as the refresh token is invalidated 326 after first use. 327 328 The new refresh token is persisted with the set_refresh_token function. 329 330 Returns: 331 str: The current access_token, updated if it was previously expired. 332 """ 333 if self.token_has_expired(): 334 with self._token_refresh_lock: 335 # Double-check after acquiring lock - another thread may have already refreshed 336 if self.token_has_expired(): 337 self.refresh_and_set_access_token() 338 return self.access_token 339 340 def refresh_and_set_access_token(self) -> None: 341 """Force refresh the access token and update internal state. 342 343 For single-use refresh tokens, this also persists the new refresh token 344 and emits a control message to update the connector config. 345 """ 346 new_access_token, access_token_expires_in, new_refresh_token = self.refresh_access_token() 347 self.access_token = new_access_token 348 self.set_refresh_token(new_refresh_token) 349 self.set_token_expiry_date(access_token_expires_in) 350 self._emit_control_message() 351 352 def refresh_access_token(self) -> Tuple[str, AirbyteDateTime, str]: # type: ignore[override] 353 """ 354 Refreshes the access token by making a handled request and extracting the necessary token information. 355 356 Returns: 357 Tuple[str, str, str]: A tuple containing the new access token, token expiry date, and refresh token. 358 """ 359 response_json = self._make_handled_request() 360 return ( 361 self._extract_access_token(response_json), 362 self._extract_token_expiry_date(response_json), 363 self._extract_refresh_token(response_json), 364 ) 365 366 def _set_config_value_by_path(self, config_path: Union[str, Sequence[str]], value: Any) -> None: 367 """ 368 Set a value in the connector configuration at the specified path. 369 370 Args: 371 config_path (Union[str, Sequence[str]]): The path within the configuration where the value should be set. 372 This can be a string representing a single key or a sequence of strings representing a nested path. 373 value (Any): The value to set at the specified path in the configuration. 374 375 Returns: 376 None 377 """ 378 dpath.new(self._connector_config, config_path, value) # type: ignore[arg-type] 379 380 def _get_config_value_by_path( 381 self, config_path: Union[str, Sequence[str]], default: Optional[str] = None 382 ) -> str | Any: 383 """ 384 Retrieve a value from the connector configuration using a specified path. 385 386 Args: 387 config_path (Union[str, Sequence[str]]): The path to the desired configuration value. This can be a string or a sequence of strings. 388 default (Optional[str], optional): The default value to return if the specified path does not exist in the configuration. Defaults to None. 389 390 Returns: 391 Any: The value from the configuration at the specified path, or the default value if the path does not exist. 392 """ 393 return dpath.get( 394 self._connector_config, # type: ignore[arg-type] 395 config_path, 396 default=default if default is not None else "", 397 ) 398 399 def _emit_control_message(self) -> None: 400 """ 401 Emits a control message based on the connector configuration. 402 403 Control messages for config updates (like refreshed tokens) must be printed directly 404 to stdout so the platform can process them immediately. The message repository is 405 also used to queue the message for any additional processing. 406 407 Note: 408 The function `emit_configuration_as_airbyte_control_message` prints directly to 409 stdout, which is required for the platform to detect and persist config changes. 410 """ 411 # Always emit to stdout so the platform can process the config update immediately. 412 # This is critical for single-use refresh tokens where the new token must be persisted 413 # before subsequent operations try to use the old (now invalid) token. 414 emit_configuration_as_airbyte_control_message(self._connector_config) # type: ignore[arg-type] 415 416 # Also emit to the message repository for any additional processing (e.g., logging) 417 if not isinstance(self._message_repository, NoopMessageRepository): 418 self._message_repository.emit_message( 419 create_connector_config_control_message(self._connector_config) # type: ignore[arg-type] 420 ) 421 422 @property 423 def _message_repository(self) -> MessageRepository: 424 """ 425 Overriding AbstractOauth2Authenticator._message_repository to allow for HTTP request logs 426 """ 427 return self.__message_repository
Authenticator that should be used for API implementing single use refresh tokens: when refreshing access token some API returns a new refresh token that needs to used in the next refresh flow. This authenticator updates the configuration with new refresh token by emitting Airbyte control message from an observed mutation. By default, this authenticator expects a connector config with a "credentials" field with the following nested fields: client_id, client_secret, refresh_token. This behavior can be changed by defining custom config path (using dpath paths) in client_id_config_path, client_secret_config_path, refresh_token_config_path constructor arguments.
154 def __init__( 155 self, 156 connector_config: Mapping[str, Any], 157 token_refresh_endpoint: str, 158 scopes: List[str] | None = None, 159 access_token_name: str = "access_token", 160 expires_in_name: str = "expires_in", 161 refresh_token_name: str = "refresh_token", 162 refresh_request_body: Mapping[str, Any] | None = None, 163 refresh_request_headers: Mapping[str, Any] | None = None, 164 grant_type_name: str = "grant_type", 165 grant_type: str = "refresh_token", 166 client_id_name: str = "client_id", 167 client_id: Optional[str] = None, 168 client_secret_name: str = "client_secret", 169 client_secret: Optional[str] = None, 170 access_token_config_path: Sequence[str] = ("credentials", "access_token"), 171 refresh_token_config_path: Sequence[str] = ("credentials", "refresh_token"), 172 token_expiry_date_config_path: Sequence[str] = ("credentials", "token_expiry_date"), 173 token_expiry_date_format: Optional[str] = None, 174 message_repository: MessageRepository = NoopMessageRepository(), 175 token_expiry_is_time_of_expiration: bool = False, 176 refresh_token_error_status_codes: Tuple[int, ...] = (), 177 refresh_token_error_key: str = "", 178 refresh_token_error_values: Tuple[str, ...] = (), 179 ) -> None: 180 """ 181 Args: 182 connector_config (Mapping[str, Any]): The full connector configuration 183 token_refresh_endpoint (str): Full URL to the token refresh endpoint 184 scopes (List[str], optional): List of OAuth scopes to pass in the refresh token request body. Defaults to None. 185 access_token_name (str, optional): Name of the access token field, used to parse the refresh token response. Defaults to "access_token". 186 expires_in_name (str, optional): Name of the name of the field that characterizes when the current access token will expire, used to parse the refresh token response. Defaults to "expires_in". 187 refresh_token_name (str, optional): Name of the name of the refresh token field, used to parse the refresh token response. Defaults to "refresh_token". 188 refresh_request_body (Mapping[str, Any], optional): Custom key value pair that will be added to the refresh token request body. Defaults to None. 189 refresh_request_headers (Mapping[str, Any], optional): Custom key value pair that will be added to the refresh token request headers. Defaults to None. 190 grant_type (str, optional): OAuth grant type. Defaults to "refresh_token". 191 client_id (Optional[str]): The client id to authenticate. If not specified, defaults to credentials.client_id in the config object. 192 client_secret (Optional[str]): The client secret to authenticate. If not specified, defaults to credentials.client_secret in the config object. 193 access_token_config_path (Sequence[str]): Dpath to the access_token field in the connector configuration. Defaults to ("credentials", "access_token"). 194 refresh_token_config_path (Sequence[str]): Dpath to the refresh_token field in the connector configuration. Defaults to ("credentials", "refresh_token"). 195 token_expiry_date_config_path (Sequence[str]): Dpath to the token_expiry_date field in the connector configuration. Defaults to ("credentials", "token_expiry_date"). 196 token_expiry_date_format (Optional[str]): Date format of the token expiry date field (set by expires_in_name). If not specified the token expiry date is interpreted as number of seconds until expiration. 197 token_expiry_is_time_of_expiration bool: set True it if expires_in is returned as time of expiration instead of the number seconds until expiration 198 message_repository (MessageRepository): the message repository used to emit logs on HTTP requests and control message on config update 199 """ 200 self._connector_config = connector_config 201 self._client_id: str = self._get_config_value_by_path( 202 ("credentials", "client_id"), client_id 203 ) 204 self._client_secret: str = self._get_config_value_by_path( 205 ("credentials", "client_secret"), client_secret 206 ) 207 self._client_id_name = client_id_name 208 self._client_secret_name = client_secret_name 209 self._access_token_config_path = access_token_config_path 210 self._refresh_token_config_path = refresh_token_config_path 211 self._token_expiry_date_config_path = token_expiry_date_config_path 212 self._token_expiry_date_format = token_expiry_date_format 213 self._refresh_token_name = refresh_token_name 214 self._grant_type_name = grant_type_name 215 self._connector_config = connector_config 216 self.__message_repository = message_repository 217 super().__init__( 218 token_refresh_endpoint=token_refresh_endpoint, 219 client_id_name=self._client_id_name, 220 client_id=self._client_id, 221 client_secret_name=self._client_secret_name, 222 client_secret=self._client_secret, 223 refresh_token=self.get_refresh_token(), 224 refresh_token_name=self._refresh_token_name, 225 scopes=scopes, 226 token_expiry_date=self.get_token_expiry_date(), 227 access_token_name=access_token_name, 228 expires_in_name=expires_in_name, 229 refresh_request_body=refresh_request_body, 230 refresh_request_headers=refresh_request_headers, 231 grant_type_name=self._grant_type_name, 232 grant_type=grant_type, 233 token_expiry_date_format=token_expiry_date_format, 234 token_expiry_is_time_of_expiration=token_expiry_is_time_of_expiration, 235 refresh_token_error_status_codes=refresh_token_error_status_codes, 236 refresh_token_error_key=refresh_token_error_key, 237 refresh_token_error_values=refresh_token_error_values, 238 )
Arguments:
- connector_config (Mapping[str, Any]): The full connector configuration
- token_refresh_endpoint (str): Full URL to the token refresh endpoint
- scopes (List[str], optional): List of OAuth scopes to pass in the refresh token request body. Defaults to None.
- access_token_name (str, optional): Name of the access token field, used to parse the refresh token response. Defaults to "access_token".
- expires_in_name (str, optional): Name of the name of the field that characterizes when the current access token will expire, used to parse the refresh token response. Defaults to "expires_in".
- refresh_token_name (str, optional): Name of the name of the refresh token field, used to parse the refresh token response. Defaults to "refresh_token".
- refresh_request_body (Mapping[str, Any], optional): Custom key value pair that will be added to the refresh token request body. Defaults to None.
- refresh_request_headers (Mapping[str, Any], optional): Custom key value pair that will be added to the refresh token request headers. Defaults to None.
- grant_type (str, optional): OAuth grant type. Defaults to "refresh_token".
- client_id (Optional[str]): The client id to authenticate. If not specified, defaults to credentials.client_id in the config object.
- client_secret (Optional[str]): The client secret to authenticate. If not specified, defaults to credentials.client_secret in the config object.
- access_token_config_path (Sequence[str]): Dpath to the access_token field in the connector configuration. Defaults to ("credentials", "access_token").
- refresh_token_config_path (Sequence[str]): Dpath to the refresh_token field in the connector configuration. Defaults to ("credentials", "refresh_token").
- token_expiry_date_config_path (Sequence[str]): Dpath to the token_expiry_date field in the connector configuration. Defaults to ("credentials", "token_expiry_date").
- token_expiry_date_format (Optional[str]): Date format of the token expiry date field (set by expires_in_name). If not specified the token expiry date is interpreted as number of seconds until expiration.
- token_expiry_is_time_of_expiration bool: set True it if expires_in is returned as time of expiration instead of the number seconds until expiration
- message_repository (MessageRepository): the message repository used to emit logs on HTTP requests and control message on config update
240 @property 241 def access_token(self) -> str: 242 """ 243 Retrieve the access token from the configuration. 244 245 Returns: 246 str: The access token. 247 """ 248 return self._get_config_value_by_path(self._access_token_config_path) # type: ignore[return-value]
Retrieve the access token from the configuration.
Returns:
str: The access token.
260 def get_refresh_token(self) -> str: 261 """ 262 Retrieve the refresh token from the configuration. 263 264 This method fetches the refresh token using the configuration path specified 265 by `_refresh_token_config_path`. 266 267 Returns: 268 str: The refresh token as a string. 269 """ 270 return self._get_config_value_by_path(self._refresh_token_config_path) # type: ignore[return-value]
Retrieve the refresh token from the configuration.
This method fetches the refresh token using the configuration path specified
by _refresh_token_config_path.
Returns:
str: The refresh token as a string.
272 def set_refresh_token(self, new_refresh_token: str) -> None: 273 """ 274 Updates the refresh token in the configuration. 275 276 Args: 277 new_refresh_token (str): The new refresh token to be set. 278 """ 279 self._set_config_value_by_path(self._refresh_token_config_path, new_refresh_token)
Updates the refresh token in the configuration.
Arguments:
- new_refresh_token (str): The new refresh token to be set.
281 def get_token_expiry_date(self) -> AirbyteDateTime: 282 """ 283 Retrieves the token expiry date from the configuration. 284 285 This method fetches the token expiry date from the configuration using the specified path. 286 If the expiry date is an empty string, it returns the current date and time minus one day. 287 Otherwise, it parses the expiry date string into an AirbyteDateTime object. 288 289 Returns: 290 AirbyteDateTime: The parsed or calculated token expiry date. 291 292 Raises: 293 TypeError: If the result is not an instance of AirbyteDateTime. 294 """ 295 expiry_date = self._get_config_value_by_path(self._token_expiry_date_config_path) 296 result = ( 297 ab_datetime_now() - timedelta(days=1) 298 if expiry_date == "" 299 else ab_datetime_parse(str(expiry_date)) 300 ) 301 if isinstance(result, AirbyteDateTime): 302 return result 303 raise TypeError("Invalid datetime conversion")
Retrieves the token expiry date from the configuration.
This method fetches the token expiry date from the configuration using the specified path. If the expiry date is an empty string, it returns the current date and time minus one day. Otherwise, it parses the expiry date string into an AirbyteDateTime object.
Returns:
AirbyteDateTime: The parsed or calculated token expiry date.
Raises:
- TypeError: If the result is not an instance of AirbyteDateTime.
305 def set_token_expiry_date(self, new_token_expiry_date: AirbyteDateTime) -> None: # type: ignore[override] 306 """ 307 Sets the token expiry date in the configuration. 308 309 Args: 310 new_token_expiry_date (AirbyteDateTime): The new expiry date for the token. 311 """ 312 self._set_config_value_by_path( 313 self._token_expiry_date_config_path, str(new_token_expiry_date) 314 )
Sets the token expiry date in the configuration.
Arguments:
- new_token_expiry_date (AirbyteDateTime): The new expiry date for the token.
316 def token_has_expired(self) -> bool: 317 """Returns True if the token is expired""" 318 return ab_datetime_now() > self.get_token_expiry_date()
Returns True if the token is expired
320 def get_access_token(self) -> str: 321 """Retrieve new access and refresh token if the access token has expired. 322 323 This method uses double-checked locking to ensure thread-safe token refresh. 324 This is especially critical for single-use refresh tokens where concurrent 325 refresh attempts would cause failures as the refresh token is invalidated 326 after first use. 327 328 The new refresh token is persisted with the set_refresh_token function. 329 330 Returns: 331 str: The current access_token, updated if it was previously expired. 332 """ 333 if self.token_has_expired(): 334 with self._token_refresh_lock: 335 # Double-check after acquiring lock - another thread may have already refreshed 336 if self.token_has_expired(): 337 self.refresh_and_set_access_token() 338 return self.access_token
Retrieve new access and refresh token if the access token has expired.
This method uses double-checked locking to ensure thread-safe token refresh. This is especially critical for single-use refresh tokens where concurrent refresh attempts would cause failures as the refresh token is invalidated after first use.
The new refresh token is persisted with the set_refresh_token function.
Returns:
str: The current access_token, updated if it was previously expired.
340 def refresh_and_set_access_token(self) -> None: 341 """Force refresh the access token and update internal state. 342 343 For single-use refresh tokens, this also persists the new refresh token 344 and emits a control message to update the connector config. 345 """ 346 new_access_token, access_token_expires_in, new_refresh_token = self.refresh_access_token() 347 self.access_token = new_access_token 348 self.set_refresh_token(new_refresh_token) 349 self.set_token_expiry_date(access_token_expires_in) 350 self._emit_control_message()
Force refresh the access token and update internal state.
For single-use refresh tokens, this also persists the new refresh token and emits a control message to update the connector config.
352 def refresh_access_token(self) -> Tuple[str, AirbyteDateTime, str]: # type: ignore[override] 353 """ 354 Refreshes the access token by making a handled request and extracting the necessary token information. 355 356 Returns: 357 Tuple[str, str, str]: A tuple containing the new access token, token expiry date, and refresh token. 358 """ 359 response_json = self._make_handled_request() 360 return ( 361 self._extract_access_token(response_json), 362 self._extract_token_expiry_date(response_json), 363 self._extract_refresh_token(response_json), 364 )
Refreshes the access token by making a handled request and extracting the necessary token information.
Returns:
Tuple[str, str, str]: A tuple containing the new access token, token expiry date, and refresh token.
Inherited Members
- Oauth2Authenticator
- get_token_refresh_endpoint
- get_client_id_name
- get_client_id
- get_client_secret_name
- get_client_secret
- get_refresh_token_name
- get_access_token_name
- get_scopes
- get_expires_in_name
- get_refresh_request_body
- get_refresh_request_headers
- get_grant_type_name
- get_grant_type
- token_expiry_is_time_of_expiration
- token_expiry_date_format
39class TokenAuthenticator(AbstractHeaderAuthenticator): 40 """ 41 Builds auth header, based on the token provided. 42 The token is attached to each request via the `auth_header` header. 43 """ 44 45 @property 46 def auth_header(self) -> str: 47 return self._auth_header 48 49 @property 50 def token(self) -> str: 51 return f"{self._auth_method} {self._token}" 52 53 def __init__(self, token: str, auth_method: str = "Bearer", auth_header: str = "Authorization"): 54 self._auth_header = auth_header 55 self._auth_method = auth_method 56 self._token = token
Builds auth header, based on the token provided.
The token is attached to each request via the auth_header header.
15class MultipleTokenAuthenticator(AbstractHeaderAuthenticator): 16 """ 17 Builds auth header, based on the list of tokens provided. 18 Auth header is changed per each `get_auth_header` call, using each token in cycle. 19 The token is attached to each request via the `auth_header` header. 20 """ 21 22 @property 23 def auth_header(self) -> str: 24 return self._auth_header 25 26 @property 27 def token(self) -> str: 28 return f"{self._auth_method} {next(self._tokens_iter)}" 29 30 def __init__( 31 self, tokens: List[str], auth_method: str = "Bearer", auth_header: str = "Authorization" 32 ): 33 self._auth_method = auth_method 34 self._auth_header = auth_header 35 self._tokens = tokens 36 self._tokens_iter = cycle(self._tokens)
Builds auth header, based on the list of tokens provided.
Auth header is changed per each get_auth_header call, using each token in cycle.
The token is attached to each request via the auth_header header.
59class BasicHttpAuthenticator(AbstractHeaderAuthenticator): 60 """ 61 Builds auth based off the basic authentication scheme as defined by RFC 7617, which transmits credentials as USER ID/password pairs, encoded using bas64 62 https://developer.mozilla.org/en-US/docs/Web/HTTP/Authentication#basic_authentication_scheme 63 """ 64 65 @property 66 def auth_header(self) -> str: 67 return self._auth_header 68 69 @property 70 def token(self) -> str: 71 return f"{self._auth_method} {self._token}" 72 73 def __init__( 74 self, 75 username: str, 76 password: str = "", 77 auth_method: str = "Basic", 78 auth_header: str = "Authorization", 79 ): 80 auth_string = f"{username}:{password}".encode("utf8") 81 b64_encoded = base64.b64encode(auth_string).decode("utf8") 82 self._auth_header = auth_header 83 self._auth_method = auth_method 84 self._token = b64_encoded
Builds auth based off the basic authentication scheme as defined by RFC 7617, which transmits credentials as USER ID/password pairs, encoded using bas64 https://developer.mozilla.org/en-US/docs/Web/HTTP/Authentication#basic_authentication_scheme
73 def __init__( 74 self, 75 username: str, 76 password: str = "", 77 auth_method: str = "Basic", 78 auth_header: str = "Authorization", 79 ): 80 auth_string = f"{username}:{password}".encode("utf8") 81 b64_encoded = base64.b64encode(auth_string).decode("utf8") 82 self._auth_header = auth_header 83 self._auth_method = auth_method 84 self._token = b64_encoded