airbyte_cdk.sources.declarative.auth.token_provider

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4
  5
  6import datetime
  7from abc import abstractmethod
  8from dataclasses import InitVar, dataclass, field
  9from typing import Any, List, Mapping, Optional, Union
 10
 11import dpath
 12from isodate import Duration
 13
 14from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
 15from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder
 16from airbyte_cdk.sources.declarative.exceptions import ReadException
 17from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
 18from airbyte_cdk.sources.declarative.requesters.requester import Requester
 19from airbyte_cdk.sources.http_logger import format_http_message
 20from airbyte_cdk.sources.message import MessageRepository, NoopMessageRepository
 21from airbyte_cdk.sources.types import Config
 22from airbyte_cdk.utils.datetime_helpers import AirbyteDateTime, ab_datetime_now
 23
 24
 25class TokenProvider:
 26    @abstractmethod
 27    def get_token(self) -> str:
 28        pass
 29
 30
 31@dataclass
 32class SessionTokenProvider(TokenProvider):
 33    login_requester: Requester
 34    session_token_path: List[str]
 35    expiration_duration: Optional[Union[datetime.timedelta, Duration]]
 36    parameters: InitVar[Mapping[str, Any]]
 37    message_repository: MessageRepository = NoopMessageRepository()
 38    decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={}))
 39
 40    _next_expiration_time: Optional[AirbyteDateTime] = None
 41    _token: Optional[str] = None
 42
 43    def get_token(self) -> str:
 44        self._refresh_if_necessary()
 45        if self._token is None:
 46            raise ReadException("Failed to get session token, token is None")
 47        return self._token
 48
 49    def _refresh_if_necessary(self) -> None:
 50        if self._next_expiration_time is None or self._next_expiration_time < ab_datetime_now():
 51            self._refresh()
 52
 53    def _refresh(self) -> None:
 54        response = self.login_requester.send_request(
 55            log_formatter=lambda response: format_http_message(
 56                response,
 57                "Login request",
 58                "Obtains session token",
 59                None,
 60                is_auxiliary=True,
 61                type="AUTH",
 62            ),
 63        )
 64        if response is None:
 65            raise ReadException("Failed to get session token, response got ignored by requester")
 66        session_token = dpath.get(next(self.decoder.decode(response)), self.session_token_path)
 67        if self.expiration_duration is not None:
 68            self._next_expiration_time = ab_datetime_now() + self.expiration_duration
 69        self._token = session_token  # type: ignore # Returned decoded response will be Mapping and therefore session_token will be str or None
 70
 71
 72@dataclass
 73class InterpolatedStringTokenProvider(TokenProvider):
 74    """Provides a token by interpolating a string with config values."""
 75
 76    config: Config
 77    api_token: Union[InterpolatedString, str]
 78    parameters: Mapping[str, Any]
 79
 80    def __post_init__(self) -> None:
 81        self._token = InterpolatedString.create(self.api_token, parameters=self.parameters)
 82
 83    def get_token(self) -> str:
 84        return str(self._token.eval(self.config))
 85
 86
 87@dataclass
 88class InterpolatedSessionTokenProvider(TokenProvider):
 89    """Provides a token by interpolating a template with the session token.
 90
 91    This allows flexible token formatting, such as "Token {{ session_token }}"
 92    for Django REST Framework APIs that expect "Authorization: Token <value>".
 93    """
 94
 95    config: Config
 96    api_token: Union[InterpolatedString, str]
 97    session_token_provider: TokenProvider
 98    parameters: Mapping[str, Any]
 99
