airbyte_cdk.sources.declarative.auth.token_provider
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


import datetime
from abc import abstractmethod
from dataclasses import InitVar, dataclass, field
from typing import Any, List, Mapping, Optional, Union

import dpath
from isodate import Duration

from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder
from airbyte_cdk.sources.declarative.exceptions import ReadException
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.requesters.requester import Requester
from airbyte_cdk.sources.http_logger import format_http_message
from airbyte_cdk.sources.message import MessageRepository, NoopMessageRepository
from airbyte_cdk.sources.types import Config
from airbyte_cdk.utils.datetime_helpers import AirbyteDateTime, ab_datetime_now


class TokenProvider:
    """Interface for objects that can hand out an authentication token.

    NOTE: the class does not derive from ``abc.ABC``, so ``@abstractmethod``
    is not enforced at instantiation time; subclasses are simply expected to
    override ``get_token``.
    """

    @abstractmethod
    def get_token(self) -> str:
        """Return the token as a string."""
        pass


@dataclass
class SessionTokenProvider(TokenProvider):
    """Token provider that obtains a session token through a login request.

    The token is fetched with ``login_requester``, extracted from the decoded
    response at ``session_token_path`` and cached on the instance.
    """

    # Requester used to perform the login request.
    login_requester: Requester
    # Path (as used by ``dpath.get``) to the token inside the decoded response.
    session_token_path: List[str]
    # Lifetime of a token, counted from the moment it is obtained. When None,
    # no expiration time is ever recorded, so every ``get_token`` call refreshes.
    expiration_duration: Optional[Union[datetime.timedelta, Duration]]
    # Accepted by the generated ``__init__`` only; there is no ``__post_init__``
    # consuming it, so it is not stored on the instance.
    parameters: InitVar[Mapping[str, Any]]
    # Not referenced by the methods of this class.
    message_repository: MessageRepository = NoopMessageRepository()
    # Decoder applied to the login response; defaults to a JSON decoder.
    decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={}))

    # Moment after which the cached token is considered stale.
    _next_expiration_time: Optional[AirbyteDateTime] = None
    # Most recently obtained token.
    _token: Optional[str] = None

    def get_token(self) -> str:
        """Return a session token, logging in again first when required.

        Raises:
            ReadException: if no token is available after the refresh attempt.
        """
        self._refresh_if_necessary()
        if self._token is None:
            raise ReadException("Failed to get session token, token is None")
        return self._token

    def _refresh_if_necessary(self) -> None:
        """Refresh when no expiration time is recorded or it lies in the past."""
        if self._next_expiration_time is None or self._next_expiration_time < ab_datetime_now():
            self._refresh()

    def _refresh(self) -> None:
        """Send the login request and cache the token found in its response.

        Raises:
            ReadException: if the requester returns no response, or if the
                decoder yields nothing for the response.
        """
        response = self.login_requester.send_request(
            log_formatter=lambda response: format_http_message(
                response,
                "Login request",
                "Obtains session token",
                None,
                is_auxiliary=True,
                type="AUTH",
            ),
        )
        if response is None:
            raise ReadException("Failed to get session token, response got ignored by requester")
        # Use a default so an empty decoder does not leak a bare StopIteration,
        # which would turn into a RuntimeError (or silently end iteration) when
        # get_token() is called from inside a generator or iterator.
        decoded_response = next(self.decoder.decode(response), None)
        if decoded_response is None:
            raise ReadException(
                "Failed to get session token, decoder returned no data from the login response"
            )
        # NOTE(review): dpath.get is called without a default, so a missing
        # path presumably still raises from dpath itself - confirm.
        session_token = dpath.get(decoded_response, self.session_token_path)
        if self.expiration_duration is not None:
            self._next_expiration_time = ab_datetime_now() + self.expiration_duration
        self._token = session_token  # type: ignore # Returned decoded response will be Mapping and therefore session_token will be str or None


@dataclass
class InterpolatedStringTokenProvider(TokenProvider):
    """Token provider whose token is an interpolated string evaluated against the config."""

    # Config passed to the interpolated string on evaluation.
    config: Config
    # Token, either as a plain string or as an InterpolatedString.
    api_token: Union[InterpolatedString, str]
    # Parameters forwarded to ``InterpolatedString.create``.
    parameters: Mapping[str, Any]

    def __post_init__(self) -> None:
        """Build the interpolated string from ``api_token`` and ``parameters``."""
        self._token = InterpolatedString.create(self.api_token, parameters=self.parameters)

    def get_token(self) -> str:
        """Evaluate the interpolated token against ``config`` and return it as ``str``."""
        return str(self._token.eval(self.config))
