airbyte_cdk.connector

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4
  5
  6import json
  7import logging
  8import os
  9import pkgutil
 10from abc import ABC, abstractmethod
 11from typing import Any, Generic, Mapping, Optional, Protocol, TypeVar
 12
 13import yaml
 14
 15from airbyte_cdk.models import (
 16    AirbyteConnectionStatus,
 17    ConnectorSpecification,
 18    ConnectorSpecificationSerializer,
 19)
 20
 21
 22def load_optional_package_file(package: str, filename: str) -> Optional[bytes]:
 23    """Gets a resource from a package, returning None if it does not exist"""
 24    try:
 25        return pkgutil.get_data(package, filename)
 26    except FileNotFoundError:
 27        return None
 28
 29
 30TConfig = TypeVar("TConfig", bound=Mapping[str, Any])
 31
 32
 33class BaseConnector(ABC, Generic[TConfig]):
 34    # configure whether the `check_config_against_spec_or_exit()` needs to be called
 35    check_config_against_spec: bool = True
 36
 37    @abstractmethod
 38    def configure(self, config: Mapping[str, Any], temp_dir: str) -> TConfig:
 39        """
 40        Persist config in temporary directory to run the Source job
 41        """
 42
 43    @staticmethod
 44    def read_config(config_path: str) -> Mapping[str, Any]:
 45        config = BaseConnector._read_json_file(config_path)
 46        if isinstance(config, Mapping):
 47            return config
 48        else:
 49            raise ValueError(
 50                f"The content of {config_path} is not an object and therefore is not a valid config. Please ensure the file represent a config."
 51            )
 52
 53    @staticmethod
 54    def _read_json_file(file_path: str) -> Any:
 55        with open(file_path, "r") as file:
 56            contents = file.read()
 57
 58        try:
 59            return json.loads(contents)
 60        except json.JSONDecodeError as error:
 61            raise ValueError(
 62                f"Could not read json file {file_path}: {error}. Please ensure that it is a valid JSON."
 63            )
 64
 65    @staticmethod
 66    def write_config(config: TConfig, config_path: str) -> None:
 67        with open(config_path, "w") as fh:
 68            fh.write(json.dumps(config))
 69
 70    def spec(self, logger: logging.Logger) -> ConnectorSpecification:
 71        """
 72        Returns the spec for this integration. The spec is a JSON-Schema object describing the required configurations (e.g: username and password)
 73        required to run this integration. By default, this will be loaded from a "spec.yaml" or a "spec.json" in the package root.
 74        """
 75
 76        package = self.__class__.__module__.split(".")[0]
 77
 78        yaml_spec = load_optional_package_file(package, "spec.yaml")
 79        json_spec = load_optional_package_file(package, "spec.json")
 80
 81        if yaml_spec and json_spec:
 82            raise RuntimeError(
 83                "Found multiple spec files in the package. Only one of spec.yaml or spec.json should be provided."
 84            )
 85
 86        if yaml_spec:
 87            spec_obj = yaml.load(yaml_spec, Loader=yaml.SafeLoader)
 88        elif json_spec:
 89            try:
 90                spec_obj = json.loads(json_spec)
 91            except json.JSONDecodeError as error:
 92                raise ValueError(
 93                    f"Could not read json spec file: {error}. Please ensure that it is a valid JSON."
 94                )
 95        else:
 96            raise FileNotFoundError("Unable to find spec.yaml or spec.json in the package.")
 97
 98        return ConnectorSpecificationSerializer.load(spec_obj)
 99
