airbyte.sources.registry

Connectivity to the connector catalog registry.

  1# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  2"""Connectivity to the connector catalog registry."""
  3
  4from __future__ import annotations
  5
  6import json
  7import logging
  8import os
  9import warnings
 10from copy import copy
 11from dataclasses import dataclass
 12from enum import Enum
 13from pathlib import Path
 14
 15import requests
 16
 17from airbyte import exceptions as exc
 18from airbyte._util.meta import is_docker_installed
 19from airbyte.constants import AIRBYTE_OFFLINE_MODE
 20from airbyte.logs import warn_once
 21from airbyte.version import get_version
 22
 23
 24logger = logging.getLogger("airbyte")
 25
 26
 27__cache: dict[str, ConnectorMetadata] | None = None
 28
 29
 30_REGISTRY_ENV_VAR = "AIRBYTE_LOCAL_REGISTRY"
 31_REGISTRY_URL = "https://connectors.airbyte.com/files/registries/v0/oss_registry.json"
 32
 33_PYTHON_LANGUAGE = "python"
 34_MANIFEST_ONLY_LANGUAGE = "manifest-only"
 35
 36_PYTHON_LANGUAGE_TAG = f"language:{_PYTHON_LANGUAGE}"
 37_MANIFEST_ONLY_TAG = f"language:{_MANIFEST_ONLY_LANGUAGE}"
 38
 39_LOWCODE_CONNECTORS_FAILING_VALIDATION: list[str] = []
 40# Connectors that return 404 or some other misc exception.
 41_LOWCODE_CONNECTORS_UNEXPECTED_ERRORS: list[str] = [
 42    "source-adjust",
 43    "source-amazon-ads",
 44    "source-marketo",
 45]
 46# (CDK) FileNotFoundError: Unable to find spec.yaml or spec.json in the package.
 47_LOWCODE_CDK_FILE_NOT_FOUND_ERRORS: list[str] = []
 48_LOWCODE_CONNECTORS_EXCLUDED: list[str] = [
 49    *_LOWCODE_CONNECTORS_FAILING_VALIDATION,
 50    *_LOWCODE_CONNECTORS_UNEXPECTED_ERRORS,
 51    *_LOWCODE_CDK_FILE_NOT_FOUND_ERRORS,
 52]
 53
 54
 55class InstallType(str, Enum):
 56    """The type of installation for a connector."""
 57
 58    YAML = "yaml"
 59    PYTHON = "python"
 60    DOCKER = "docker"
 61    JAVA = "java"
 62
 63
 64class Language(str, Enum):
 65    """The language of a connector."""
 66
 67    PYTHON = InstallType.PYTHON.value
 68    JAVA = InstallType.JAVA.value
 69    MANIFEST_ONLY = _MANIFEST_ONLY_LANGUAGE
 70
 71
 72@dataclass
 73class ConnectorMetadata:
 74    """Metadata for a connector."""
 75
 76    name: str
 77    """Connector name. For example, "source-google-sheets"."""
 78
 79    latest_available_version: str | None
 80    """The latest available version of the connector."""
 81
 82    pypi_package_name: str | None
 83    """The name of the PyPI package for the connector, if it exists."""
 84
 85    language: Language | None
 86    """The language of the connector."""
 87
 88    install_types: set[InstallType]
 89    """The supported install types for the connector."""
 90
 91    @property
 92    def default_install_type(self) -> InstallType:
 93        """Return the default install type for the connector."""
 94        if self.language == Language.MANIFEST_ONLY and InstallType.YAML in self.install_types:
 95            return InstallType.YAML
 96
 97        if InstallType.PYTHON in self.install_types:
 98            return InstallType.PYTHON
 99
100        # Else: Java or Docker
101        return InstallType.DOCKER
102
103
def _get_registry_url() -> str:
    """Return the registry location.

    The value of the `_REGISTRY_ENV_VAR` environment variable wins whenever it is
    set (even to an empty string); otherwise the default `_REGISTRY_URL` is used.
    """
    return str(os.environ.get(_REGISTRY_ENV_VAR, _REGISTRY_URL))
109
110
def _is_registry_disabled(url: str) -> bool:
    """Tell whether registry lookups are switched off.

    The registry counts as disabled when `url` is one of the "false" markers
    `0`, `F` or `FALSE` (case-insensitive), or when offline mode is active.
    """
    if url.upper() in {"0", "F", "FALSE"}:
        return True

    # Not explicitly disabled through the URL: defer to the offline-mode setting.
    return AIRBYTE_OFFLINE_MODE
