airbyte.sources.registry
Connectivity to the connector catalog registry.
1# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 2"""Connectivity to the connector catalog registry.""" 3 4from __future__ import annotations 5 6import json 7import logging 8import os 9import warnings 10from copy import copy 11from enum import Enum 12from pathlib import Path 13from typing import cast 14 15import requests 16from pydantic import BaseModel 17 18from airbyte import exceptions as exc 19from airbyte._util.meta import is_docker_installed 20from airbyte.constants import AIRBYTE_OFFLINE_MODE 21from airbyte.logs import warn_once 22from airbyte.version import get_version 23 24 25logger = logging.getLogger("airbyte") 26 27 28__cache: dict[str, ConnectorMetadata] | None = None 29 30 31_REGISTRY_ENV_VAR = "AIRBYTE_LOCAL_REGISTRY" 32_REGISTRY_URL = "https://connectors.airbyte.com/files/registries/v0/oss_registry.json" 33 34_PYTHON_LANGUAGE = "python" 35_MANIFEST_ONLY_LANGUAGE = "manifest-only" 36 37_PYTHON_LANGUAGE_TAG = f"language:{_PYTHON_LANGUAGE}" 38_MANIFEST_ONLY_TAG = f"language:{_MANIFEST_ONLY_LANGUAGE}" 39 40 41class InstallType(str, Enum): 42 """The type of installation for a connector.""" 43 44 YAML = "yaml" 45 PYTHON = "python" 46 DOCKER = "docker" 47 JAVA = "java" 48 49 50class Language(str, Enum): 51 """The language of a connector.""" 52 53 PYTHON = InstallType.PYTHON.value 54 JAVA = InstallType.JAVA.value 55 MANIFEST_ONLY = _MANIFEST_ONLY_LANGUAGE 56 57 58class ConnectorMetadata(BaseModel): 59 """Metadata for a connector.""" 60 61 name: str 62 """Connector name. 
For example, "source-google-sheets".""" 63 64 latest_available_version: str | None 65 """The latest available version of the connector.""" 66 67 pypi_package_name: str | None 68 """The name of the PyPI package for the connector, if it exists.""" 69 70 language: Language | None 71 """The language of the connector.""" 72 73 install_types: set[InstallType] 74 """The supported install types for the connector.""" 75 76 suggested_streams: list[str] | None = None 77 """A list of suggested streams for the connector, if available.""" 78 79 @property 80 def default_install_type(self) -> InstallType: 81 """Return the default install type for the connector.""" 82 if self.language == Language.MANIFEST_ONLY and InstallType.YAML in self.install_types: 83 return InstallType.YAML 84 85 if InstallType.PYTHON in self.install_types: 86 return InstallType.PYTHON 87 88 # Else: Java or Docker 89 return InstallType.DOCKER 90 91 92def _get_registry_url() -> str: 93 if _REGISTRY_ENV_VAR in os.environ: 94 return str(os.environ.get(_REGISTRY_ENV_VAR)) 95 96 return _REGISTRY_URL 97 98 99def _is_registry_disabled(url: str) -> bool: 100 return url.upper() in {"0", "F", "FALSE"} or AIRBYTE_OFFLINE_MODE 101 102 103def _registry_entry_to_connector_metadata(entry: dict) -> ConnectorMetadata: 104 name = entry["dockerRepository"].replace("airbyte/", "") 105 latest_version: str | None = entry.get("dockerImageTag") 106 tags = entry.get("tags", []) 107 language: Language | None = None 108 109 if "language" in entry and entry["language"] is not None: 110 try: 111 language = Language(entry["language"]) 112 except Exception: 113 warnings.warn( 114 message=f"Invalid language for connector {name}: {entry['language']}", 115 stacklevel=2, 116 ) 117 if not language and _PYTHON_LANGUAGE_TAG in tags: 118 language = Language.PYTHON 119 if not language and _MANIFEST_ONLY_TAG in tags: 120 language = Language.MANIFEST_ONLY 121 122 remote_registries: dict = entry.get("remoteRegistries", {}) 123 pypi_registry: dict = 
remote_registries.get("pypi", {}) 124 pypi_package_name = cast( 125 "str | None", 126 pypi_registry.get("packageName", None), 127 ) 128 pypi_enabled: bool = pypi_registry.get("enabled", False) 129 install_types: set[InstallType] = { 130 x 131 for x in [ 132 InstallType.DOCKER, # Always True 133 InstallType.PYTHON if language == Language.PYTHON and pypi_enabled else None, 134 InstallType.JAVA if language == Language.JAVA else None, 135 InstallType.YAML if language == Language.MANIFEST_ONLY else None, 136 ] 137 if x 138 } 139 140 return ConnectorMetadata( 141 name=name, 142 latest_available_version=latest_version, 143 pypi_package_name=pypi_package_name if pypi_enabled else None, 144 language=language, 145 install_types=install_types, 146 suggested_streams=entry.get("suggestedStreams", {}).get("streams", None), 147 ) 148 149 150def _get_registry_cache(*, force_refresh: bool = False) -> dict[str, ConnectorMetadata]: 151 """Return the registry cache.""" 152 global __cache 153 if __cache and not force_refresh: 154 return __cache 155 156 registry_url = _get_registry_url() 157 158 if _is_registry_disabled(registry_url): 159 return {} 160 161 if registry_url.startswith("http"): 162 response = requests.get( 163 registry_url, 164 headers={"User-Agent": f"PyAirbyte/{get_version()}"}, 165 ) 166 response.raise_for_status() 167 data = response.json() 168 else: 169 # Assume local file 170 with Path(registry_url).open(encoding="utf-8") as f: 171 data = json.load(f) 172 173 new_cache: dict[str, ConnectorMetadata] = {} 174 175 for connector in data["sources"]: 176 connector_metadata = _registry_entry_to_connector_metadata(connector) 177 new_cache[connector_metadata.name] = connector_metadata 178 179 for connector in data["destinations"]: 180 connector_metadata = _registry_entry_to_connector_metadata(connector) 181 new_cache[connector_metadata.name] = connector_metadata 182 183 if len(new_cache) == 0: 184 # This isn't necessarily fatal, since users can bring their own 185 # connector 
definitions. 186 warn_once( 187 message=f"Connector registry is empty: {registry_url}", 188 with_stack=False, 189 ) 190 191 __cache = new_cache 192 return __cache 193 194 195def get_connector_metadata(name: str) -> ConnectorMetadata | None: 196 """Check the cache for the connector. 197 198 If the cache is empty, populate by calling update_cache. 199 """ 200 registry_url = _get_registry_url() 201 202 if _is_registry_disabled(registry_url): 203 return None 204 205 cache = copy(_get_registry_cache()) 206 207 if not cache: 208 raise exc.PyAirbyteInternalError( 209 message="Connector registry could not be loaded.", 210 context={ 211 "registry_url": _get_registry_url(), 212 }, 213 ) 214 if name not in cache: 215 raise exc.AirbyteConnectorNotRegisteredError( 216 connector_name=name, 217 context={ 218 "registry_url": _get_registry_url(), 219 "available_connectors": get_available_connectors(), 220 }, 221 ) 222 return cache[name] 223 224 225def get_available_connectors(install_type: InstallType | str | None = None) -> list[str]: 226 """Return a list of all available connectors. 227 228 Connectors will be returned in alphabetical order, with the standard prefix "source-". 229 """ 230 if install_type is None: 231 # No install type specified. Filter for whatever is runnable. 232 if is_docker_installed(): 233 logger.info("Docker is detected. Returning all connectors.") 234 # If Docker is available, return all connectors. 235 return sorted(conn.name for conn in _get_registry_cache().values()) 236 237 logger.info("Docker was not detected. Returning only Python and Manifest-only connectors.") 238 239 # If Docker is not available, return only Python and Manifest-based connectors. 
240 return sorted( 241 conn.name 242 for conn in _get_registry_cache().values() 243 if conn.language in {Language.PYTHON, Language.MANIFEST_ONLY} 244 ) 245 246 if not isinstance(install_type, InstallType): 247 install_type = InstallType(install_type) 248 249 if install_type == InstallType.PYTHON: 250 return sorted( 251 conn.name 252 for conn in _get_registry_cache().values() 253 if conn.pypi_package_name is not None 254 ) 255 256 if install_type == InstallType.JAVA: 257 warnings.warn( 258 message="Java connectors are not yet supported.", 259 stacklevel=2, 260 ) 261 return sorted( 262 conn.name for conn in _get_registry_cache().values() if conn.language == Language.JAVA 263 ) 264 265 if install_type == InstallType.DOCKER: 266 return sorted(conn.name for conn in _get_registry_cache().values()) 267 268 if install_type == InstallType.YAML: 269 return sorted( 270 conn.name 271 for conn in _get_registry_cache().values() 272 if InstallType.YAML in conn.install_types 273 ) 274 275 # pragma: no cover # Should never be reached. 276 raise exc.PyAirbyteInputError( 277 message="Invalid install type.", 278 context={ 279 "install_type": install_type, 280 }, 281 )
logger =
<Logger airbyte (INFO)>
class
InstallType(builtins.str, enum.Enum):
class InstallType(str, Enum):
    """How a connector can be installed and executed.

    Because the enum mixes in `str`, members compare equal to their string
    values (for example, `InstallType.DOCKER == "docker"`).
    """

    YAML = "yaml"  # manifest-only connectors
    PYTHON = "python"  # connectors published to PyPI
    DOCKER = "docker"  # supported by every registry connector
    JAVA = "java"  # Java connectors (not yet supported for direct install)
The type of installation for a connector.
YAML =
<InstallType.YAML: 'yaml'>
PYTHON =
<InstallType.PYTHON: 'python'>
DOCKER =
<InstallType.DOCKER: 'docker'>
JAVA =
<InstallType.JAVA: 'java'>
Inherited Members
- enum.Enum
- name
- value
- builtins.str
- encode
- replace
- split
- rsplit
- join
- capitalize
- casefold
- title
- center
- count
- expandtabs
- find
- partition
- index
- ljust
- lower
- lstrip
- rfind
- rindex
- rjust
- rstrip
- rpartition
- splitlines
- strip
- swapcase
- translate
- upper
- startswith
- endswith
- removeprefix
- removesuffix
- isascii
- islower
- isupper
- istitle
- isspace
- isdecimal
- isdigit
- isnumeric
- isalpha
- isalnum
- isidentifier
- isprintable
- zfill
- format
- format_map
- maketrans
class
Language(builtins.str, enum.Enum):
class Language(str, Enum):
    """The implementation language of a connector, as reported by the registry."""

    # Reuse the `InstallType` strings where the two enums overlap so they stay in sync.
    PYTHON = InstallType.PYTHON.value
    JAVA = InstallType.JAVA.value
    # Declarative (YAML manifest) connectors; these install via `InstallType.YAML`.
    MANIFEST_ONLY = _MANIFEST_ONLY_LANGUAGE
The language of a connector.
PYTHON =
<Language.PYTHON: 'python'>
JAVA =
<Language.JAVA: 'java'>
MANIFEST_ONLY =
<Language.MANIFEST_ONLY: 'manifest-only'>
Inherited Members
- enum.Enum
- name
- value
- builtins.str
- encode
- replace
- split
- rsplit
- join
- capitalize
- casefold
- title
- center
- count
- expandtabs
- find
- partition
- index
- ljust
- lower
- lstrip
- rfind
- rindex
- rjust
- rstrip
- rpartition
- splitlines
- strip
- swapcase
- translate
- upper
- startswith
- endswith
- removeprefix
- removesuffix
- isascii
- islower
- isupper
- istitle
- isspace
- isdecimal
- isdigit
- isnumeric
- isalpha
- isalnum
- isidentifier
- isprintable
- zfill
- format
- format_map
- maketrans
class
ConnectorMetadata(pydantic.main.BaseModel):
class ConnectorMetadata(BaseModel):
    """Registry metadata describing a single connector."""

    name: str
    """Connector name. For example, "source-google-sheets"."""

    latest_available_version: str | None
    """The latest available version of the connector."""

    pypi_package_name: str | None
    """The name of the PyPI package for the connector, if it exists."""

    language: Language | None
    """The language of the connector."""

    install_types: set[InstallType]
    """The supported install types for the connector."""

    suggested_streams: list[str] | None = None
    """A list of suggested streams for the connector, if available."""

    @property
    def default_install_type(self) -> InstallType:
        """Return the install type to use when the caller does not specify one.

        Preference order:
        1. YAML, for manifest-only connectors that support it.
        2. Python, when it is a supported install type.
        3. Docker as the catch-all, which also covers Java connectors.
        """
        supported = self.install_types
        if InstallType.YAML in supported and self.language == Language.MANIFEST_ONLY:
            return InstallType.YAML
        return InstallType.PYTHON if InstallType.PYTHON in supported else InstallType.DOCKER
Metadata for a connector.
default_install_type: InstallType
@property
def default_install_type(self) -> InstallType:
    """Return the install type to use when the caller does not specify one.

    Preference order:
    1. YAML, for manifest-only connectors that support it.
    2. Python, when it is a supported install type.
    3. Docker as the catch-all, which also covers Java connectors.
    """
    supported = self.install_types
    if InstallType.YAML in supported and self.language == Language.MANIFEST_ONLY:
        return InstallType.YAML
    return InstallType.PYTHON if InstallType.PYTHON in supported else InstallType.DOCKER
Return the default install type for the connector.
model_config: ClassVar[pydantic.config.ConfigDict] =
{}
Configuration for the model; it should be a dictionary conforming to [`ConfigDict`][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
def get_connector_metadata(name: str) -> ConnectorMetadata | None:
    """Return registry metadata for the named connector.

    If the registry has not been loaded yet, `_get_registry_cache()` fetches and
    caches it first.

    Args:
        name: The connector name, for example "source-google-sheets".

    Returns:
        The connector's metadata, or `None` if the registry is disabled.

    Raises:
        PyAirbyteInternalError: If the registry loaded but contains no connectors.
        AirbyteConnectorNotRegisteredError: If `name` is not in the registry.
    """
    registry_url = _get_registry_url()

    if _is_registry_disabled(registry_url):
        return None

    cache = copy(_get_registry_cache())

    if not cache:
        raise exc.PyAirbyteInternalError(
            message="Connector registry could not be loaded.",
            context={
                "registry_url": registry_url,
            },
        )
    if name not in cache:
        raise exc.AirbyteConnectorNotRegisteredError(
            connector_name=name,
            context={
                "registry_url": registry_url,
                "available_connectors": get_available_connectors(),
            },
        )
    return cache[name]
Check the cache for the connector.
If the registry cache is empty, it is populated first by calling `_get_registry_cache()`. Returns `None` when the registry is disabled.
def get_available_connectors(install_type: InstallType | str | None = None) -> list[str]:
    """Return a list of all available connectors.

    Names are returned in alphabetical order and include their type prefix (for
    example "source-faker"). Destinations from the registry are included as well.

    Args:
        install_type: Restrict results to connectors that support this install type.
            When `None`, return whatever is runnable in the current environment:
            * every connector, if Docker is available;
            * otherwise, only connectors published to PyPI or runnable from a
              YAML manifest.

    Returns:
        A sorted list of connector names.

    Raises:
        ValueError: If `install_type` is a string that is not a valid `InstallType`.
    """
    if install_type is not None and not isinstance(install_type, InstallType):
        # Validate before touching the registry; raises ValueError for unknown values.
        install_type = InstallType(install_type)

    registry = _get_registry_cache()

    if install_type is None:
        # No install type specified. Filter for whatever is runnable.
        if is_docker_installed():
            logger.info("Docker is detected. Returning all connectors.")
            # If Docker is available, return all connectors.
            return sorted(conn.name for conn in registry.values())

        logger.info("Docker was not detected. Returning only Python and Manifest-only connectors.")

        # Without Docker, only PyPI-published Python connectors and manifest-only
        # connectors can run. This is exactly the union of the PYTHON and YAML
        # filters below; Python-language connectors absent from PyPI are excluded.
        return sorted(
            conn.name
            for conn in registry.values()
            if conn.pypi_package_name is not None or InstallType.YAML in conn.install_types
        )

    if install_type == InstallType.PYTHON:
        return sorted(
            conn.name for conn in registry.values() if conn.pypi_package_name is not None
        )

    if install_type == InstallType.JAVA:
        warnings.warn(
            message="Java connectors are not yet supported.",
            stacklevel=2,
        )
        return sorted(conn.name for conn in registry.values() if conn.language == Language.JAVA)

    if install_type == InstallType.DOCKER:
        return sorted(conn.name for conn in registry.values())

    if install_type == InstallType.YAML:
        return sorted(
            conn.name for conn in registry.values() if InstallType.YAML in conn.install_types
        )

    # Should never be reached: every InstallType member is handled above.
    raise exc.PyAirbyteInputError(  # pragma: no cover
        message="Invalid install type.",
        context={
            "install_type": install_type,
        },
    )
Return a list of all available connectors.
Connectors are returned in alphabetical order by full name, including their type prefix (for example "source-faker"). Destinations from the registry are included as well.