airbyte_cdk.sources.declarative.auth.token_provider
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


import datetime
from abc import abstractmethod
from dataclasses import InitVar, dataclass, field
from typing import Any, List, Mapping, Optional, Union

import dpath
from isodate import Duration

from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder
from airbyte_cdk.sources.declarative.exceptions import ReadException
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.requesters.requester import Requester
from airbyte_cdk.sources.http_logger import format_http_message
from airbyte_cdk.sources.message import MessageRepository, NoopMessageRepository
from airbyte_cdk.sources.types import Config
from airbyte_cdk.utils.datetime_helpers import AirbyteDateTime, ab_datetime_now


class TokenProvider:
    """Interface for objects that can supply a token as a string."""

    # NOTE: the class does not derive from ``abc.ABC``, so ``@abstractmethod`` is not
    # enforced at instantiation time; subclasses are expected to override ``get_token``.
    @abstractmethod
    def get_token(self) -> str:
        """Return the token to use."""
        pass


@dataclass
class SessionTokenProvider(TokenProvider):
    """Obtain a session token through a login request and cache it until it expires.

    The token is read from the decoded login response at ``session_token_path``.
    When ``expiration_duration`` is ``None`` no expiration time is ever recorded, so
    every call to ``get_token`` sends a new login request.
    """

    login_requester: Requester
    session_token_path: List[str]
    expiration_duration: Optional[Union[datetime.timedelta, Duration]]
    parameters: InitVar[Mapping[str, Any]]
    message_repository: MessageRepository = NoopMessageRepository()
    decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={}))

    _next_expiration_time: Optional[AirbyteDateTime] = None
    _token: Optional[str] = None

    def get_token(self) -> str:
        """Return the session token, refreshing it first when required.

        Raises:
            ReadException: if no token could be obtained.
        """
        self._refresh_if_necessary()
        if self._token is None:
            raise ReadException("Failed to get session token, token is None")
        return self._token

    def _refresh_if_necessary(self) -> None:
        """Refresh when no expiration time is recorded or the recorded one has passed."""
        if self._next_expiration_time is None or self._next_expiration_time < ab_datetime_now():
            self._refresh()

    def _refresh(self) -> None:
        """Send the login request and store the token found in its response.

        Raises:
            ReadException: if the requester returns no response, or the decoder
                yields nothing for the response.
        """
        response = self.login_requester.send_request(
            log_formatter=lambda response: format_http_message(
                response,
                "Login request",
                "Obtains session token",
                None,
                is_auxiliary=True,
                type="AUTH",
            ),
        )
        if response is None:
            raise ReadException("Failed to get session token, response got ignored by requester")
        # A decoder may yield nothing (e.g. for an empty body). A bare ``next()`` would
        # then leak ``StopIteration``, which turns into an opaque ``RuntimeError`` when
        # this runs inside a generator (PEP 479). Fail with an explicit error instead.
        decoded_response = next(self.decoder.decode(response), None)
        if decoded_response is None:
            raise ReadException(
                "Failed to get session token, decoder returned no data for the login response"
            )
        session_token = dpath.get(decoded_response, self.session_token_path)
        if self.expiration_duration is not None:
            self._next_expiration_time = ab_datetime_now() + self.expiration_duration
        self._token = session_token  # type: ignore # Returned decoded response will be Mapping and therefore session_token will be str or None


@dataclass
class InterpolatedStringTokenProvider(TokenProvider):
    """Provides a token by interpolating a string with config values."""

    config: Config
    api_token: Union[InterpolatedString, str]
    parameters: Mapping[str, Any]

    def __post_init__(self) -> None:
        """Create the interpolated string from ``api_token`` and ``parameters``."""
        self._token = InterpolatedString.create(self.api_token, parameters=self.parameters)

    def get_token(self) -> str:
        """Evaluate the interpolated string against ``config`` and return it as ``str``."""
        return str(self._token.eval(self.config))


@dataclass
class InterpolatedSessionTokenProvider(TokenProvider):
    """Provides a token by interpolating a template with the session token.

    This allows flexible token formatting, such as "Token {{ session_token }}"
    for Django REST Framework APIs that expect "Authorization: Token <value>".
    """

    config: Config
    api_token: Union[InterpolatedString, str]
    session_token_provider: TokenProvider
    parameters: Mapping[str, Any]

    def __post_init__(self) -> None:
        """Create the token template from ``api_token`` and ``parameters``."""
        self._token_template = InterpolatedString.create(self.api_token, parameters=self.parameters)

    def get_token(self) -> str:
        """Fetch the session token and evaluate the template with it as ``session_token``."""
        session_token = self.session_token_provider.get_token()
        return str(self._token_template.eval(self.config, session_token=session_token))
