airbyte.sources.registry
Connectivity to the connector catalog registry.
1# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 2"""Connectivity to the connector catalog registry.""" 3 4from __future__ import annotations 5 6import json 7import logging 8import os 9import warnings 10from copy import copy 11from enum import Enum 12from pathlib import Path 13 14import requests 15from pydantic import BaseModel 16 17from airbyte import exceptions as exc 18from airbyte._util.meta import is_docker_installed 19from airbyte.constants import AIRBYTE_OFFLINE_MODE 20from airbyte.logs import warn_once 21from airbyte.version import get_version 22 23 24logger = logging.getLogger("airbyte") 25 26 27__cache: dict[str, ConnectorMetadata] | None = None 28 29 30_REGISTRY_ENV_VAR = "AIRBYTE_LOCAL_REGISTRY" 31_REGISTRY_URL = "https://connectors.airbyte.com/files/registries/v0/oss_registry.json" 32 33_PYTHON_LANGUAGE = "python" 34_MANIFEST_ONLY_LANGUAGE = "manifest-only" 35 36_PYTHON_LANGUAGE_TAG = f"language:{_PYTHON_LANGUAGE}" 37_MANIFEST_ONLY_TAG = f"language:{_MANIFEST_ONLY_LANGUAGE}" 38 39 40class InstallType(str, Enum): 41 """The type of installation for a connector.""" 42 43 YAML = "yaml" 44 PYTHON = "python" 45 DOCKER = "docker" 46 JAVA = "java" 47 48 49class Language(str, Enum): 50 """The language of a connector.""" 51 52 PYTHON = InstallType.PYTHON.value 53 JAVA = InstallType.JAVA.value 54 MANIFEST_ONLY = _MANIFEST_ONLY_LANGUAGE 55 56 57class ConnectorMetadata(BaseModel): 58 """Metadata for a connector.""" 59 60 name: str 61 """Connector name. 
For example, "source-google-sheets".""" 62 63 latest_available_version: str | None 64 """The latest available version of the connector.""" 65 66 pypi_package_name: str | None 67 """The name of the PyPI package for the connector, if it exists.""" 68 69 language: Language | None 70 """The language of the connector.""" 71 72 install_types: set[InstallType] 73 """The supported install types for the connector.""" 74 75 suggested_streams: list[str] | None = None 76 """A list of suggested streams for the connector, if available.""" 77 78 @property 79 def default_install_type(self) -> InstallType: 80 """Return the default install type for the connector.""" 81 if self.language == Language.MANIFEST_ONLY and InstallType.YAML in self.install_types: 82 return InstallType.YAML 83 84 if InstallType.PYTHON in self.install_types: 85 return InstallType.PYTHON 86 87 # Else: Java or Docker 88 return InstallType.DOCKER 89 90 91def _get_registry_url() -> str: 92 if _REGISTRY_ENV_VAR in os.environ: 93 return str(os.environ.get(_REGISTRY_ENV_VAR)) 94 95 return _REGISTRY_URL 96 97 98def _is_registry_disabled(url: str) -> bool: 99 return url.upper() in {"0", "F", "FALSE"} or AIRBYTE_OFFLINE_MODE 100 101 102def _registry_entry_to_connector_metadata(entry: dict) -> ConnectorMetadata: 103 name = entry["dockerRepository"].replace("airbyte/", "") 104 latest_version: str | None = entry.get("dockerImageTag") 105 tags = entry.get("tags", []) 106 language: Language | None = None 107 108 if "language" in entry and entry["language"] is not None: 109 try: 110 language = Language(entry["language"]) 111 except Exception: 112 warnings.warn( 113 message=f"Invalid language for connector {name}: {entry['language']}", 114 stacklevel=2, 115 ) 116 if not language and _PYTHON_LANGUAGE_TAG in tags: 117 language = Language.PYTHON 118 if not language and _MANIFEST_ONLY_TAG in tags: 119 language = Language.MANIFEST_ONLY 120 121 remote_registries: dict = entry.get("remoteRegistries", {}) 122 pypi_registry: dict = 
remote_registries.get("pypi", {}) 123 pypi_package_name: str = pypi_registry.get("packageName", None) 124 pypi_enabled: bool = pypi_registry.get("enabled", False) 125 install_types: set[InstallType] = { 126 x 127 for x in [ 128 InstallType.DOCKER, # Always True 129 InstallType.PYTHON if language == Language.PYTHON and pypi_enabled else None, 130 InstallType.JAVA if language == Language.JAVA else None, 131 InstallType.YAML if language == Language.MANIFEST_ONLY else None, 132 ] 133 if x 134 } 135 136 return ConnectorMetadata( 137 name=name, 138 latest_available_version=latest_version, 139 pypi_package_name=pypi_package_name if pypi_enabled else None, 140 language=language, 141 install_types=install_types, 142 suggested_streams=entry.get("suggestedStreams", {}).get("streams", None), 143 ) 144 145 146def _get_registry_cache(*, force_refresh: bool = False) -> dict[str, ConnectorMetadata]: 147 """Return the registry cache.""" 148 global __cache 149 if __cache and not force_refresh: 150 return __cache 151 152 registry_url = _get_registry_url() 153 154 if _is_registry_disabled(registry_url): 155 return {} 156 157 if registry_url.startswith("http"): 158 response = requests.get( 159 registry_url, 160 headers={"User-Agent": f"PyAirbyte/{get_version()}"}, 161 ) 162 response.raise_for_status() 163 data = response.json() 164 else: 165 # Assume local file 166 with Path(registry_url).open(encoding="utf-8") as f: 167 data = json.load(f) 168 169 new_cache: dict[str, ConnectorMetadata] = {} 170 171 for connector in data["sources"]: 172 connector_metadata = _registry_entry_to_connector_metadata(connector) 173 new_cache[connector_metadata.name] = connector_metadata 174 175 for connector in data["destinations"]: 176 connector_metadata = _registry_entry_to_connector_metadata(connector) 177 new_cache[connector_metadata.name] = connector_metadata 178 179 if len(new_cache) == 0: 180 # This isn't necessarily fatal, since users can bring their own 181 # connector definitions. 
182 warn_once( 183 message=f"Connector registry is empty: {registry_url}", 184 with_stack=False, 185 ) 186 187 __cache = new_cache 188 return __cache 189 190 191def get_connector_metadata(name: str) -> ConnectorMetadata | None: 192 """Check the cache for the connector. 193 194 If the cache is empty, populate by calling update_cache. 195 """ 196 registry_url = _get_registry_url() 197 198 if _is_registry_disabled(registry_url): 199 return None 200 201 cache = copy(_get_registry_cache()) 202 203 if not cache: 204 raise exc.PyAirbyteInternalError( 205 message="Connector registry could not be loaded.", 206 context={ 207 "registry_url": _get_registry_url(), 208 }, 209 ) 210 if name not in cache: 211 raise exc.AirbyteConnectorNotRegisteredError( 212 connector_name=name, 213 context={ 214 "registry_url": _get_registry_url(), 215 "available_connectors": get_available_connectors(), 216 }, 217 ) 218 return cache[name] 219 220 221def get_available_connectors(install_type: InstallType | str | None = None) -> list[str]: 222 """Return a list of all available connectors. 223 224 Connectors will be returned in alphabetical order, with the standard prefix "source-". 225 """ 226 if install_type is None: 227 # No install type specified. Filter for whatever is runnable. 228 if is_docker_installed(): 229 logger.info("Docker is detected. Returning all connectors.") 230 # If Docker is available, return all connectors. 231 return sorted(conn.name for conn in _get_registry_cache().values()) 232 233 logger.info("Docker was not detected. Returning only Python and Manifest-only connectors.") 234 235 # If Docker is not available, return only Python and Manifest-based connectors. 
236 return sorted( 237 conn.name 238 for conn in _get_registry_cache().values() 239 if conn.language in {Language.PYTHON, Language.MANIFEST_ONLY} 240 ) 241 242 if not isinstance(install_type, InstallType): 243 install_type = InstallType(install_type) 244 245 if install_type == InstallType.PYTHON: 246 return sorted( 247 conn.name 248 for conn in _get_registry_cache().values() 249 if conn.pypi_package_name is not None 250 ) 251 252 if install_type == InstallType.JAVA: 253 warnings.warn( 254 message="Java connectors are not yet supported.", 255 stacklevel=2, 256 ) 257 return sorted( 258 conn.name for conn in _get_registry_cache().values() if conn.language == Language.JAVA 259 ) 260 261 if install_type == InstallType.DOCKER: 262 return sorted(conn.name for conn in _get_registry_cache().values()) 263 264 if install_type == InstallType.YAML: 265 return sorted( 266 conn.name 267 for conn in _get_registry_cache().values() 268 if InstallType.YAML in conn.install_types 269 ) 270 271 # pragma: no cover # Should never be reached. 272 raise exc.PyAirbyteInputError( 273 message="Invalid install type.", 274 context={ 275 "install_type": install_type, 276 }, 277 )
logger =
<Logger airbyte (INFO)>
class
InstallType(builtins.str, enum.Enum):
class InstallType(str, Enum):
    """The mechanism used to install a connector.

    Because the enum mixes in `str`, each member compares equal to its string value
    (for example, ``InstallType.DOCKER == "docker"``).
    """

    YAML = "yaml"
    PYTHON = "python"
    DOCKER = "docker"
    JAVA = "java"
The type of installation for a connector.
YAML =
<InstallType.YAML: 'yaml'>
PYTHON =
<InstallType.PYTHON: 'python'>
DOCKER =
<InstallType.DOCKER: 'docker'>
JAVA =
<InstallType.JAVA: 'java'>
Inherited Members
- enum.Enum
- name
- value
- builtins.str
- encode
- replace
- split
- rsplit
- join
- capitalize
- casefold
- title
- center
- count
- expandtabs
- find
- partition
- index
- ljust
- lower
- lstrip
- rfind
- rindex
- rjust
- rstrip
- rpartition
- splitlines
- strip
- swapcase
- translate
- upper
- startswith
- endswith
- removeprefix
- removesuffix
- isascii
- islower
- isupper
- istitle
- isspace
- isdecimal
- isdigit
- isnumeric
- isalpha
- isalnum
- isidentifier
- isprintable
- zfill
- format
- format_map
- maketrans
class
Language(builtins.str, enum.Enum):
class Language(str, Enum):
    """Implementation language of a connector, as reported by the registry."""

    # These values are shared with InstallType so that the two enums stay in sync.
    PYTHON = InstallType.PYTHON.value
    JAVA = InstallType.JAVA.value
    # Connectors defined only by a manifest; these map to the YAML install type.
    MANIFEST_ONLY = _MANIFEST_ONLY_LANGUAGE
The language of a connector.
PYTHON =
<Language.PYTHON: 'python'>
JAVA =
<Language.JAVA: 'java'>
MANIFEST_ONLY =
<Language.MANIFEST_ONLY: 'manifest-only'>
Inherited Members
- enum.Enum
- name
- value
- builtins.str
- encode
- replace
- split
- rsplit
- join
- capitalize
- casefold
- title
- center
- count
- expandtabs
- find
- partition
- index
- ljust
- lower
- lstrip
- rfind
- rindex
- rjust
- rstrip
- rpartition
- splitlines
- strip
- swapcase
- translate
- upper
- startswith
- endswith
- removeprefix
- removesuffix
- isascii
- islower
- isupper
- istitle
- isspace
- isdecimal
- isdigit
- isnumeric
- isalpha
- isalnum
- isidentifier
- isprintable
- zfill
- format
- format_map
- maketrans
class
ConnectorMetadata(pydantic.main.BaseModel):
class ConnectorMetadata(BaseModel):
    """Description of a single connector, built from its registry entry."""

    name: str
    """Connector name, e.g. "source-google-sheets"."""

    latest_available_version: str | None
    """Newest published version of the connector, if known."""

    pypi_package_name: str | None
    """PyPI package name for the connector, or None when there is none."""

    language: Language | None
    """Implementation language of the connector, if known."""

    install_types: set[InstallType]
    """Every install type the connector supports."""

    suggested_streams: list[str] | None = None
    """Streams suggested for this connector, if any are available."""

    @property
    def default_install_type(self) -> InstallType:
        """Choose the install type to use when none is requested explicitly.

        Preference order: YAML (manifest-only connectors only), then Python, then Docker.
        """
        supported = self.install_types
        if self.language == Language.MANIFEST_ONLY and InstallType.YAML in supported:
            return InstallType.YAML
        # Connectors without a Python install (e.g. Java) fall back to Docker.
        return InstallType.PYTHON if InstallType.PYTHON in supported else InstallType.DOCKER
Metadata for a connector.
default_install_type: InstallType
@property
def default_install_type(self) -> InstallType:
    """Choose the install type to use when none is requested explicitly.

    Preference order: YAML (manifest-only connectors only), then Python, then Docker.
    """
    supported = self.install_types
    if self.language == Language.MANIFEST_ONLY and InstallType.YAML in supported:
        return InstallType.YAML
    # Connectors without a Python install (e.g. Java) fall back to Docker.
    return InstallType.PYTHON if InstallType.PYTHON in supported else InstallType.DOCKER
Return the default install type for the connector.
model_config: ClassVar[pydantic.config.ConfigDict] =
{}
Configuration for the model, which should be a dictionary conforming to `pydantic.config.ConfigDict`.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- model_fields
- model_computed_fields
def get_connector_metadata(name: str) -> ConnectorMetadata | None:
    """Return the registry metadata for the named connector.

    The registry is loaded and cached on first use.

    Returns:
        The connector's metadata, or None when the registry is disabled (offline mode, or
        `AIRBYTE_LOCAL_REGISTRY` set to "0"/"f"/"false").

    Raises:
        PyAirbyteInternalError: If the registry loaded but contains no connectors.
        AirbyteConnectorNotRegisteredError: If `name` is not in the registry.
    """
    registry_url = _get_registry_url()

    if _is_registry_disabled(registry_url):
        return None

    cache = copy(_get_registry_cache())

    if not cache:
        raise exc.PyAirbyteInternalError(
            message="Connector registry could not be loaded.",
            context={
                "registry_url": registry_url,
            },
        )
    if name not in cache:
        raise exc.AirbyteConnectorNotRegisteredError(
            connector_name=name,
            context={
                "registry_url": registry_url,
                "available_connectors": get_available_connectors(),
            },
        )
    return cache[name]
Check the cache for the connector.
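If the registry has not been loaded yet, it is loaded and cached first. Returns None when the registry is disabled; raises if the registry is empty or the connector is not registered.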
def get_available_connectors(install_type: InstallType | str | None = None) -> list[str]:
    """Return the names of available connectors, in alphabetical order.

    Both sources and destinations from the registry are included, so names keep their
    standard prefix (e.g. "source-" or "destination-").

    Args:
        install_type: Restrict the result to connectors supporting this install type.
            When omitted, return every connector if Docker is installed, otherwise only
            connectors whose language is Python or manifest-only.

    Raises:
        ValueError: If `install_type` is a string that is not a valid `InstallType` value.
    """
    if install_type is None:
        # No install type specified. Filter for whatever is runnable.
        if is_docker_installed():
            logger.info("Docker is detected. Returning all connectors.")
            # If Docker is available, return all connectors.
            return sorted(conn.name for conn in _get_registry_cache().values())

        logger.info("Docker was not detected. Returning only Python and Manifest-only connectors.")

        # If Docker is not available, return only Python and Manifest-based connectors.
        return sorted(
            conn.name
            for conn in _get_registry_cache().values()
            if conn.language in {Language.PYTHON, Language.MANIFEST_ONLY}
        )

    if not isinstance(install_type, InstallType):
        install_type = InstallType(install_type)

    if install_type == InstallType.PYTHON:
        return sorted(
            conn.name
            for conn in _get_registry_cache().values()
            if conn.pypi_package_name is not None
        )

    if install_type == InstallType.JAVA:
        warnings.warn(
            message="Java connectors are not yet supported.",
            stacklevel=2,
        )
        return sorted(
            conn.name for conn in _get_registry_cache().values() if conn.language == Language.JAVA
        )

    if install_type == InstallType.DOCKER:
        return sorted(conn.name for conn in _get_registry_cache().values())

    if install_type == InstallType.YAML:
        return sorted(
            conn.name
            for conn in _get_registry_cache().values()
            if InstallType.YAML in conn.install_types
        )

    # Should never be reached: every InstallType member is handled above.
    raise exc.PyAirbyteInputError(  # pragma: no cover
        message="Invalid install type.",
        context={
            "install_type": install_type,
        },
    )
Return a list of all available connectors.
Connector names are returned in alphabetical order. Both sources and destinations are included, so each name keeps its standard prefix (e.g. "source-" or "destination-").