airbyte_cdk.sources.streams.http.requests_native_auth.abstract_token
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from abc import abstractmethod
from typing import Any, Mapping

import requests
from requests.auth import AuthBase


class AbstractHeaderAuthenticator(AuthBase):
    """Base class for header-based authenticators that add one header to outgoing HTTP requests.

    Subclasses supply the header name through ``auth_header`` and the header
    value through ``token``.
    """

    def __call__(self, request: requests.PreparedRequest) -> Any:
        """Merge the authentication header into ``request`` and return the same request."""
        outgoing_headers = request.headers
        outgoing_headers.update(self.get_auth_header())
        return request

    def get_auth_header(self) -> Mapping[str, Any]:
        """Build the header mapping to merge into outgoing HTTP requests.

        Returns a single-entry mapping of ``self.auth_header`` to ``self.token``,
        or an empty mapping when ``self.auth_header`` is falsy.
        """
        if not self.auth_header:
            return {}
        return {self.auth_header: self.token}

    @property
    @abstractmethod
    def auth_header(self) -> str:
        """Name of the HTTP header to set on outgoing requests."""

    @property
    @abstractmethod
    def token(self) -> str:
        """Value to send in the authentication header on outgoing requests."""
class
AbstractHeaderAuthenticator(requests.auth.AuthBase):
class AbstractHeaderAuthenticator(AuthBase):
    """Base class for header-based authenticators that add one header to outgoing HTTP requests.

    Subclasses supply the header name through ``auth_header`` and the header
    value through ``token``.
    """

    def __call__(self, request: requests.PreparedRequest) -> Any:
        """Merge the authentication header into ``request`` and return the same request."""
        outgoing_headers = request.headers
        outgoing_headers.update(self.get_auth_header())
        return request

    def get_auth_header(self) -> Mapping[str, Any]:
        """Build the header mapping to merge into outgoing HTTP requests.

        Returns a single-entry mapping of ``self.auth_header`` to ``self.token``,
        or an empty mapping when ``self.auth_header`` is falsy.
        """
        if not self.auth_header:
            return {}
        return {self.auth_header: self.token}

    @property
    @abstractmethod
    def auth_header(self) -> str:
        """Name of the HTTP header to set on outgoing requests."""

    @property
    @abstractmethod
    def token(self) -> str:
        """Value to send in the authentication header on outgoing requests."""
Abstract class for header-based authenticators that add a header to outgoing HTTP requests.
def
get_auth_header(self) -> Mapping[str, Any]:
def get_auth_header(self) -> Mapping[str, Any]:
    """Build the header mapping to merge into outgoing HTTP requests.

    Returns a single-entry mapping of ``self.auth_header`` to ``self.token``,
    or an empty mapping when ``self.auth_header`` is falsy.
    """
    if not self.auth_header:
        return {}
    return {self.auth_header: self.token}
The header to set on outgoing HTTP requests