airbyte_cdk.sources.streams.http.requests_native_auth.abstract_token

 1#
 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 3#
 4
 5from abc import abstractmethod
 6from typing import Any, Mapping
 7
 8import requests
 9from requests.auth import AuthBase
10
11
12class AbstractHeaderAuthenticator(AuthBase):
13    """Abstract class for an header-based authenticators that add a header to outgoing HTTP requests."""
14
15    def __call__(self, request: requests.PreparedRequest) -> Any:
16        """Attach the HTTP headers required to authenticate on the HTTP request"""
17        request.headers.update(self.get_auth_header())
18        return request
19
20    def get_auth_header(self) -> Mapping[str, Any]:
21        """The header to set on outgoing HTTP requests"""
22        if self.auth_header:
23            return {self.auth_header: self.token}
24        return {}
25
26    @property
27    @abstractmethod
28    def auth_header(self) -> str:
29        """HTTP header to set on the requests"""
30
31    @property
32    @abstractmethod
33    def token(self) -> str:
34        """The header value to set on outgoing HTTP requests"""
class AbstractHeaderAuthenticator(requests.auth.AuthBase):
13class AbstractHeaderAuthenticator(AuthBase):
14    """Abstract class for an header-based authenticators that add a header to outgoing HTTP requests."""
15
16    def __call__(self, request: requests.PreparedRequest) -> Any:
17        """Attach the HTTP headers required to authenticate on the HTTP request"""
18        request.headers.update(self.get_auth_header())
19        return request
20
21    def get_auth_header(self) -> Mapping[str, Any]:
22        """The header to set on outgoing HTTP requests"""
23        if self.auth_header:
24            return {self.auth_header: self.token}
25        return {}
26
27    @property
28    @abstractmethod
29    def auth_header(self) -> str:
30        """HTTP header to set on the requests"""
31
32    @property
33    @abstractmethod
34    def token(self) -> str:
35        """The header value to set on outgoing HTTP requests"""

Abstract class for an header-based authenticators that add a header to outgoing HTTP requests.

def get_auth_header(self) -> Mapping[str, Any]:
21    def get_auth_header(self) -> Mapping[str, Any]:
22        """The header to set on outgoing HTTP requests"""
23        if self.auth_header:
24            return {self.auth_header: self.token}
25        return {}

The header to set on outgoing HTTP requests

auth_header: str
27    @property
28    @abstractmethod
29    def auth_header(self) -> str:
30        """HTTP header to set on the requests"""

HTTP header to set on the requests

token: str
32    @property
33    @abstractmethod
34    def token(self) -> str:
35        """The header value to set on outgoing HTTP requests"""

The header value to set on outgoing HTTP requests