airbyte_cdk.sources.declarative.auth.token_provider

 1#
 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 3#
 4
 5
 6import datetime
 7from abc import abstractmethod
 8from dataclasses import InitVar, dataclass, field
 9from typing import Any, List, Mapping, Optional, Union
10
11import dpath
12from isodate import Duration
13
14from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
15from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder
16from airbyte_cdk.sources.declarative.exceptions import ReadException
17from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
18from airbyte_cdk.sources.declarative.requesters.requester import Requester
19from airbyte_cdk.sources.http_logger import format_http_message
20from airbyte_cdk.sources.message import MessageRepository, NoopMessageRepository
21from airbyte_cdk.sources.types import Config
22from airbyte_cdk.utils.datetime_helpers import AirbyteDateTime, ab_datetime_now
23
24
25class TokenProvider:
26    @abstractmethod
27    def get_token(self) -> str:
28        pass
29
30
31@dataclass
32class SessionTokenProvider(TokenProvider):
33    login_requester: Requester
34    session_token_path: List[str]
35    expiration_duration: Optional[Union[datetime.timedelta, Duration]]
36    parameters: InitVar[Mapping[str, Any]]
37    message_repository: MessageRepository = NoopMessageRepository()
38    decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={}))
39
40    _next_expiration_time: Optional[AirbyteDateTime] = None
41    _token: Optional[str] = None
42
43    def get_token(self) -> str:
44        self._refresh_if_necessary()
45        if self._token is None:
46            raise ReadException("Failed to get session token, token is None")
47        return self._token
48
49    def _refresh_if_necessary(self) -> None:
50        if self._next_expiration_time is None or self._next_expiration_time < ab_datetime_now():
51            self._refresh()
52
53    def _refresh(self) -> None:
54        response = self.login_requester.send_request(
55            log_formatter=lambda response: format_http_message(
56                response,
57                "Login request",
58                "Obtains session token",
59                None,
60                is_auxiliary=True,
61                type="AUTH",
62            ),
63        )
64        if response is None:
65            raise ReadException("Failed to get session token, response got ignored by requester")
66        session_token = dpath.get(next(self.decoder.decode(response)), self.session_token_path)
67        if self.expiration_duration is not None:
68            self._next_expiration_time = ab_datetime_now() + self.expiration_duration
69        self._token = session_token  # type: ignore # Returned decoded response will be Mapping and therefore session_token will be str or None
70
71
72@dataclass
73class InterpolatedStringTokenProvider(TokenProvider):
74    config: Config
75    api_token: Union[InterpolatedString, str]
76    parameters: Mapping[str, Any]
77
78    def __post_init__(self) -> None:
79        self._token = InterpolatedString.create(self.api_token, parameters=self.parameters)
80
81    def get_token(self) -> str:
82        return str(self._token.eval(self.config))
class TokenProvider:
26class TokenProvider:
27    @abstractmethod
28    def get_token(self) -> str:
29        pass
@abstractmethod
def get_token(self) -> str:
27    @abstractmethod
28    def get_token(self) -> str:
29        pass
@dataclass
class SessionTokenProvider(TokenProvider):
32@dataclass
33class SessionTokenProvider(TokenProvider):
34    login_requester: Requester
35    session_token_path: List[str]
36    expiration_duration: Optional[Union[datetime.timedelta, Duration]]
37    parameters: InitVar[Mapping[str, Any]]
38    message_repository: MessageRepository = NoopMessageRepository()
39    decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={}))
40
41    _next_expiration_time: Optional[AirbyteDateTime] = None
42    _token: Optional[str] = None
43
44    def get_token(self) -> str:
45        self._refresh_if_necessary()
46        if self._token is None:
47            raise ReadException("Failed to get session token, token is None")
48        return self._token
49
50    def _refresh_if_necessary(self) -> None:
51        if self._next_expiration_time is None or self._next_expiration_time < ab_datetime_now():
52            self._refresh()
53
54    def _refresh(self) -> None:
55        response = self.login_requester.send_request(
56            log_formatter=lambda response: format_http_message(
57                response,
58                "Login request",
59                "Obtains session token",
60                None,
61                is_auxiliary=True,
62                type="AUTH",
63            ),
64        )
65        if response is None:
66            raise ReadException("Failed to get session token, response got ignored by requester")
67        session_token = dpath.get(next(self.decoder.decode(response)), self.session_token_path)
68        if self.expiration_duration is not None:
69            self._next_expiration_time = ab_datetime_now() + self.expiration_duration
70        self._token = session_token  # type: ignore # Returned decoded response will be Mapping and therefore session_token will be str or None
SessionTokenProvider( login_requester: airbyte_cdk.Requester, session_token_path: List[str], expiration_duration: Union[datetime.timedelta, isodate.duration.Duration, NoneType], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], message_repository: airbyte_cdk.MessageRepository = <airbyte_cdk.sources.message.NoopMessageRepository object>, decoder: airbyte_cdk.Decoder = <factory>, _next_expiration_time: Optional[airbyte_cdk.utils.datetime_helpers.AirbyteDateTime] = None, _token: Optional[str] = None)
login_requester: airbyte_cdk.Requester
session_token_path: List[str]
expiration_duration: Union[datetime.timedelta, isodate.duration.Duration, NoneType]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
def get_token(self) -> str:
44    def get_token(self) -> str:
45        self._refresh_if_necessary()
46        if self._token is None:
47            raise ReadException("Failed to get session token, token is None")
48        return self._token
@dataclass
class InterpolatedStringTokenProvider(TokenProvider):
73@dataclass
74class InterpolatedStringTokenProvider(TokenProvider):
75    config: Config
76    api_token: Union[InterpolatedString, str]
77    parameters: Mapping[str, Any]
78
79    def __post_init__(self) -> None:
80        self._token = InterpolatedString.create(self.api_token, parameters=self.parameters)
81
82    def get_token(self) -> str:
83        return str(self._token.eval(self.config))
InterpolatedStringTokenProvider( config: Mapping[str, Any], api_token: Union[airbyte_cdk.InterpolatedString, str], parameters: Mapping[str, Any])
config: Mapping[str, Any]
api_token: Union[airbyte_cdk.InterpolatedString, str]
parameters: Mapping[str, Any]
def get_token(self) -> str:
82    def get_token(self) -> str:
83        return str(self._token.eval(self.config))