113
114
def _registry_entry_to_connector_metadata(entry: dict) -> ConnectorMetadata:
    """Build a `ConnectorMetadata` object from one raw registry entry.

    The language is taken from the entry's `language` field when it holds a
    recognized value; otherwise it is inferred from the `language:*` tags. An
    unrecognized `language` value produces a warning instead of an error.

    Args:
        entry: A single connector definition from the registry. It must contain
            `dockerRepository`; every other key is optional and may be null.

    Returns:
        The parsed metadata for the connector.

    Raises:
        KeyError: If `dockerRepository` is missing from the entry.
    """
    name = entry["dockerRepository"].replace("airbyte/", "")
    latest_version: str | None = entry.get("dockerImageTag")
    # `or` (instead of a `.get()` default) also covers keys that are present but null.
    tags: list = entry.get("tags") or []

    language: Language | None = None
    raw_language = entry.get("language")
    if raw_language is not None:
        try:
            language = Language(raw_language)
        except ValueError:
            # An unknown language is not fatal: warn and fall back to the tags below.
            warnings.warn(
                message=f"Invalid language for connector {name}: {raw_language}",
                stacklevel=2,
            )
    if not language and _PYTHON_LANGUAGE_TAG in tags:
        language = Language.PYTHON
    if not language and _MANIFEST_ONLY_TAG in tags:
        language = Language.MANIFEST_ONLY

    remote_registries: dict = entry.get("remoteRegistries") or {}
    pypi_registry: dict = remote_registries.get("pypi") or {}
    pypi_package_name: str | None = pypi_registry.get("packageName")
    pypi_enabled: bool = bool(pypi_registry.get("enabled", False))

    # Docker is always listed; the other install types depend on the language.
    install_types: set[InstallType] = {InstallType.DOCKER}
    if language == Language.PYTHON and pypi_enabled:
        install_types.add(InstallType.PYTHON)
    if language == Language.JAVA:
        install_types.add(InstallType.JAVA)
    if language == Language.MANIFEST_ONLY:
        install_types.add(InstallType.YAML)

    return ConnectorMetadata(
        name=name,
        latest_available_version=latest_version,
        # The package name is only exposed when PyPI publishing is enabled.
        pypi_package_name=pypi_package_name if pypi_enabled else None,
        language=language,
        install_types=install_types,
    )
156
157
# Upper bound, in seconds, for connecting to and reading from the remote registry.
# Without a timeout `requests` may wait indefinitely on an unresponsive host.
_REGISTRY_REQUEST_TIMEOUT_SECONDS = 30


def _get_registry_cache(*, force_refresh: bool = False) -> dict[str, ConnectorMetadata]:
    """Return the connector registry, loading it when needed.

    A previously loaded, non-empty registry is reused unless `force_refresh` is set.
    Otherwise the registry is read from the configured location: fetched over HTTP
    when the location starts with "http", else opened as a local JSON file.

    Args:
        force_refresh: Reload the registry even if a cached copy exists.

    Returns:
        A mapping of connector name to its metadata. An empty dict is returned
        (and nothing is cached) when the registry is disabled.

    Raises:
        requests.RequestException: If the HTTP request fails, times out, or
            returns an error status.
        OSError: If the local registry file cannot be opened.
        KeyError: If the registry data lacks the "sources" or "destinations" key.
    """
    global __cache
    if __cache and not force_refresh:
        return __cache

    registry_url = _get_registry_url()

    if _is_registry_disabled(registry_url):
        return {}

    if registry_url.startswith("http"):
        response = requests.get(
            registry_url,
            headers={"User-Agent": f"PyAirbyte/{get_version()}"},
            timeout=_REGISTRY_REQUEST_TIMEOUT_SECONDS,
        )
        response.raise_for_status()
        data = response.json()
    else:
        # Assume local file
        with Path(registry_url).open(encoding="utf-8") as f:
            data = json.load(f)

    new_cache: dict[str, ConnectorMetadata] = {}

    # Sources are processed before destinations, so a destination entry wins
    # if both sections ever yield the same connector name.
    for section in ("sources", "destinations"):
        for connector in data[section]:
            connector_metadata = _registry_entry_to_connector_metadata(connector)
            new_cache[connector_metadata.name] = connector_metadata

    if not new_cache:
        # This isn't necessarily fatal, since users can bring their own
        # connector definitions.
        warn_once(
            message=f"Connector registry is empty: {registry_url}",
            with_stack=False,
        )

    __cache = new_cache
    return __cache
