airbyte.sources.registry
Connectivity to the connector catalog registry.
1# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 2"""Connectivity to the connector catalog registry.""" 3 4from __future__ import annotations 5 6import json 7import logging 8import os 9import warnings 10from copy import copy 11from dataclasses import dataclass 12from enum import Enum 13from pathlib import Path 14 15import requests 16 17from airbyte import exceptions as exc 18from airbyte._util.meta import is_docker_installed 19from airbyte.constants import AIRBYTE_OFFLINE_MODE 20from airbyte.logs import warn_once 21from airbyte.version import get_version 22 23 24logger = logging.getLogger("airbyte") 25 26 27__cache: dict[str, ConnectorMetadata] | None = None 28 29 30_REGISTRY_ENV_VAR = "AIRBYTE_LOCAL_REGISTRY" 31_REGISTRY_URL = "https://connectors.airbyte.com/files/registries/v0/oss_registry.json" 32 33_PYTHON_LANGUAGE = "python" 34_MANIFEST_ONLY_LANGUAGE = "manifest-only" 35 36_PYTHON_LANGUAGE_TAG = f"language:{_PYTHON_LANGUAGE}" 37_MANIFEST_ONLY_TAG = f"language:{_MANIFEST_ONLY_LANGUAGE}" 38 39_LOWCODE_CONNECTORS_FAILING_VALIDATION: list[str] = [] 40# Connectors that return 404 or some other misc exception. 41_LOWCODE_CONNECTORS_UNEXPECTED_ERRORS: list[str] = [ 42 "source-adjust", 43 "source-amazon-ads", 44 "source-marketo", 45] 46# (CDK) FileNotFoundError: Unable to find spec.yaml or spec.json in the package. 
_LOWCODE_CDK_FILE_NOT_FOUND_ERRORS: list[str] = []
_LOWCODE_CONNECTORS_EXCLUDED: list[str] = [
    *_LOWCODE_CONNECTORS_FAILING_VALIDATION,
    *_LOWCODE_CONNECTORS_UNEXPECTED_ERRORS,
    *_LOWCODE_CDK_FILE_NOT_FOUND_ERRORS,
]

# Timeout (seconds) for connecting to / reading from a remote registry URL.
# Without a timeout, `requests` can block indefinitely on an unresponsive server.
_REGISTRY_REQUEST_TIMEOUT_SECONDS: int = 30


class InstallType(str, Enum):
    """The type of installation for a connector."""

    YAML = "yaml"
    PYTHON = "python"
    DOCKER = "docker"
    JAVA = "java"


class Language(str, Enum):
    """The language of a connector."""

    PYTHON = InstallType.PYTHON.value
    JAVA = InstallType.JAVA.value
    MANIFEST_ONLY = _MANIFEST_ONLY_LANGUAGE


@dataclass
class ConnectorMetadata:
    """Metadata for a connector."""

    name: str
    """Connector name. For example, "source-google-sheets"."""

    latest_available_version: str | None
    """The latest available version of the connector."""

    pypi_package_name: str | None
    """The name of the PyPI package for the connector, if it exists."""

    language: Language | None
    """The language of the connector."""

    install_types: set[InstallType]
    """The supported install types for the connector."""

    @property
    def default_install_type(self) -> InstallType:
        """Return the default install type for the connector.

        YAML for manifest-only connectors that support it; otherwise Python when it is
        a supported install type; otherwise Docker.
        """
        if self.language == Language.MANIFEST_ONLY and InstallType.YAML in self.install_types:
            return InstallType.YAML

        if InstallType.PYTHON in self.install_types:
            return InstallType.PYTHON

        # Else: Java or Docker
        return InstallType.DOCKER


def _get_registry_url() -> str:
    """Return the registry location: the override env var if set, else the default URL."""
    return os.environ.get(_REGISTRY_ENV_VAR, _REGISTRY_URL)


def _is_registry_disabled(url: str) -> bool:
    """Return True if the registry is disabled by a flag value or by offline mode."""
    return url.upper() in {"0", "F", "FALSE"} or AIRBYTE_OFFLINE_MODE


def _registry_entry_to_connector_metadata(entry: dict) -> ConnectorMetadata:
    """Convert one raw registry entry into a `ConnectorMetadata` object.

    The language comes from the entry's "language" field when it is a known
    `Language` value; otherwise it is inferred from "language:*" tags. Optional
    fields that are missing or explicitly `null` in the registry JSON are treated
    as empty.
    """
    name = entry["dockerRepository"].replace("airbyte/", "")
    latest_version: str | None = entry.get("dockerImageTag")
    # `or []` also guards against an explicit JSON `null`.
    tags = entry.get("tags") or []
    language: Language | None = None

    if entry.get("language") is not None:
        try:
            language = Language(entry["language"])
        except ValueError:
            # Enum lookup by an unknown value raises ValueError.
            warnings.warn(
                message=f"Invalid language for connector {name}: {entry['language']}",
                stacklevel=2,
            )
    if language is None and _PYTHON_LANGUAGE_TAG in tags:
        language = Language.PYTHON
    if language is None and _MANIFEST_ONLY_TAG in tags:
        language = Language.MANIFEST_ONLY

    remote_registries: dict = entry.get("remoteRegistries") or {}
    pypi_registry: dict = remote_registries.get("pypi") or {}
    pypi_package_name: str | None = pypi_registry.get("packageName")
    pypi_enabled: bool = bool(pypi_registry.get("enabled", False))

    install_types: set[InstallType] = {InstallType.DOCKER}  # Docker is always supported.
    if language == Language.PYTHON and pypi_enabled:
        install_types.add(InstallType.PYTHON)
    if language == Language.JAVA:
        install_types.add(InstallType.JAVA)
    if language == Language.MANIFEST_ONLY:
        install_types.add(InstallType.YAML)

    return ConnectorMetadata(
        name=name,
        latest_available_version=latest_version,
        # Only expose the package name when PyPI installs are enabled.
        pypi_package_name=pypi_package_name if pypi_enabled else None,
        language=language,
        install_types=install_types,
    )


def _get_registry_cache(*, force_refresh: bool = False) -> dict[str, ConnectorMetadata]:
    """Return the registry cache, loading it on first use or when `force_refresh` is True.

    The registry is fetched over HTTP when its location starts with "http"; otherwise
    it is read as a local JSON file. When the registry is disabled, an empty dict is
    returned and nothing is cached.

    Raises:
        requests.HTTPError: If the remote registry responds with an error status.
        requests.Timeout: If the remote registry does not respond in time.
    """
    global __cache
    if __cache and not force_refresh:
        return __cache

    registry_url = _get_registry_url()

    if _is_registry_disabled(registry_url):
        return {}

    if registry_url.startswith("http"):
        response = requests.get(
            registry_url,
            headers={"User-Agent": f"PyAirbyte/{get_version()}"},
            timeout=_REGISTRY_REQUEST_TIMEOUT_SECONDS,
        )
        response.raise_for_status()
        data = response.json()
    else:
        # Assume local file
        with Path(registry_url).open(encoding="utf-8") as f:
            data = json.load(f)

    new_cache: dict[str, ConnectorMetadata] = {}

    # Sources first, then destinations. A missing or null section is treated as empty
    # so that a partial (e.g. hand-written local) registry file still loads.
    for connector in (*(data.get("sources") or []), *(data.get("destinations") or [])):
        connector_metadata = _registry_entry_to_connector_metadata(connector)
        new_cache[connector_metadata.name] = connector_metadata

    if not new_cache:
        # This isn't necessarily fatal, since users can bring their own
        # connector definitions.
        warn_once(
            message=f"Connector registry is empty: {registry_url}",
            with_stack=False,
        )

    __cache = new_cache
    return __cache


def get_connector_metadata(name: str) -> ConnectorMetadata | None:
    """Return the registry metadata for the named connector.

    The registry is loaded and cached on first use.

    Returns:
        The connector's metadata, or `None` if the registry is disabled.

    Raises:
        PyAirbyteInternalError: If the registry contains no connectors.
        AirbyteConnectorNotRegisteredError: If `name` is not in the registry.
    """
    registry_url = _get_registry_url()

    if _is_registry_disabled(registry_url):
        return None

    cache = copy(_get_registry_cache())

    if not cache:
        raise exc.PyAirbyteInternalError(
            message="Connector registry could not be loaded.",
            context={
                "registry_url": registry_url,
            },
        )
    if name not in cache:
        raise exc.AirbyteConnectorNotRegisteredError(
            connector_name=name,
            context={
                "registry_url": registry_url,
                "available_connectors": get_available_connectors(),
            },
        )
    return cache[name]


def get_available_connectors(install_type: InstallType | str | None = None) -> list[str]:
    """Return a list of all available connectors.

    Connector names are returned in alphabetical order and include both source and
    destination connectors (for example, "source-google-sheets").

    Args:
        install_type: Only return connectors that support this install type (an
            `InstallType` member or its string value). When `None`, return whatever is
            runnable locally: all connectors if Docker is installed, otherwise only
            Python and manifest-only connectors.

    Raises:
        ValueError: If `install_type` is a string that is not a valid `InstallType` value.
    """
    if install_type is None:
        # No install type specified. Filter for whatever is runnable.
        if is_docker_installed():
            logger.info("Docker is detected. Returning all connectors.")
            # If Docker is available, return all connectors.
            return sorted(conn.name for conn in _get_registry_cache().values())

        logger.info("Docker was not detected. Returning only Python and Manifest-only connectors.")

        # If Docker is not available, return only Python and Manifest-based connectors.
        return sorted(
            conn.name
            for conn in _get_registry_cache().values()
            if conn.language in {Language.PYTHON, Language.MANIFEST_ONLY}
        )

    if not isinstance(install_type, InstallType):
        # Raises ValueError for strings that are not InstallType values.
        install_type = InstallType(install_type)

    if install_type == InstallType.PYTHON:
        # A package name is only recorded when PyPI installs are enabled.
        return sorted(
            conn.name
            for conn in _get_registry_cache().values()
            if conn.pypi_package_name is not None
        )

    if install_type == InstallType.JAVA:
        warnings.warn(
            message="Java connectors are not yet supported.",
            stacklevel=2,
        )
        return sorted(
            conn.name for conn in _get_registry_cache().values() if conn.language == Language.JAVA
        )

    if install_type == InstallType.DOCKER:
        return sorted(conn.name for conn in _get_registry_cache().values())

    if install_type == InstallType.YAML:
        return sorted(
            conn.name
            for conn in _get_registry_cache().values()
            if InstallType.YAML in conn.install_types
            and conn.name not in _LOWCODE_CONNECTORS_EXCLUDED
        )

    raise exc.PyAirbyteInputError(  # pragma: no cover  # Should never be reached.
        message="Invalid install type.",
        context={
            "install_type": install_type,
        },
    )
logger =
<Logger airbyte (INFO)>
class
InstallType(builtins.str, enum.Enum):
class InstallType(str, Enum):
    """The type of installation for a connector.

    Members subclass `str`, so each compares equal to its string value.
    """

    YAML = "yaml"  # Used for manifest-only (declarative) connectors.
    PYTHON = "python"  # Used when the registry enables a PyPI package for the connector.
    DOCKER = "docker"  # Every registry connector supports Docker.
    JAVA = "java"  # Java connectors; listing them warns that they are not yet supported.
The type of installation for a connector.
YAML =
<InstallType.YAML: 'yaml'>
PYTHON =
<InstallType.PYTHON: 'python'>
DOCKER =
<InstallType.DOCKER: 'docker'>
JAVA =
<InstallType.JAVA: 'java'>
Inherited Members
- enum.Enum
- name
- value
- builtins.str
- encode
- replace
- split
- rsplit
- join
- capitalize
- casefold
- title
- center
- count
- expandtabs
- find
- partition
- index
- ljust
- lower
- lstrip
- rfind
- rindex
- rjust
- rstrip
- rpartition
- splitlines
- strip
- swapcase
- translate
- upper
- startswith
- endswith
- removeprefix
- removesuffix
- isascii
- islower
- isupper
- istitle
- isspace
- isdecimal
- isdigit
- isnumeric
- isalpha
- isalnum
- isidentifier
- isprintable
- zfill
- format
- format_map
- maketrans
class
Language(builtins.str, enum.Enum):
class Language(str, Enum):
    """The language of a connector.

    `PYTHON` and `JAVA` share their string values with the matching `InstallType` members.
    """

    PYTHON = InstallType.PYTHON.value  # "python"
    JAVA = InstallType.JAVA.value  # "java"
    MANIFEST_ONLY = _MANIFEST_ONLY_LANGUAGE  # "manifest-only" (declarative YAML connectors)
The language of a connector.
PYTHON =
<Language.PYTHON: 'python'>
JAVA =
<Language.JAVA: 'java'>
MANIFEST_ONLY =
<Language.MANIFEST_ONLY: 'manifest-only'>
Inherited Members
- enum.Enum
- name
- value
- builtins.str
- encode
- replace
- split
- rsplit
- join
- capitalize
- casefold
- title
- center
- count
- expandtabs
- find
- partition
- index
- ljust
- lower
- lstrip
- rfind
- rindex
- rjust
- rstrip
- rpartition
- splitlines
- strip
- swapcase
- translate
- upper
- startswith
- endswith
- removeprefix
- removesuffix
- isascii
- islower
- isupper
- istitle
- isspace
- isdecimal
- isdigit
- isnumeric
- isalpha
- isalnum
- isidentifier
- isprintable
- zfill
- format
- format_map
- maketrans
@dataclass
class
ConnectorMetadata:
@dataclass
class ConnectorMetadata:
    """Registry metadata describing a single connector."""

    name: str
    """Connector name, e.g. "source-google-sheets"."""

    latest_available_version: str | None
    """Most recent version published for the connector, when known."""

    pypi_package_name: str | None
    """PyPI package name for the connector, when one exists."""

    language: Language | None
    """Implementation language of the connector, when known."""

    install_types: set[InstallType]
    """Every install type the connector supports."""

    @property
    def default_install_type(self) -> InstallType:
        """Pick the preferred install type for this connector.

        YAML wins for manifest-only connectors that support it; otherwise Python is
        chosen when supported; Docker is the fallback (covers Java and everything else).
        """
        supported = self.install_types
        if self.language == Language.MANIFEST_ONLY and InstallType.YAML in supported:
            return InstallType.YAML
        return InstallType.PYTHON if InstallType.PYTHON in supported else InstallType.DOCKER
Metadata for a connector.
ConnectorMetadata( name: str, latest_available_version: str | None, pypi_package_name: str | None, language: Language | None, install_types: set[InstallType])
default_install_type: InstallType
@property
def default_install_type(self) -> InstallType:
    """Pick the preferred install type for this connector.

    YAML wins for manifest-only connectors that support it; otherwise Python is
    chosen when supported; Docker is the fallback (covers Java and everything else).
    """
    supported = self.install_types
    if self.language == Language.MANIFEST_ONLY and InstallType.YAML in supported:
        return InstallType.YAML
    return InstallType.PYTHON if InstallType.PYTHON in supported else InstallType.DOCKER
Return the default install type for the connector.
def get_connector_metadata(name: str) -> ConnectorMetadata | None:
    """Return the registry metadata for the named connector.

    The registry is loaded and cached on first use.

    Returns:
        The connector's metadata, or `None` if the registry is disabled.

    Raises:
        PyAirbyteInternalError: If the registry contains no connectors.
        AirbyteConnectorNotRegisteredError: If `name` is not in the registry.
    """
    registry_url = _get_registry_url()

    if _is_registry_disabled(registry_url):
        return None

    cache = copy(_get_registry_cache())

    if not cache:
        raise exc.PyAirbyteInternalError(
            message="Connector registry could not be loaded.",
            context={
                "registry_url": registry_url,
            },
        )
    if name not in cache:
        raise exc.AirbyteConnectorNotRegisteredError(
            connector_name=name,
            context={
                "registry_url": registry_url,
                "available_connectors": get_available_connectors(),
            },
        )
    return cache[name]
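Return the registry metadata for the named connector.
The registry is loaded and cached on first use. Returns None when the registry is disabled; raises an error if the registry contains no connectors or if the name is not registered.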
def get_available_connectors(install_type: InstallType | str | None = None) -> list[str]:
    """Return a list of all available connectors.

    Connector names are returned in alphabetical order and include both source and
    destination connectors (for example, "source-google-sheets").

    Args:
        install_type: Only return connectors that support this install type (an
            `InstallType` member or its string value). When `None`, return whatever is
            runnable locally: all connectors if Docker is installed, otherwise only
            Python and manifest-only connectors.

    Raises:
        ValueError: If `install_type` is a string that is not a valid `InstallType` value.
    """
    if install_type is None:
        # No install type specified. Filter for whatever is runnable.
        if is_docker_installed():
            logger.info("Docker is detected. Returning all connectors.")
            # If Docker is available, return all connectors.
            return sorted(conn.name for conn in _get_registry_cache().values())

        logger.info("Docker was not detected. Returning only Python and Manifest-only connectors.")

        # If Docker is not available, return only Python and Manifest-based connectors.
        return sorted(
            conn.name
            for conn in _get_registry_cache().values()
            if conn.language in {Language.PYTHON, Language.MANIFEST_ONLY}
        )

    if not isinstance(install_type, InstallType):
        # Raises ValueError for strings that are not InstallType values.
        install_type = InstallType(install_type)

    if install_type == InstallType.PYTHON:
        return sorted(
            conn.name
            for conn in _get_registry_cache().values()
            if conn.pypi_package_name is not None
        )

    if install_type == InstallType.JAVA:
        warnings.warn(
            message="Java connectors are not yet supported.",
            stacklevel=2,
        )
        return sorted(
            conn.name for conn in _get_registry_cache().values() if conn.language == Language.JAVA
        )

    if install_type == InstallType.DOCKER:
        return sorted(conn.name for conn in _get_registry_cache().values())

    if install_type == InstallType.YAML:
        return sorted(
            conn.name
            for conn in _get_registry_cache().values()
            if InstallType.YAML in conn.install_types
            and conn.name not in _LOWCODE_CONNECTORS_EXCLUDED
        )

    raise exc.PyAirbyteInputError(  # pragma: no cover  # Should never be reached.
        message="Invalid install type.",
        context={
            "install_type": install_type,
        },
    )
Return a list of all available connectors.
Connector names are returned in alphabetical order and include both source and destination connectors (for example, "source-google-sheets").