class
TokenProvider:
@dataclass
class SessionTokenProvider(TokenProvider):
    """Obtain a session token through a login request and cache it until it expires.

    The token is read from the decoded login response at ``session_token_path``.
    When ``expiration_duration`` is ``None`` no expiration time is ever recorded, so
    every call to ``get_token`` sends a new login request.
    """

    login_requester: Requester
    session_token_path: List[str]
    expiration_duration: Optional[Union[datetime.timedelta, Duration]]
    parameters: InitVar[Mapping[str, Any]]
    message_repository: MessageRepository = NoopMessageRepository()
    decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={}))

    _next_expiration_time: Optional[AirbyteDateTime] = None
    _token: Optional[str] = None

    def get_token(self) -> str:
        """Return the session token, refreshing it first when required.

        Raises:
            ReadException: if no token could be obtained.
        """
        self._refresh_if_necessary()
        if self._token is None:
            raise ReadException("Failed to get session token, token is None")
        return self._token

    def _refresh_if_necessary(self) -> None:
        """Refresh when no expiration time is recorded or the recorded one has passed."""
        if self._next_expiration_time is None or self._next_expiration_time < ab_datetime_now():
            self._refresh()

    def _refresh(self) -> None:
        """Send the login request and store the token found in its response.

        Raises:
            ReadException: if the requester returns no response, or the decoder
                yields nothing for the response.
        """
        response = self.login_requester.send_request(
            log_formatter=lambda response: format_http_message(
                response,
                "Login request",
                "Obtains session token",
                None,
                is_auxiliary=True,
                type="AUTH",
            ),
        )
        if response is None:
            raise ReadException("Failed to get session token, response got ignored by requester")
        # A decoder may yield nothing (e.g. for an empty body). A bare ``next()`` would
        # then leak ``StopIteration``, which turns into an opaque ``RuntimeError`` when
        # this runs inside a generator (PEP 479). Fail with an explicit error instead.
        decoded_response = next(self.decoder.decode(response), None)
        if decoded_response is None:
            raise ReadException(
                "Failed to get session token, decoder returned no data for the login response"
            )
        session_token = dpath.get(decoded_response, self.session_token_path)
        if self.expiration_duration is not None:
            self._next_expiration_time = ab_datetime_now() + self.expiration_duration
        self._token = session_token  # type: ignore # Returned decoded response will be Mapping and therefore session_token will be str or None
SessionTokenProvider( login_requester: airbyte_cdk.Requester, session_token_path: List[str], expiration_duration: Union[datetime.timedelta, isodate.duration.Duration, NoneType], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], message_repository: airbyte_cdk.MessageRepository = <airbyte_cdk.sources.message.NoopMessageRepository object>, decoder: airbyte_cdk.Decoder = <factory>, _next_expiration_time: Optional[airbyte_cdk.utils.datetime_helpers.AirbyteDateTime] = None, _token: Optional[str] = None)
login_requester: airbyte_cdk.Requester
message_repository: airbyte_cdk.MessageRepository =
<airbyte_cdk.sources.message.NoopMessageRepository object>
decoder: airbyte_cdk.Decoder
@dataclass
class InterpolatedStringTokenProvider(TokenProvider):
    """Token provider backed by a string that is interpolated against the config.

    ``api_token`` is passed through ``InterpolatedString.create`` once, right after
    initialization, and the result is evaluated on every ``get_token`` call.
    """

    config: Config
    api_token: Union[InterpolatedString, str]
    parameters: Mapping[str, Any]

    def __post_init__(self) -> None:
        """Prepare the interpolated string from ``api_token`` and ``parameters``."""
        interpolated = InterpolatedString.create(self.api_token, parameters=self.parameters)
        self._token = interpolated

    def get_token(self) -> str:
        """Evaluate the prepared string against ``config`` and return it as ``str``."""
        evaluated = self._token.eval(self.config)
        return str(evaluated)
Provides a token by interpolating a string with config values.
InterpolatedStringTokenProvider( config: Mapping[str, Any], api_token: Union[airbyte_cdk.InterpolatedString, str], parameters: Mapping[str, Any])
api_token: Union[airbyte_cdk.InterpolatedString, str]
@dataclass
class InterpolatedSessionTokenProvider(TokenProvider):
    """Token provider that renders a template around a session token.

    The template (``api_token``) can reference ``session_token``, which makes formats
    such as "Token {{ session_token }}" possible, e.g. for Django REST Framework APIs
    that expect "Authorization: Token <value>".
    """

    config: Config
    api_token: Union[InterpolatedString, str]
    session_token_provider: TokenProvider
    parameters: Mapping[str, Any]

    def __post_init__(self) -> None:
        """Prepare the token template from ``api_token`` and ``parameters``."""
        template = InterpolatedString.create(self.api_token, parameters=self.parameters)
        self._token_template = template

    def get_token(self) -> str:
        """Render the template with the current session token and return it as ``str``."""
        rendered = self._token_template.eval(
            self.config,
            session_token=self.session_token_provider.get_token(),
        )
        return str(rendered)
Provides a token by interpolating a template with the session token.
This allows flexible token formatting, such as "Token {{ session_token }}"
for Django REST Framework APIs that expect "Authorization: Token <value>".
InterpolatedSessionTokenProvider( config: Mapping[str, Any], api_token: Union[airbyte_cdk.InterpolatedString, str], session_token_provider: TokenProvider, parameters: Mapping[str, Any])
api_token: Union[airbyte_cdk.InterpolatedString, str]
session_token_provider: TokenProvider