201
202
def get_connector_metadata(name: str) -> ConnectorMetadata | None:
    """Look up the registry metadata for a connector by name.

    The lookup goes through `_get_registry_cache()`, which loads the registry
    when no cached copy is available.

    Args:
        name: Connector name as listed in the registry, for example
            "source-google-sheets".

    Returns:
        The connector's metadata, or `None` when the registry is disabled.

    Raises:
        exc.PyAirbyteInternalError: If the registry contains no connectors.
        exc.AirbyteConnectorNotRegisteredError: If `name` is not in the registry.
    """
    registry_url = _get_registry_url()

    if _is_registry_disabled(registry_url):
        return None

    # The cache is only read here, so no defensive copy is needed.
    cache = _get_registry_cache()

    if not cache:
        raise exc.PyAirbyteInternalError(
            message="Connector registry could not be loaded.",
            context={
                "registry_url": registry_url,
            },
        )
    if name not in cache:
        raise exc.AirbyteConnectorNotRegisteredError(
            connector_name=name,
            context={
                "registry_url": registry_url,
                "available_connectors": get_available_connectors(),
            },
        )
    return cache[name]
231
232
def get_available_connectors(install_type: InstallType | str | None = None) -> list[str]:
    """Return the names of all available connectors, sorted alphabetically.

    Names are returned as stored in the registry cache (for example
    "source-google-sheets"). The cache is built from both the "sources" and the
    "destinations" sections of the registry, so both kinds can appear.

    Args:
        install_type: Restrict the result to connectors of this install type, given
            as an `InstallType` or its string value. When omitted, every connector is
            returned if Docker is detected; otherwise only Python and manifest-only
            connectors are returned.

    Returns:
        The sorted list of matching connector names.

    Raises:
        ValueError: If `install_type` is a string that is not a valid
            `InstallType` value.
    """
    if install_type is None:
        # No install type specified. Filter for whatever is runnable.
        if is_docker_installed():
            logger.info("Docker is detected. Returning all connectors.")
            # If Docker is available, return all connectors.
            return sorted(conn.name for conn in _get_registry_cache().values())

        logger.info("Docker was not detected. Returning only Python and Manifest-only connectors.")

        # If Docker is not available, return only Python and Manifest-based connectors.
        runnable_languages = {Language.PYTHON, Language.MANIFEST_ONLY}
        return sorted(
            conn.name
            for conn in _get_registry_cache().values()
            if conn.language in runnable_languages
        )

    if not isinstance(install_type, InstallType):
        install_type = InstallType(install_type)

    if install_type == InstallType.JAVA:
        # Warn from this frame so `stacklevel=2` keeps pointing at the caller.
        warnings.warn(
            message="Java connectors are not yet supported.",
            stacklevel=2,
        )

    connectors = _get_registry_cache().values()

    if install_type == InstallType.PYTHON:
        return sorted(conn.name for conn in connectors if conn.pypi_package_name is not None)

    if install_type == InstallType.JAVA:
        return sorted(conn.name for conn in connectors if conn.language == Language.JAVA)

    if install_type == InstallType.DOCKER:
        return sorted(conn.name for conn in connectors)

    if install_type == InstallType.YAML:
        # Manifest-only connectors on the exclusion list are left out.
        return sorted(
            conn.name
            for conn in connectors
            if InstallType.YAML in conn.install_types
            and conn.name not in _LOWCODE_CONNECTORS_EXCLUDED
        )

    # pragma: no cover  # Should never be reached.
    raise exc.PyAirbyteInputError(
        message="Invalid install type.",
        context={
            "install_type": install_type,
        },
    )
logger = <Logger airbyte (INFO)>
class InstallType(builtins.str, enum.Enum):
56class InstallType(str, Enum):
57    """The type of installation for a connector."""
58
59    YAML = "yaml"
60    PYTHON = "python"
61    DOCKER = "docker"
62    JAVA = "java"

The type of installation for a connector.

YAML = <InstallType.YAML: 'yaml'>
PYTHON = <InstallType.PYTHON: 'python'>
DOCKER = <InstallType.DOCKER: 'docker'>
JAVA = <InstallType.JAVA: 'java'>
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
removeprefix
removesuffix
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
class Language(builtins.str, enum.Enum):
65class Language(str, Enum):
66    """The language of a connector."""
67
68    PYTHON = InstallType.PYTHON.value
69    JAVA = InstallType.JAVA.value
70    MANIFEST_ONLY = _MANIFEST_ONLY_LANGUAGE

The language of a connector.

PYTHON = <Language.PYTHON: 'python'>
JAVA = <Language.JAVA: 'java'>
MANIFEST_ONLY = <Language.MANIFEST_ONLY: 'manifest-only'>
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
removeprefix
removesuffix
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
@dataclass
class ConnectorMetadata:
 73@dataclass
 74class ConnectorMetadata:
 75    """Metadata for a connector."""
 76
 77    name: str
 78    """Connector name. For example, "source-google-sheets"."""
 79
 80    latest_available_version: str | None
 81    """The latest available version of the connector."""
 82
 83    pypi_package_name: str | None
 84    """The name of the PyPI package for the connector, if it exists."""
 85
 86    language: Language | None
 87    """The language of the connector."""
 88
 89    install_types: set[InstallType]
 90    """The supported install types for the connector."""
 91
 92    @property
 93    def default_install_type(self) -> InstallType:
 94        """Return the default install type for the connector."""
 95        if self.language == Language.MANIFEST_ONLY and InstallType.YAML in self.install_types:
 96            return InstallType.YAML
 97
 98        if InstallType.PYTHON in self.install_types:
 99            return InstallType.PYTHON