100    @abstractmethod
101    def check(self, logger: logging.Logger, config: TConfig) -> AirbyteConnectionStatus:
102        """
103        Tests if the input configuration can be used to successfully connect to the integration e.g: if a provided Stripe API token can be used to connect
104        to the Stripe API.
105        """
106
107
108class _WriteConfigProtocol(Protocol):
109    @staticmethod
110    def write_config(config: Mapping[str, Any], config_path: str) -> None: ...
111
112
113class DefaultConnectorMixin:
114    # can be overridden to change an input config
115    def configure(
116        self: _WriteConfigProtocol, config: Mapping[str, Any], temp_dir: str
117    ) -> Mapping[str, Any]:
118        config_path = os.path.join(temp_dir, "config.json")
119        self.write_config(config, config_path)
120        return config
121
122
123class Connector(DefaultConnectorMixin, BaseConnector[Mapping[str, Any]], ABC): ...
def load_optional_package_file(package: str, filename: str) -> Optional[bytes]:
23def load_optional_package_file(package: str, filename: str) -> Optional[bytes]:
24    """Gets a resource from a package, returning None if it does not exist"""
25    try:
26        return pkgutil.get_data(package, filename)
27    except FileNotFoundError:
28        return None

Gets a resource from a package, returning None if it does not exist

class BaseConnector(abc.ABC, typing.Generic[~TConfig]):
 34class BaseConnector(ABC, Generic[TConfig]):
 35    # configure whether the `check_config_against_spec_or_exit()` needs to be called
 36    check_config_against_spec: bool = True
 37
 38    @abstractmethod
 39    def configure(self, config: Mapping[str, Any], temp_dir: str) -> TConfig:
 40        """
 41        Persist config in temporary directory to run the Source job
 42        """
 43
 44    @staticmethod
 45    def read_config(config_path: str) -> Mapping[str, Any]:
 46        config = BaseConnector._read_json_file(config_path)
 47        if isinstance(config, Mapping):
 48            return config
 49        else:
 50            raise ValueError(
 51                f"The content of {config_path} is not an object and therefore is not a valid config. Please ensure the file represent a config."
 52            )
 53
 54    @staticmethod
 55    def _read_json_file(file_path: str) -> Any:
 56        with open(file_path, "r") as file:
 57            contents = file.read()
 58
 59        try:
 60            return json.loads(contents)
 61        except json.JSONDecodeError as error:
 62            raise ValueError(
 63                f"Could not read json file {file_path}: {error}. Please ensure that it is a valid JSON."
 64            )
 65
 66    @staticmethod
 67    def write_config(config: TConfig, config_path: str) -> None:
 68        with open(config_path, "w") as fh:
 69            fh.write(json.dumps(config))
 70
 71    def spec(self, logger: logging.Logger) -> ConnectorSpecification:
 72        """
 73        Returns the spec for this integration. The spec is a JSON-Schema object describing the required configurations (e.g: username and password)
 74        required to run this integration. By default, this will be loaded from a "spec.yaml" or a "spec.json" in the package root.
 75        """
 76
 77        package = self.__class__.__module__.split(".")[0]
 78
 79        yaml_spec = load_optional_package_file(package, "spec.yaml")
 80        json_spec = load_optional_package_file(package, "spec.json")
 81
 82        if yaml_spec and json_spec:
 83            raise RuntimeError(
 84                "Found multiple spec files in the package. Only one of spec.yaml or spec.json should be provided."
 85            )
 86
 87        if yaml_spec:
 88            spec_obj = yaml.load(yaml_spec, Loader=yaml.SafeLoader)
 89        elif json_spec:
 90            try:
 91                spec_obj = json.loads(json_spec)
 92            except json.JSONDecodeError as error:
 93                raise ValueError(
 94                    f"Could not read json spec file: {error}. Please ensure that it is a valid JSON."
 95                )
 96        else:
 97            raise FileNotFoundError("Unable to find spec.yaml or spec.json in the package.")
 98
 99        return ConnectorSpecificationSerializer.load(spec_obj)
100
101    @abstractmethod
102    def check(self, logger: logging.Logger, config: TConfig) -> AirbyteConnectionStatus:
103        """
104        Tests if the input configuration can be used to successfully connect to the integration e.g: if a provided Stripe API token can be used to connect
105        to the Stripe API.
106        """

Helper class that provides a standard way to create an ABC using inheritance.

