airbyte_cdk.sources.declarative.auth.declarative_authenticator

 1#
 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 3#
 4
 5from dataclasses import InitVar, dataclass
 6from typing import Any, Mapping, Union
 7
 8from airbyte_cdk.sources.streams.http.requests_native_auth.abstract_token import (
 9    AbstractHeaderAuthenticator,
10)
11
12
@dataclass
class DeclarativeAuthenticator(AbstractHeaderAuthenticator):
    """
    Interface marking which authenticators can be used as part of the declarative framework.

    In addition to what it inherits from ``AbstractHeaderAuthenticator``, it exposes three
    hooks for contributing query parameters, form-encoded body data and JSON body data.
    Every hook returns an empty mapping in this base class; subclasses may override them.
    """

    def get_request_params(self) -> Mapping[str, Any]:
        """HTTP request parameters to add to the requests (empty in this base implementation)"""
        return {}

    def get_request_body_data(self) -> Union[Mapping[str, Any], str]:
        """Form-encoded body data to set on the requests (empty in this base implementation)"""
        return {}

    def get_request_body_json(self) -> Mapping[str, Any]:
        """JSON-encoded body data to set on the requests (empty in this base implementation)"""
        return {}
30
31
@dataclass
class NoAuth(DeclarativeAuthenticator):
    """
    Authenticator whose header name and header value are both the empty string.
    """

    # Init-only argument: as an InitVar it is accepted by the generated __init__
    # but is not stored as a dataclass field. This class defines no __post_init__
    # of its own that reads it.
    parameters: InitVar[Mapping[str, Any]]

    @property
    def auth_header(self) -> str:
        """HTTP header to set on the requests; always the empty string for this class"""
        return ""

    @property
    def token(self) -> str:
        """The header value to set on outgoing HTTP requests; always the empty string for this class"""
        return ""
@dataclass
class DeclarativeAuthenticator(AbstractHeaderAuthenticator):
    """
    Interface marking which authenticators can be used as part of the declarative framework.

    In addition to what it inherits from ``AbstractHeaderAuthenticator``, it exposes three
    hooks for contributing query parameters, form-encoded body data and JSON body data.
    Every hook returns an empty mapping in this base class; subclasses may override them.
    """

    def get_request_params(self) -> Mapping[str, Any]:
        """HTTP request parameters to add to the requests (empty in this base implementation)"""
        return {}

    def get_request_body_data(self) -> Union[Mapping[str, Any], str]:
        """Form-encoded body data to set on the requests (empty in this base implementation)"""
        return {}

    def get_request_body_json(self) -> Mapping[str, Any]:
        """JSON-encoded body data to set on the requests (empty in this base implementation)"""
        return {}

Interface that marks which authenticators can be used as part of the declarative framework

def get_request_params(self) -> Mapping[str, Any]:
20    def get_request_params(self) -> Mapping[str, Any]:
21        """HTTP request parameter to add to the requests"""
22        return {}

HTTP request parameters to add to the requests

def get_request_body_data(self) -> Union[Mapping[str, Any], str]:
24    def get_request_body_data(self) -> Union[Mapping[str, Any], str]:
25        """Form-encoded body data to set on the requests"""
26        return {}

Form-encoded body data to set on the requests

def get_request_body_json(self) -> Mapping[str, Any]:
28    def get_request_body_json(self) -> Mapping[str, Any]:
29        """JSON-encoded body data to set on the requests"""
30        return {}

JSON-encoded body data to set on the requests

@dataclass
class NoAuth(DeclarativeAuthenticator):
@dataclass
class NoAuth(DeclarativeAuthenticator):
    """
    Authenticator whose header name and header value are both the empty string.
    """

    # Init-only argument: as an InitVar it is accepted by the generated __init__
    # but is not stored as a dataclass field. This class defines no __post_init__
    # of its own that reads it.
    parameters: InitVar[Mapping[str, Any]]

    @property
    def auth_header(self) -> str:
        """HTTP header to set on the requests; always the empty string for this class"""
        return ""

    @property
    def token(self) -> str:
        """The header value to set on outgoing HTTP requests; always the empty string for this class"""
        return ""
NoAuth(parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]])
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
auth_header: str
37    @property
38    def auth_header(self) -> str:
39        return ""

HTTP header to set on the requests

token: str
41    @property
42    def token(self) -> str:
43        return ""

The header value to set on outgoing HTTP requests