100
101        # Else: Java or Docker
102        return InstallType.DOCKER

Metadata for a connector.

ConnectorMetadata( name: str, latest_available_version: str | None, pypi_package_name: str | None, language: Language | None, install_types: set[InstallType])
name: str

Connector name. For example, "source-google-sheets".

latest_available_version: str | None

The latest available version of the connector.

pypi_package_name: str | None

The name of the PyPI package for the connector, if it exists.

language: Language | None

The language of the connector.

install_types: set[InstallType]

The supported install types for the connector.

default_install_type: InstallType
 92    @property
 93    def default_install_type(self) -> InstallType:
 94        """Return the default install type for the connector."""
 95        if self.language == Language.MANIFEST_ONLY and InstallType.YAML in self.install_types:
 96            return InstallType.YAML
 97
 98        if InstallType.PYTHON in self.install_types:
 99            return InstallType.PYTHON
100
101        # Else: Java or Docker
102        return InstallType.DOCKER

Return the default install type for the connector.

def get_connector_metadata(name: str) -> ConnectorMetadata | None:
204def get_connector_metadata(name: str) -> ConnectorMetadata | None:
205    """Check the cache for the connector.
206
207    If the cache is empty, populate by calling update_cache.
208    """
209    registry_url = _get_registry_url()
210
211    if _is_registry_disabled(registry_url):
212        return None
213
214    cache = copy(_get_registry_cache())
215
216    if not cache:
217        raise exc.PyAirbyteInternalError(
218            message="Connector registry could not be loaded.",
219            context={
220                "registry_url": _get_registry_url(),
221            },
222        )
223    if name not in cache:
224        raise exc.AirbyteConnectorNotRegisteredError(
225            connector_name=name,
226            context={
227                "registry_url": _get_registry_url(),
228                "available_connectors": get_available_connectors(),
229            },
230        )
231    return cache[name]

Check the cache for the connector.

If the registry has not been loaded yet, it is loaded first. Returns None when the registry is disabled.

def get_available_connectors( install_type: InstallType | str | None = None) -> list[str]:
234def get_available_connectors(install_type: InstallType | str | None = None) -> list[str]:
235    """Return a list of all available connectors.
236
237    Connectors will be returned in alphabetical order, with the standard prefix "source-".
238    """
239    if install_type is None:
240        # No install type specified. Filter for whatever is runnable.
241        if is_docker_installed():
242            logger.info("Docker is detected. Returning all connectors.")
243            # If Docker is available, return all connectors.
244            return sorted(conn.name for conn in _get_registry_cache().values())
245
246        logger.info("Docker was not detected. Returning only Python and Manifest-only connectors.")
247
248        # If Docker is not available, return only Python and Manifest-based connectors.
249        return sorted(
250            conn.name
251            for conn in _get_registry_cache().values()
252            if conn.language in {Language.PYTHON, Language.MANIFEST_ONLY}
253        )
254
255    if not isinstance(install_type, InstallType):
256        install_type = InstallType(install_type)
257
258    if install_type == InstallType.PYTHON:
259        return sorted(
260            conn.name
261            for conn in _get_registry_cache().values()
262            if conn.pypi_package_name is not None
263        )
264
265    if install_type == InstallType.JAVA:
266        warnings.warn(
267            message="Java connectors are not yet supported.",
268            stacklevel=2,
269        )
270        return sorted(
271            conn.name for conn in _get_registry_cache().values() if conn.language == Language.JAVA
272        )
273
274    if install_type == InstallType.DOCKER:
275        return sorted(conn.name for conn in _get_registry_cache().values())
276
277    if install_type == InstallType.YAML:
278        return sorted(
279            conn.name
280            for conn in _get_registry_cache().values()
281            if InstallType.YAML in conn.install_types
282            and conn.name not in _LOWCODE_CONNECTORS_EXCLUDED
283        )
284
285    # pragma: no cover  # Should never be reached.
286    raise exc.PyAirbyteInputError(
287        message="Invalid install type.",
288        context={
289            "install_type": install_type,
290        },
291    )

Return a list of all available connectors.

Connectors are returned in alphabetical order by their registry name (for example "source-google-sheets"). Both the "sources" and "destinations" sections of the registry are included.