class TokenProvider:
@dataclass
class SessionTokenProvider(TokenProvider):
    """Token provider that obtains a session token through a login request.

    The token is fetched with ``login_requester``, extracted from the decoded
    response at ``session_token_path`` and cached on the instance.
    """

    # Requester used to perform the login request.
    login_requester: Requester
    # Path (as used by ``dpath.get``) to the token inside the decoded response.
    session_token_path: List[str]
    # Lifetime of a token, counted from the moment it is obtained. When None,
    # no expiration time is ever recorded, so every ``get_token`` call refreshes.
    expiration_duration: Optional[Union[datetime.timedelta, Duration]]
    # Accepted by the generated ``__init__`` only; there is no ``__post_init__``
    # consuming it, so it is not stored on the instance.
    parameters: InitVar[Mapping[str, Any]]
    # Not referenced by the methods of this class.
    message_repository: MessageRepository = NoopMessageRepository()
    # Decoder applied to the login response; defaults to a JSON decoder.
    decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={}))

    # Moment after which the cached token is considered stale.
    _next_expiration_time: Optional[AirbyteDateTime] = None
    # Most recently obtained token.
    _token: Optional[str] = None

    def get_token(self) -> str:
        """Return a session token, logging in again first when required.

        Raises:
            ReadException: if no token is available after the refresh attempt.
        """
        self._refresh_if_necessary()
        if self._token is None:
            raise ReadException("Failed to get session token, token is None")
        return self._token

    def _refresh_if_necessary(self) -> None:
        """Refresh when no expiration time is recorded or it lies in the past."""
        if self._next_expiration_time is None or self._next_expiration_time < ab_datetime_now():
            self._refresh()

    def _refresh(self) -> None:
        """Send the login request and cache the token found in its response.

        Raises:
            ReadException: if the requester returns no response, or if the
                decoder yields nothing for the response.
        """
        response = self.login_requester.send_request(
            log_formatter=lambda response: format_http_message(
                response,
                "Login request",
                "Obtains session token",
                None,
                is_auxiliary=True,
                type="AUTH",
            ),
        )
        if response is None:
            raise ReadException("Failed to get session token, response got ignored by requester")
        # Use a default so an empty decoder does not leak a bare StopIteration,
        # which would turn into a RuntimeError (or silently end iteration) when
        # get_token() is called from inside a generator or iterator.
        decoded_response = next(self.decoder.decode(response), None)
        if decoded_response is None:
            raise ReadException(
                "Failed to get session token, decoder returned no data from the login response"
            )
        # NOTE(review): dpath.get is called without a default, so a missing
        # path presumably still raises from dpath itself - confirm.
        session_token = dpath.get(decoded_response, self.session_token_path)
        if self.expiration_duration is not None:
            self._next_expiration_time = ab_datetime_now() + self.expiration_duration
        self._token = session_token  # type: ignore # Returned decoded response will be Mapping and therefore session_token will be str or None
SessionTokenProvider( login_requester: airbyte_cdk.Requester, session_token_path: List[str], expiration_duration: Union[datetime.timedelta, isodate.duration.Duration, NoneType], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], message_repository: airbyte_cdk.MessageRepository = <airbyte_cdk.sources.message.NoopMessageRepository object>, decoder: airbyte_cdk.Decoder = <factory>, _next_expiration_time: Optional[airbyte_cdk.utils.datetime_helpers.AirbyteDateTime] = None, _token: Optional[str] = None)
login_requester: airbyte_cdk.Requester
message_repository: airbyte_cdk.MessageRepository = <airbyte_cdk.sources.message.NoopMessageRepository object>
decoder: airbyte_cdk.Decoder
@dataclass
class InterpolatedStringTokenProvider(TokenProvider):
    """Token provider whose token is an interpolated string evaluated against the config."""

    # Config passed to the interpolated string on evaluation.
    config: Config
    # Token, either as a plain string or as an InterpolatedString.
    api_token: Union[InterpolatedString, str]
    # Parameters forwarded to ``InterpolatedString.create``.
    parameters: Mapping[str, Any]

    def __post_init__(self) -> None:
        """Build the interpolated string from ``api_token`` and ``parameters``."""
        interpolated_token = InterpolatedString.create(
            self.api_token,
            parameters=self.parameters,
        )
        self._token = interpolated_token

    def get_token(self) -> str:
        """Evaluate the interpolated token against ``config`` and return it as ``str``."""
        evaluated_token = self._token.eval(self.config)
        return str(evaluated_token)
InterpolatedStringTokenProvider( config: Mapping[str, Any], api_token: Union[airbyte_cdk.InterpolatedString, str], parameters: Mapping[str, Any])
api_token: Union[airbyte_cdk.InterpolatedString, str]