airbyte_cdk.connector
1# 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 3# 4 5 6import json 7import logging 8import os 9import pkgutil 10from abc import ABC, abstractmethod 11from typing import Any, Generic, Mapping, Optional, Protocol, TypeVar 12 13import yaml 14 15from airbyte_cdk.models import ( 16 AirbyteConnectionStatus, 17 ConnectorSpecification, 18 ConnectorSpecificationSerializer, 19) 20 21 22def load_optional_package_file(package: str, filename: str) -> Optional[bytes]: 23 """Gets a resource from a package, returning None if it does not exist""" 24 try: 25 return pkgutil.get_data(package, filename) 26 except FileNotFoundError: 27 return None 28 29 30TConfig = TypeVar("TConfig", bound=Mapping[str, Any]) 31 32 33class BaseConnector(ABC, Generic[TConfig]): 34 # configure whether the `check_config_against_spec_or_exit()` needs to be called 35 check_config_against_spec: bool = True 36 37 @abstractmethod 38 def configure(self, config: Mapping[str, Any], temp_dir: str) -> TConfig: 39 """ 40 Persist config in temporary directory to run the Source job 41 """ 42 43 @staticmethod 44 def read_config(config_path: str) -> Mapping[str, Any]: 45 config = BaseConnector._read_json_file(config_path) 46 if isinstance(config, Mapping): 47 return config 48 else: 49 raise ValueError( 50 f"The content of {config_path} is not an object and therefore is not a valid config. Please ensure the file represent a config." 51 ) 52 53 @staticmethod 54 def _read_json_file(file_path: str) -> Any: 55 with open(file_path, "r") as file: 56 contents = file.read() 57 58 try: 59 return json.loads(contents) 60 except json.JSONDecodeError as error: 61 raise ValueError( 62 f"Could not read json file {file_path}: {error}. Please ensure that it is a valid JSON." 63 ) 64 65 @staticmethod 66 def write_config(config: TConfig, config_path: str) -> None: 67 with open(config_path, "w") as fh: 68 fh.write(json.dumps(config)) 69 70 def spec(self, logger: logging.Logger) -> ConnectorSpecification: 71 """ 72 Returns the spec for this integration. The spec is a JSON-Schema object describing the required configurations (e.g: username and password) 73 required to run this integration. By default, this will be loaded from a "spec.yaml" or a "spec.json" in the package root. 74 """ 75 76 package = self.__class__.__module__.split(".")[0] 77 78 yaml_spec = load_optional_package_file(package, "spec.yaml") 79 json_spec = load_optional_package_file(package, "spec.json") 80 81 if yaml_spec and json_spec: 82 raise RuntimeError( 83 "Found multiple spec files in the package. Only one of spec.yaml or spec.json should be provided." 84 ) 85 86 if yaml_spec: 87 spec_obj = yaml.load(yaml_spec, Loader=yaml.SafeLoader) 88 elif json_spec: 89 try: 90 spec_obj = json.loads(json_spec) 91 except json.JSONDecodeError as error: 92 raise ValueError( 93 f"Could not read json spec file: {error}. Please ensure that it is a valid JSON." 94 ) 95 else: 96 raise FileNotFoundError("Unable to find spec.yaml or spec.json in the package.") 97 98 return ConnectorSpecificationSerializer.load(spec_obj) 99 100 @abstractmethod 101 def check(self, logger: logging.Logger, config: TConfig) -> AirbyteConnectionStatus: 102 """ 103 Tests if the input configuration can be used to successfully connect to the integration e.g: if a provided Stripe API token can be used to connect 104 to the Stripe API. 105 """ 106 107 108class _WriteConfigProtocol(Protocol): 109 @staticmethod 110 def write_config(config: Mapping[str, Any], config_path: str) -> None: ... 111 112 113class DefaultConnectorMixin: 114 # can be overridden to change an input config 115 def configure( 116 self: _WriteConfigProtocol, config: Mapping[str, Any], temp_dir: str 117 ) -> Mapping[str, Any]: 118 config_path = os.path.join(temp_dir, "config.json") 119 self.write_config(config, config_path) 120 return config 121 122 123class Connector(DefaultConnectorMixin, BaseConnector[Mapping[str, Any]], ABC): ...
def
load_optional_package_file(package: str, filename: str) -> Optional[bytes]:
23def load_optional_package_file(package: str, filename: str) -> Optional[bytes]: 24 """Gets a resource from a package, returning None if it does not exist""" 25 try: 26 return pkgutil.get_data(package, filename) 27 except FileNotFoundError: 28 return None
Gets a resource from a package, returning None if it does not exist
class
BaseConnector(abc.ABC, typing.Generic[~TConfig]):
34class BaseConnector(ABC, Generic[TConfig]): 35 # configure whether the `check_config_against_spec_or_exit()` needs to be called 36 check_config_against_spec: bool = True 37 38 @abstractmethod 39 def configure(self, config: Mapping[str, Any], temp_dir: str) -> TConfig: 40 """ 41 Persist config in temporary directory to run the Source job 42 """ 43 44 @staticmethod 45 def read_config(config_path: str) -> Mapping[str, Any]: 46 config = BaseConnector._read_json_file(config_path) 47 if isinstance(config, Mapping): 48 return config 49 else: 50 raise ValueError( 51 f"The content of {config_path} is not an object and therefore is not a valid config. Please ensure the file represent a config." 52 ) 53 54 @staticmethod 55 def _read_json_file(file_path: str) -> Any: 56 with open(file_path, "r") as file: 57 contents = file.read() 58 59 try: 60 return json.loads(contents) 61 except json.JSONDecodeError as error: 62 raise ValueError( 63 f"Could not read json file {file_path}: {error}. Please ensure that it is a valid JSON." 64 ) 65 66 @staticmethod 67 def write_config(config: TConfig, config_path: str) -> None: 68 with open(config_path, "w") as fh: 69 fh.write(json.dumps(config)) 70 71 def spec(self, logger: logging.Logger) -> ConnectorSpecification: 72 """ 73 Returns the spec for this integration. The spec is a JSON-Schema object describing the required configurations (e.g: username and password) 74 required to run this integration. By default, this will be loaded from a "spec.yaml" or a "spec.json" in the package root. 75 """ 76 77 package = self.__class__.__module__.split(".")[0] 78 79 yaml_spec = load_optional_package_file(package, "spec.yaml") 80 json_spec = load_optional_package_file(package, "spec.json") 81 82 if yaml_spec and json_spec: 83 raise RuntimeError( 84 "Found multiple spec files in the package. Only one of spec.yaml or spec.json should be provided." 85 ) 86 87 if yaml_spec: 88 spec_obj = yaml.load(yaml_spec, Loader=yaml.SafeLoader) 89 elif json_spec: 90 try: 91 spec_obj = json.loads(json_spec) 92 except json.JSONDecodeError as error: 93 raise ValueError( 94 f"Could not read json spec file: {error}. Please ensure that it is a valid JSON." 95 ) 96 else: 97 raise FileNotFoundError("Unable to find spec.yaml or spec.json in the package.") 98 99 return ConnectorSpecificationSerializer.load(spec_obj) 100 101 @abstractmethod 102 def check(self, logger: logging.Logger, config: TConfig) -> AirbyteConnectionStatus: 103 """ 104 Tests if the input configuration can be used to successfully connect to the integration e.g: if a provided Stripe API token can be used to connect 105 to the Stripe API. 106 """
Helper class that provides a standard way to create an ABC using inheritance.
@abstractmethod
def
configure(self, config: Mapping[str, Any], temp_dir: str) -> ~TConfig:
38 @abstractmethod 39 def configure(self, config: Mapping[str, Any], temp_dir: str) -> TConfig: 40 """ 41 Persist config in temporary directory to run the Source job 42 """
Persist config in temporary directory to run the Source job
@staticmethod
def
read_config(config_path: str) -> Mapping[str, Any]:
44 @staticmethod 45 def read_config(config_path: str) -> Mapping[str, Any]: 46 config = BaseConnector._read_json_file(config_path) 47 if isinstance(config, Mapping): 48 return config 49 else: 50 raise ValueError( 51 f"The content of {config_path} is not an object and therefore is not a valid config. Please ensure the file represent a config." 52 )
def
spec( self, logger: logging.Logger) -> airbyte_protocol_dataclasses.models.airbyte_protocol.ConnectorSpecification:
71 def spec(self, logger: logging.Logger) -> ConnectorSpecification: 72 """ 73 Returns the spec for this integration. The spec is a JSON-Schema object describing the required configurations (e.g: username and password) 74 required to run this integration. By default, this will be loaded from a "spec.yaml" or a "spec.json" in the package root. 75 """ 76 77 package = self.__class__.__module__.split(".")[0] 78 79 yaml_spec = load_optional_package_file(package, "spec.yaml") 80 json_spec = load_optional_package_file(package, "spec.json") 81 82 if yaml_spec and json_spec: 83 raise RuntimeError( 84 "Found multiple spec files in the package. Only one of spec.yaml or spec.json should be provided." 85 ) 86 87 if yaml_spec: 88 spec_obj = yaml.load(yaml_spec, Loader=yaml.SafeLoader) 89 elif json_spec: 90 try: 91 spec_obj = json.loads(json_spec) 92 except json.JSONDecodeError as error: 93 raise ValueError( 94 f"Could not read json spec file: {error}. Please ensure that it is a valid JSON." 95 ) 96 else: 97 raise FileNotFoundError("Unable to find spec.yaml or spec.json in the package.") 98 99 return ConnectorSpecificationSerializer.load(spec_obj)
Returns the spec for this integration. The spec is a JSON-Schema object describing the required configurations (e.g: username and password) required to run this integration. By default, this will be loaded from a "spec.yaml" or a "spec.json" in the package root.
@abstractmethod
def
check( self, logger: logging.Logger, config: ~TConfig) -> airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteConnectionStatus:
101 @abstractmethod 102 def check(self, logger: logging.Logger, config: TConfig) -> AirbyteConnectionStatus: 103 """ 104 Tests if the input configuration can be used to successfully connect to the integration e.g: if a provided Stripe API token can be used to connect 105 to the Stripe API. 106 """
Tests if the input configuration can be used to successfully connect to the integration e.g: if a provided Stripe API token can be used to connect to the Stripe API.
class
DefaultConnectorMixin:
114class DefaultConnectorMixin: 115 # can be overridden to change an input config 116 def configure( 117 self: _WriteConfigProtocol, config: Mapping[str, Any], temp_dir: str 118 ) -> Mapping[str, Any]: 119 config_path = os.path.join(temp_dir, "config.json") 120 self.write_config(config, config_path) 121 return config
class
Connector(DefaultConnectorMixin, airbyte_cdk.connector.BaseConnector[typing.Mapping[str, typing.Any]], abc.ABC):
124class Connector(DefaultConnectorMixin, BaseConnector[Mapping[str, Any]], ABC): ...
Helper class that provides a standard way to create an ABC using inheritance.