100    def __post_init__(self) -> None:
101        self._token_template = InterpolatedString.create(self.api_token, parameters=self.parameters)
102
103    def get_token(self) -> str:
104        session_token = self.session_token_provider.get_token()
105        return str(self._token_template.eval(self.config, session_token=session_token))
class TokenProvider:
26class TokenProvider:
27    @abstractmethod
28    def get_token(self) -> str:
29        pass
@abstractmethod
def get_token(self) -> str:
27    @abstractmethod
28    def get_token(self) -> str:
29        pass
@dataclass
class SessionTokenProvider(TokenProvider):
32@dataclass
33class SessionTokenProvider(TokenProvider):
34    login_requester: Requester
35    session_token_path: List[str]
36    expiration_duration: Optional[Union[datetime.timedelta, Duration]]
37    parameters: InitVar[Mapping[str, Any]]
38    message_repository: MessageRepository = NoopMessageRepository()
39    decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={}))
40
41    _next_expiration_time: Optional[AirbyteDateTime] = None
42    _token: Optional[str] = None
43
44    def get_token(self) -> str:
45        self._refresh_if_necessary()
46        if self._token is None:
47            raise ReadException("Failed to get session token, token is None")
48        return self._token
49
50    def _refresh_if_necessary(self) -> None:
51        if self._next_expiration_time is None or self._next_expiration_time < ab_datetime_now():
52            self._refresh()
53
54    def _refresh(self) -> None:
55        response = self.login_requester.send_request(
56            log_formatter=lambda response: format_http_message(
57                response,
58                "Login request",
59                "Obtains session token",
60                None,
61                is_auxiliary=True,
62                type="AUTH",
63            ),
64        )
65        if response is None:
66            raise ReadException("Failed to get session token, response got ignored by requester")
67        session_token = dpath.get(next(self.decoder.decode(response)), self.session_token_path)
68        if self.expiration_duration is not None:
69            self._next_expiration_time = ab_datetime_now() + self.expiration_duration
70        self._token = session_token  # type: ignore # Returned decoded response will be Mapping and therefore session_token will be str or None
SessionTokenProvider( login_requester: airbyte_cdk.Requester, session_token_path: List[str], expiration_duration: Union[datetime.timedelta, isodate.duration.Duration, NoneType], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], message_repository: airbyte_cdk.MessageRepository = <airbyte_cdk.sources.message.NoopMessageRepository object>, decoder: airbyte_cdk.Decoder = <factory>, _next_expiration_time: Optional[airbyte_cdk.utils.datetime_helpers.AirbyteDateTime] = None, _token: Optional[str] = None)
login_requester: airbyte_cdk.Requester
session_token_path: List[str]
expiration_duration: Union[datetime.timedelta, isodate.duration.Duration, NoneType]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
def get_token(self) -> str:
44    def get_token(self) -> str:
45        self._refresh_if_necessary()
46        if self._token is None:
47            raise ReadException("Failed to get session token, token is None")
48        return self._token
@dataclass
class InterpolatedStringTokenProvider(TokenProvider):
73@dataclass
74class InterpolatedStringTokenProvider(TokenProvider):
75    """Provides a token by interpolating a string with config values."""
76
77    config: Config
78    api_token: Union[InterpolatedString, str]
79    parameters: Mapping[str, Any]
80
81    def __post_init__(self) -> None:
82        self._token = InterpolatedString.create(self.api_token, parameters=self.parameters)
83
84    def get_token(self) -> str:
85        return str(self._token.eval(self.config))

Provides a token by interpolating a string with config values.

InterpolatedStringTokenProvider( config: Mapping[str, Any], api_token: Union[airbyte_cdk.InterpolatedString, str], parameters: Mapping[str, Any])
config: Mapping[str, Any]
api_token: Union[airbyte_cdk.InterpolatedString, str]
parameters: Mapping[str, Any]
def get_token(self) -> str:
84    def get_token(self) -> str:
85        return str(self._token.eval(self.config))
@dataclass
class InterpolatedSessionTokenProvider(TokenProvider):
 88@dataclass
 89class InterpolatedSessionTokenProvider(TokenProvider):
 90    """Provides a token by interpolating a template with the session token.
 91
 92    This allows flexible token formatting, such as "Token {{ session_token }}"
 93    for Django REST Framework APIs that expect "Authorization: Token <value>".
 94    """
 95
 96    config: Config
 97    api_token: Union[InterpolatedString, str]
 98    session_token_provider: TokenProvider
 99    parameters: Mapping[str, Any]
100
101    def __post_init__(self) -> None:
102        self._token_template = InterpolatedString.create(self.api_token, parameters=self.parameters)
103
104    def get_token(self) -> str:
105        session_token = self.session_token_provider.get_token()
106        return str(self._token_template.eval(self.config, session_token=session_token))

Provides a token by interpolating a template with the session token.

This allows flexible token formatting, such as "Token {{ session_token }}" for Django REST Framework APIs that expect "Authorization: Token ".

InterpolatedSessionTokenProvider( config: Mapping[str, Any], api_token: Union[airbyte_cdk.InterpolatedString, str], session_token_provider: TokenProvider, parameters: Mapping[str, Any])
config: Mapping[str, Any]
api_token: Union[airbyte_cdk.InterpolatedString, str]
session_token_provider: TokenProvider
parameters: Mapping[str, Any]
def get_token(self) -> str:
104    def get_token(self) -> str:
105        session_token = self.session_token_provider.get_token()
106        return str(self._token_template.eval(self.config, session_token=session_token))