airbyte.sources.registry
Connectivity to the connector catalog registry.
1# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 2"""Connectivity to the connector catalog registry.""" 3 4from __future__ import annotations 5 6import json 7import logging 8import os 9import warnings 10from copy import copy 11from enum import Enum 12from pathlib import Path 13 14import requests 15from pydantic import BaseModel 16 17from airbyte import exceptions as exc 18from airbyte._util.meta import is_docker_installed 19from airbyte.constants import AIRBYTE_OFFLINE_MODE 20from airbyte.logs import warn_once 21from airbyte.version import get_version 22 23 24logger = logging.getLogger("airbyte") 25 26 27__cache: dict[str, ConnectorMetadata] | None = None 28 29 30_REGISTRY_ENV_VAR = "AIRBYTE_LOCAL_REGISTRY" 31_REGISTRY_URL = "https://connectors.airbyte.com/files/registries/v0/oss_registry.json" 32 33_PYTHON_LANGUAGE = "python" 34_MANIFEST_ONLY_LANGUAGE = "manifest-only" 35 36_PYTHON_LANGUAGE_TAG = f"language:{_PYTHON_LANGUAGE}" 37_MANIFEST_ONLY_TAG = f"language:{_MANIFEST_ONLY_LANGUAGE}" 38 39_LOWCODE_CONNECTORS_FAILING_VALIDATION: list[str] = [] 40# Connectors that return 404 or some other misc exception. 41_LOWCODE_CONNECTORS_UNEXPECTED_ERRORS: list[str] = [ 42 "source-adjust", 43 "source-amazon-ads", 44 "source-marketo", 45] 46# (CDK) FileNotFoundError: Unable to find spec.yaml or spec.json in the package. 
47_LOWCODE_CDK_FILE_NOT_FOUND_ERRORS: list[str] = [] 48_LOWCODE_CONNECTORS_EXCLUDED: list[str] = [ 49 *_LOWCODE_CONNECTORS_FAILING_VALIDATION, 50 *_LOWCODE_CONNECTORS_UNEXPECTED_ERRORS, 51 *_LOWCODE_CDK_FILE_NOT_FOUND_ERRORS, 52] 53 54 55class InstallType(str, Enum): 56 """The type of installation for a connector.""" 57 58 YAML = "yaml" 59 PYTHON = "python" 60 DOCKER = "docker" 61 JAVA = "java" 62 63 64class Language(str, Enum): 65 """The language of a connector.""" 66 67 PYTHON = InstallType.PYTHON.value 68 JAVA = InstallType.JAVA.value 69 MANIFEST_ONLY = _MANIFEST_ONLY_LANGUAGE 70 71 72class ConnectorMetadata(BaseModel): 73 """Metadata for a connector.""" 74 75 name: str 76 """Connector name. For example, "source-google-sheets".""" 77 78 latest_available_version: str | None 79 """The latest available version of the connector.""" 80 81 pypi_package_name: str | None 82 """The name of the PyPI package for the connector, if it exists.""" 83 84 language: Language | None 85 """The language of the connector.""" 86 87 install_types: set[InstallType] 88 """The supported install types for the connector.""" 89 90 suggested_streams: list[str] | None = None 91 """A list of suggested streams for the connector, if available.""" 92 93 @property 94 def default_install_type(self) -> InstallType: 95 """Return the default install type for the connector.""" 96 if self.language == Language.MANIFEST_ONLY and InstallType.YAML in self.install_types: 97 return InstallType.YAML 98 99 if InstallType.PYTHON in self.install_types: 100 return InstallType.PYTHON 101 102 # Else: Java or Docker 103 return InstallType.DOCKER 104 105 106def _get_registry_url() -> str: 107 if _REGISTRY_ENV_VAR in os.environ: 108 return str(os.environ.get(_REGISTRY_ENV_VAR)) 109 110 return _REGISTRY_URL 111 112 113def _is_registry_disabled(url: str) -> bool: 114 return url.upper() in {"0", "F", "FALSE"} or AIRBYTE_OFFLINE_MODE 115 116 117def _registry_entry_to_connector_metadata(entry: dict) -> ConnectorMetadata: 118 
name = entry["dockerRepository"].replace("airbyte/", "") 119 latest_version: str | None = entry.get("dockerImageTag") 120 tags = entry.get("tags", []) 121 language: Language | None = None 122 123 if "language" in entry and entry["language"] is not None: 124 try: 125 language = Language(entry["language"]) 126 except Exception: 127 warnings.warn( 128 message=f"Invalid language for connector {name}: {entry['language']}", 129 stacklevel=2, 130 ) 131 if not language and _PYTHON_LANGUAGE_TAG in tags: 132 language = Language.PYTHON 133 if not language and _MANIFEST_ONLY_TAG in tags: 134 language = Language.MANIFEST_ONLY 135 136 remote_registries: dict = entry.get("remoteRegistries", {}) 137 pypi_registry: dict = remote_registries.get("pypi", {}) 138 pypi_package_name: str = pypi_registry.get("packageName", None) 139 pypi_enabled: bool = pypi_registry.get("enabled", False) 140 install_types: set[InstallType] = { 141 x 142 for x in [ 143 InstallType.DOCKER, # Always True 144 InstallType.PYTHON if language == Language.PYTHON and pypi_enabled else None, 145 InstallType.JAVA if language == Language.JAVA else None, 146 InstallType.YAML if language == Language.MANIFEST_ONLY else None, 147 ] 148 if x 149 } 150 151 return ConnectorMetadata( 152 name=name, 153 latest_available_version=latest_version, 154 pypi_package_name=pypi_package_name if pypi_enabled else None, 155 language=language, 156 install_types=install_types, 157 suggested_streams=entry.get("suggestedStreams", {}).get("streams", None), 158 ) 159 160 161def _get_registry_cache(*, force_refresh: bool = False) -> dict[str, ConnectorMetadata]: 162 """Return the registry cache.""" 163 global __cache 164 if __cache and not force_refresh: 165 return __cache 166 167 registry_url = _get_registry_url() 168 169 if _is_registry_disabled(registry_url): 170 return {} 171 172 if registry_url.startswith("http"): 173 response = requests.get( 174 registry_url, 175 headers={"User-Agent": f"PyAirbyte/{get_version()}"}, 176 ) 177 
response.raise_for_status() 178 data = response.json() 179 else: 180 # Assume local file 181 with Path(registry_url).open(encoding="utf-8") as f: 182 data = json.load(f) 183 184 new_cache: dict[str, ConnectorMetadata] = {} 185 186 for connector in data["sources"]: 187 connector_metadata = _registry_entry_to_connector_metadata(connector) 188 new_cache[connector_metadata.name] = connector_metadata 189 190 for connector in data["destinations"]: 191 connector_metadata = _registry_entry_to_connector_metadata(connector) 192 new_cache[connector_metadata.name] = connector_metadata 193 194 if len(new_cache) == 0: 195 # This isn't necessarily fatal, since users can bring their own 196 # connector definitions. 197 warn_once( 198 message=f"Connector registry is empty: {registry_url}", 199 with_stack=False, 200 ) 201 202 __cache = new_cache 203 return __cache 204 205 206def get_connector_metadata(name: str) -> ConnectorMetadata | None: 207 """Check the cache for the connector. 208 209 If the cache is empty, populate by calling update_cache. 210 """ 211 registry_url = _get_registry_url() 212 213 if _is_registry_disabled(registry_url): 214 return None 215 216 cache = copy(_get_registry_cache()) 217 218 if not cache: 219 raise exc.PyAirbyteInternalError( 220 message="Connector registry could not be loaded.", 221 context={ 222 "registry_url": _get_registry_url(), 223 }, 224 ) 225 if name not in cache: 226 raise exc.AirbyteConnectorNotRegisteredError( 227 connector_name=name, 228 context={ 229 "registry_url": _get_registry_url(), 230 "available_connectors": get_available_connectors(), 231 }, 232 ) 233 return cache[name] 234 235 236def get_available_connectors(install_type: InstallType | str | None = None) -> list[str]: 237 """Return a list of all available connectors. 238 239 Connectors will be returned in alphabetical order, with the standard prefix "source-". 240 """ 241 if install_type is None: 242 # No install type specified. Filter for whatever is runnable. 
243 if is_docker_installed(): 244 logger.info("Docker is detected. Returning all connectors.") 245 # If Docker is available, return all connectors. 246 return sorted(conn.name for conn in _get_registry_cache().values()) 247 248 logger.info("Docker was not detected. Returning only Python and Manifest-only connectors.") 249 250 # If Docker is not available, return only Python and Manifest-based connectors. 251 return sorted( 252 conn.name 253 for conn in _get_registry_cache().values() 254 if conn.language in {Language.PYTHON, Language.MANIFEST_ONLY} 255 ) 256 257 if not isinstance(install_type, InstallType): 258 install_type = InstallType(install_type) 259 260 if install_type == InstallType.PYTHON: 261 return sorted( 262 conn.name 263 for conn in _get_registry_cache().values() 264 if conn.pypi_package_name is not None 265 ) 266 267 if install_type == InstallType.JAVA: 268 warnings.warn( 269 message="Java connectors are not yet supported.", 270 stacklevel=2, 271 ) 272 return sorted( 273 conn.name for conn in _get_registry_cache().values() if conn.language == Language.JAVA 274 ) 275 276 if install_type == InstallType.DOCKER: 277 return sorted(conn.name for conn in _get_registry_cache().values()) 278 279 if install_type == InstallType.YAML: 280 return sorted( 281 conn.name 282 for conn in _get_registry_cache().values() 283 if InstallType.YAML in conn.install_types 284 and conn.name not in _LOWCODE_CONNECTORS_EXCLUDED 285 ) 286 287 # pragma: no cover # Should never be reached. 288 raise exc.PyAirbyteInputError( 289 message="Invalid install type.", 290 context={ 291 "install_type": install_type, 292 }, 293 )
logger =
<Logger airbyte (INFO)>
class
InstallType(builtins.str, enum.Enum):
class InstallType(str, Enum):
    """The type of installation for a connector.

    Members subclass `str`, so each compares equal to its plain string value
    (for example, ``InstallType.YAML == "yaml"``).
    """

    # Declarative, manifest-only connectors.
    YAML = "yaml"
    # Python connectors that are published to PyPI.
    PYTHON = "python"
    # Any connector run from its Docker image; supported by every registry entry.
    DOCKER = "docker"
    # Java connectors.
    JAVA = "java"
The type of installation for a connector.
YAML =
<InstallType.YAML: 'yaml'>
PYTHON =
<InstallType.PYTHON: 'python'>
DOCKER =
<InstallType.DOCKER: 'docker'>
JAVA =
<InstallType.JAVA: 'java'>
Inherited Members
- enum.Enum
- name
- value
- builtins.str
- encode
- replace
- split
- rsplit
- join
- capitalize
- casefold
- title
- center
- count
- expandtabs
- find
- partition
- index
- ljust
- lower
- lstrip
- rfind
- rindex
- rjust
- rstrip
- rpartition
- splitlines
- strip
- swapcase
- translate
- upper
- startswith
- endswith
- removeprefix
- removesuffix
- isascii
- islower
- isupper
- istitle
- isspace
- isdecimal
- isdigit
- isnumeric
- isalpha
- isalnum
- isidentifier
- isprintable
- zfill
- format
- format_map
- maketrans
class
Language(builtins.str, enum.Enum):
class Language(str, Enum):
    """The language of a connector.

    Values are parsed from a registry entry's "language" field. PYTHON and JAVA
    reuse the string values of the matching `InstallType` members.
    """

    PYTHON = InstallType.PYTHON.value
    JAVA = InstallType.JAVA.value
    # Declarative (manifest-only) connectors; registry value "manifest-only".
    MANIFEST_ONLY = _MANIFEST_ONLY_LANGUAGE
The language of a connector.
PYTHON =
<Language.PYTHON: 'python'>
JAVA =
<Language.JAVA: 'java'>
MANIFEST_ONLY =
<Language.MANIFEST_ONLY: 'manifest-only'>
Inherited Members
- enum.Enum
- name
- value
- builtins.str
- encode
- replace
- split
- rsplit
- join
- capitalize
- casefold
- title
- center
- count
- expandtabs
- find
- partition
- index
- ljust
- lower
- lstrip
- rfind
- rindex
- rjust
- rstrip
- rpartition
- splitlines
- strip
- swapcase
- translate
- upper
- startswith
- endswith
- removeprefix
- removesuffix
- isascii
- islower
- isupper
- istitle
- isspace
- isdecimal
- isdigit
- isnumeric
- isalpha
- isalnum
- isidentifier
- isprintable
- zfill
- format
- format_map
- maketrans
class
ConnectorMetadata(pydantic.main.BaseModel):
class ConnectorMetadata(BaseModel):
    """Registry metadata describing a single connector."""

    name: str
    """Connector name. For example, "source-google-sheets"."""

    latest_available_version: str | None
    """The latest available version of the connector."""

    pypi_package_name: str | None
    """The name of the PyPI package for the connector, if it exists."""

    language: Language | None
    """The language of the connector."""

    install_types: set[InstallType]
    """The supported install types for the connector."""

    suggested_streams: list[str] | None = None
    """A list of suggested streams for the connector, if available."""

    @property
    def default_install_type(self) -> InstallType:
        """Pick the preferred install type for this connector.

        Manifest-only connectors that support YAML install via YAML. Otherwise,
        PyPI-installable connectors use Python, and everything else uses Docker.
        """
        supported = self.install_types
        if InstallType.YAML in supported and self.language == Language.MANIFEST_ONLY:
            return InstallType.YAML
        return InstallType.PYTHON if InstallType.PYTHON in supported else InstallType.DOCKER
Metadata for a connector.
default_install_type: InstallType
@property
def default_install_type(self) -> InstallType:
    """Pick the preferred install type for this connector.

    Manifest-only connectors that support YAML install via YAML. Otherwise,
    PyPI-installable connectors use Python, and everything else uses Docker.
    """
    supported = self.install_types
    if InstallType.YAML in supported and self.language == Language.MANIFEST_ONLY:
        return InstallType.YAML
    return InstallType.PYTHON if InstallType.PYTHON in supported else InstallType.DOCKER
Return the default install type for the connector.
model_config: ClassVar[pydantic.config.ConfigDict] =
{}
Configuration for the model, should be a dictionary conforming to [`ConfigDict`][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- model_fields
- model_computed_fields
def get_connector_metadata(name: str) -> ConnectorMetadata | None:
    """Look up a connector's metadata in the registry.

    The registry is loaded and cached on first use.

    Args:
        name: The connector name, for example "source-google-sheets".

    Returns:
        The connector's metadata, or None if the registry is disabled.

    Raises:
        PyAirbyteInternalError: If the registry loaded but contains no connectors.
        AirbyteConnectorNotRegisteredError: If `name` is not in the registry.
    """
    registry_url = _get_registry_url()

    if _is_registry_disabled(registry_url):
        return None

    cache = copy(_get_registry_cache())

    if not cache:
        raise exc.PyAirbyteInternalError(
            message="Connector registry could not be loaded.",
            context={
                "registry_url": registry_url,
            },
        )
    if name not in cache:
        raise exc.AirbyteConnectorNotRegisteredError(
            connector_name=name,
            context={
                "registry_url": registry_url,
                "available_connectors": get_available_connectors(),
            },
        )
    return cache[name]
Check the cache for the connector.
If the registry has not been loaded yet, it is loaded and cached on first use.
def get_available_connectors(install_type: InstallType | str | None = None) -> list[str]:
    """Return the names of all available connectors, sorted alphabetically.

    Names are returned in full (for example "source-google-sheets"). Both sources and
    destinations from the registry are included.

    Args:
        install_type: Restrict the result to connectors supporting this install type.
            Strings are converted to `InstallType`. When omitted, the result depends on
            whether Docker is installed. With Docker, every registry connector is
            returned. Without Docker, only Python and manifest-only connectors are returned.

    Returns:
        Sorted connector names. Empty if the registry is disabled.

    Raises:
        ValueError: If `install_type` is a string that is not a valid `InstallType`.
    """
    if install_type is None:
        # No install type specified. Filter for whatever is runnable.
        if is_docker_installed():
            logger.info("Docker is detected. Returning all connectors.")
            # Docker can run any registry connector.
            return sorted(conn.name for conn in _get_registry_cache().values())

        logger.info("Docker was not detected. Returning only Python and Manifest-only connectors.")

        # Without Docker, only Python and manifest-based connectors can run.
        return sorted(
            conn.name
            for conn in _get_registry_cache().values()
            if conn.language in {Language.PYTHON, Language.MANIFEST_ONLY}
        )

    if not isinstance(install_type, InstallType):
        # Raises ValueError for strings that are not valid install types.
        install_type = InstallType(install_type)

    if install_type == InstallType.PYTHON:
        # Only connectors with PyPI publishing enabled carry a package name.
        return sorted(
            conn.name
            for conn in _get_registry_cache().values()
            if conn.pypi_package_name is not None
        )

    if install_type == InstallType.JAVA:
        warnings.warn(
            message="Java connectors are not yet supported.",
            stacklevel=2,
        )
        return sorted(
            conn.name for conn in _get_registry_cache().values() if conn.language == Language.JAVA
        )

    if install_type == InstallType.DOCKER:
        return sorted(conn.name for conn in _get_registry_cache().values())

    if install_type == InstallType.YAML:
        # Skip low-code connectors known to fail (see _LOWCODE_CONNECTORS_EXCLUDED).
        return sorted(
            conn.name
            for conn in _get_registry_cache().values()
            if InstallType.YAML in conn.install_types
            and conn.name not in _LOWCODE_CONNECTORS_EXCLUDED
        )

    # Should never be reached: every InstallType member is handled above.
    raise exc.PyAirbyteInputError(  # pragma: no cover
        message="Invalid install type.",
        context={
            "install_type": install_type,
        },
    )
Return a list of all available connectors.
Connectors are returned in alphabetical order by full name (for example "source-google-sheets"); both sources and destinations are included.