check_config_against_spec: bool = True
@abstractmethod
def configure(self, config: Mapping[str, Any], temp_dir: str) -> ~TConfig:
38    @abstractmethod
39    def configure(self, config: Mapping[str, Any], temp_dir: str) -> TConfig:
40        """
41        Persist config in temporary directory to run the Source job
42        """

Persist config in temporary directory to run the Source job

@staticmethod
def read_config(config_path: str) -> Mapping[str, Any]:
44    @staticmethod
45    def read_config(config_path: str) -> Mapping[str, Any]:
46        config = BaseConnector._read_json_file(config_path)
47        if isinstance(config, Mapping):
48            return config
49        else:
50            raise ValueError(
51                f"The content of {config_path} is not an object and therefore is not a valid config. Please ensure the file represent a config."
52            )
@staticmethod
def write_config(config: ~TConfig, config_path: str) -> None:
66    @staticmethod
67    def write_config(config: TConfig, config_path: str) -> None:
68        with open(config_path, "w") as fh:
69            fh.write(json.dumps(config))
def spec( self, logger: logging.Logger) -> airbyte_protocol_dataclasses.models.airbyte_protocol.ConnectorSpecification:
71    def spec(self, logger: logging.Logger) -> ConnectorSpecification:
72        """
73        Returns the spec for this integration. The spec is a JSON-Schema object describing the required configurations (e.g: username and password)
74        required to run this integration. By default, this will be loaded from a "spec.yaml" or a "spec.json" in the package root.
75        """
76
77        package = self.__class__.__module__.split(".")[0]
78
79        yaml_spec = load_optional_package_file(package, "spec.yaml")
80        json_spec = load_optional_package_file(package, "spec.json")
81
82        if yaml_spec and json_spec:
83            raise RuntimeError(
84                "Found multiple spec files in the package. Only one of spec.yaml or spec.json should be provided."
85            )
86
87        if yaml_spec:
88            spec_obj = yaml.load(yaml_spec, Loader=yaml.SafeLoader)
89        elif json_spec:
90            try:
91                spec_obj = json.loads(json_spec)
92            except json.JSONDecodeError as error:
93                raise ValueError(
94                    f"Could not read json spec file: {error}. Please ensure that it is a valid JSON."
95                )
96        else:
97            raise FileNotFoundError("Unable to find spec.yaml or spec.json in the package.")
98
99        return ConnectorSpecificationSerializer.load(spec_obj)

Returns the spec for this integration. The spec is a JSON-Schema object describing the required configurations (e.g: username and password) required to run this integration. By default, this will be loaded from a "spec.yaml" or a "spec.json" in the package root.

@abstractmethod
def check( self, logger: logging.Logger, config: ~TConfig) -> airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteConnectionStatus:
101    @abstractmethod
102    def check(self, logger: logging.Logger, config: TConfig) -> AirbyteConnectionStatus:
103        """
104        Tests if the input configuration can be used to successfully connect to the integration e.g: if a provided Stripe API token can be used to connect
105        to the Stripe API.
106        """

Tests if the input configuration can be used to successfully connect to the integration e.g: if a provided Stripe API token can be used to connect to the Stripe API.

class DefaultConnectorMixin:
114class DefaultConnectorMixin:
115    # can be overridden to change an input config
116    def configure(
117        self: _WriteConfigProtocol, config: Mapping[str, Any], temp_dir: str
118    ) -> Mapping[str, Any]:
119        config_path = os.path.join(temp_dir, "config.json")
120        self.write_config(config, config_path)
121        return config
def configure( self: airbyte_cdk.connector._WriteConfigProtocol, config: Mapping[str, Any], temp_dir: str) -> Mapping[str, Any]:
116    def configure(
117        self: _WriteConfigProtocol, config: Mapping[str, Any], temp_dir: str
118    ) -> Mapping[str, Any]:
119        config_path = os.path.join(temp_dir, "config.json")
120        self.write_config(config, config_path)
121        return config
124class Connector(DefaultConnectorMixin, BaseConnector[Mapping[str, Any]], ABC): ...

Helper class that provides a standard way to create an ABC using inheritance.