airbyte.sources.registry

Connectivity to the connector catalog registry.

  1# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  2"""Connectivity to the connector catalog registry."""
  3
  4from __future__ import annotations
  5
  6import json
  7import logging
  8import os
  9import warnings
 10from copy import copy
 11from enum import Enum
 12from pathlib import Path
 13
 14import requests
 15from pydantic import BaseModel
 16
 17from airbyte import exceptions as exc
 18from airbyte._util.meta import is_docker_installed
 19from airbyte.constants import AIRBYTE_OFFLINE_MODE
 20from airbyte.logs import warn_once
 21from airbyte.version import get_version
 22
 23
# Logger registered under the "airbyte" name.
logger = logging.getLogger("airbyte")


# Module-level registry cache, keyed by connector name.
# Stays None until `_get_registry_cache()` populates it.
__cache: dict[str, ConnectorMetadata] | None = None


# Environment variable that overrides the registry location. The value is treated as
# a URL when it starts with "http", otherwise as a local file path; the values
# "0", "F" and "FALSE" (case-insensitive) disable the registry.
_REGISTRY_ENV_VAR = "AIRBYTE_LOCAL_REGISTRY"
# Default registry location, used when the override variable is not set.
_REGISTRY_URL = "https://connectors.airbyte.com/files/registries/v0/oss_registry.json"

# Language identifiers as they appear in registry entries.
_PYTHON_LANGUAGE = "python"
_MANIFEST_ONLY_LANGUAGE = "manifest-only"

# Tag forms of the language identifiers; used as a fallback when an entry has no
# usable "language" field.
_PYTHON_LANGUAGE_TAG = f"language:{_PYTHON_LANGUAGE}"
_MANIFEST_ONLY_TAG = f"language:{_MANIFEST_ONLY_LANGUAGE}"

_LOWCODE_CONNECTORS_FAILING_VALIDATION: list[str] = []
# Connectors that return 404 or some other misc exception.
_LOWCODE_CONNECTORS_UNEXPECTED_ERRORS: list[str] = [
    "source-adjust",
    "source-amazon-ads",
    "source-marketo",
]
# (CDK) FileNotFoundError: Unable to find spec.yaml or spec.json in the package.
_LOWCODE_CDK_FILE_NOT_FOUND_ERRORS: list[str] = []
# Union of the lists above; these names are filtered out of the YAML connector
# listing returned by `get_available_connectors()`.
_LOWCODE_CONNECTORS_EXCLUDED: list[str] = [
    *_LOWCODE_CONNECTORS_FAILING_VALIDATION,
    *_LOWCODE_CONNECTORS_UNEXPECTED_ERRORS,
    *_LOWCODE_CDK_FILE_NOT_FOUND_ERRORS,
]
 53
 54
 55class InstallType(str, Enum):
 56    """The type of installation for a connector."""
 57
 58    YAML = "yaml"
 59    PYTHON = "python"
 60    DOCKER = "docker"
 61    JAVA = "java"
 62
 63
 64class Language(str, Enum):
 65    """The language of a connector."""
 66
 67    PYTHON = InstallType.PYTHON.value
 68    JAVA = InstallType.JAVA.value
 69    MANIFEST_ONLY = _MANIFEST_ONLY_LANGUAGE
 70
 71
 72class ConnectorMetadata(BaseModel):
 73    """Metadata for a connector."""
 74
 75    name: str
 76    """Connector name. For example, "source-google-sheets"."""
 77
 78    latest_available_version: str | None
 79    """The latest available version of the connector."""
 80
 81    pypi_package_name: str | None
 82    """The name of the PyPI package for the connector, if it exists."""
 83
 84    language: Language | None
 85    """The language of the connector."""
 86
 87    install_types: set[InstallType]
 88    """The supported install types for the connector."""
 89
 90    suggested_streams: list[str] | None = None
 91    """A list of suggested streams for the connector, if available."""
 92
 93    @property
 94    def default_install_type(self) -> InstallType:
 95        """Return the default install type for the connector."""
 96        if self.language == Language.MANIFEST_ONLY and InstallType.YAML in self.install_types:
 97            return InstallType.YAML
 98
 99        if InstallType.PYTHON in self.install_types:
100            return InstallType.PYTHON
101
102        # Else: Java or Docker
103        return InstallType.DOCKER
104
105
def _get_registry_url() -> str:
    """Return the registry location, honoring the environment override if set.

    The value of the override variable is returned as-is whenever the variable is
    present in the environment; otherwise the default registry URL is returned.
    """
    return os.environ.get(_REGISTRY_ENV_VAR, _REGISTRY_URL)
111
112
def _is_registry_disabled(url: str) -> bool:
    """Tell whether registry lookups are switched off.

    The registry counts as disabled when `url` is one of the "false-like" markers
    ("0", "F", "FALSE", case-insensitive) or when offline mode is enabled.
    """
    if url.upper() in {"0", "F", "FALSE"}:
        return True

    return AIRBYTE_OFFLINE_MODE
115
116
def _registry_entry_to_connector_metadata(entry: dict) -> ConnectorMetadata:
    """Convert one raw registry entry into a `ConnectorMetadata` object.

    The language is read from the entry's "language" field; when that field is
    missing, null or not a known language, the entry's "tags" are checked for a
    Python or manifest-only language tag instead.

    Optional sections ("tags", "remoteRegistries", "pypi", "suggestedStreams") are
    tolerated both when absent and when present with a null value.

    Args:
        entry: A single connector definition from the registry JSON.

    Returns:
        The metadata extracted from the entry.

    Raises:
        KeyError: If the entry has no "dockerRepository" key.
    """
    name = entry["dockerRepository"].replace("airbyte/", "")
    latest_version: str | None = entry.get("dockerImageTag")
    # `or` (rather than a `.get` default) also covers keys that are explicitly null.
    tags = entry.get("tags") or []
    language: Language | None = None

    raw_language = entry.get("language")
    if raw_language is not None:
        try:
            language = Language(raw_language)
        except ValueError:
            # Enum lookup raises ValueError for unknown values; warn and fall back to tags.
            warnings.warn(
                message=f"Invalid language for connector {name}: {raw_language}",
                stacklevel=2,
            )
    if language is None and _PYTHON_LANGUAGE_TAG in tags:
        language = Language.PYTHON
    if language is None and _MANIFEST_ONLY_TAG in tags:
        language = Language.MANIFEST_ONLY

    remote_registries: dict = entry.get("remoteRegistries") or {}
    pypi_registry: dict = remote_registries.get("pypi") or {}
    pypi_package_name: str | None = pypi_registry.get("packageName")
    pypi_enabled: bool = bool(pypi_registry.get("enabled", False))

    # Docker is always offered; the other install types depend on the language.
    install_types: set[InstallType] = {InstallType.DOCKER}
    if language == Language.PYTHON and pypi_enabled:
        install_types.add(InstallType.PYTHON)
    if language == Language.JAVA:
        install_types.add(InstallType.JAVA)
    if language == Language.MANIFEST_ONLY:
        install_types.add(InstallType.YAML)

    suggested_streams_section: dict = entry.get("suggestedStreams") or {}

    return ConnectorMetadata(
        name=name,
        latest_available_version=latest_version,
        # The package name is only exposed when PyPI publishing is enabled.
        pypi_package_name=pypi_package_name if pypi_enabled else None,
        language=language,
        install_types=install_types,
        suggested_streams=suggested_streams_section.get("streams"),
    )
159
160
def _get_registry_cache(*, force_refresh: bool = False) -> dict[str, ConnectorMetadata]:
    """Return the connector registry, keyed by connector name, loading it if needed.

    A non-empty cached registry is reused unless `force_refresh` is set. An empty
    cache is treated the same as an unpopulated one, so an empty registry is
    re-read on each call.

    The registry is fetched over HTTP when its location starts with "http";
    any other location is read as a local JSON file.

    Args:
        force_refresh: Reload the registry even if a cached copy exists.

    Returns:
        A mapping of connector name to metadata. An empty dict is returned, and the
        cache is left untouched, when the registry is disabled.

    Raises:
        requests.HTTPError: If the registry URL responds with an error status.
        requests.Timeout: If the registry URL does not respond in time.
        KeyError: If the registry JSON lacks the "sources" or "destinations" key.
    """
    global __cache
    if __cache and not force_refresh:
        return __cache

    registry_url = _get_registry_url()

    if _is_registry_disabled(registry_url):
        return {}

    if registry_url.startswith("http"):
        response = requests.get(
            registry_url,
            headers={"User-Agent": f"PyAirbyte/{get_version()}"},
            # Without a timeout, an unresponsive server would block the caller forever.
            timeout=30,
        )
        response.raise_for_status()
        data = response.json()
    else:
        # Assume local file
        with Path(registry_url).open(encoding="utf-8") as f:
            data = json.load(f)

    new_cache: dict[str, ConnectorMetadata] = {}

    # Sources are processed first; a destination with the same name would replace it.
    for section in ("sources", "destinations"):
        for connector in data[section]:
            connector_metadata = _registry_entry_to_connector_metadata(connector)
            new_cache[connector_metadata.name] = connector_metadata

    if not new_cache:
        # This isn't necessarily fatal, since users can bring their own
        # connector definitions.
        warn_once(
            message=f"Connector registry is empty: {registry_url}",
            with_stack=False,
        )

    __cache = new_cache
    return __cache
204
205
def get_connector_metadata(name: str) -> ConnectorMetadata | None:
    """Look up a connector's metadata in the registry.

    The registry is loaded on demand if it has not been cached yet.

    Args:
        name: The connector name as listed in the registry.

    Returns:
        The connector's metadata, or None when the registry is disabled.

    Raises:
        PyAirbyteInternalError: If the registry is enabled but loaded as empty.
        AirbyteConnectorNotRegisteredError: If `name` is not in the registry.
    """
    registry_url = _get_registry_url()

    if _is_registry_disabled(registry_url):
        return None

    # The cache is only read here, so no defensive copy is needed.
    cache = _get_registry_cache()

    if not cache:
        raise exc.PyAirbyteInternalError(
            message="Connector registry could not be loaded.",
            context={
                "registry_url": registry_url,
            },
        )
    if name not in cache:
        raise exc.AirbyteConnectorNotRegisteredError(
            connector_name=name,
            context={
                "registry_url": registry_url,
                "available_connectors": get_available_connectors(),
            },
        )
    return cache[name]
234
235
def get_available_connectors(install_type: InstallType | str | None = None) -> list[str]:
    """List the names of registered connectors, sorted alphabetically.

    Args:
        install_type: Restricts the listing to connectors usable with this install
            type. When omitted, all connectors are listed if Docker is installed;
            otherwise only Python and manifest-only connectors are listed.

    Returns:
        The matching connector names in ascending order.

    Raises:
        ValueError: If `install_type` is a string that is not a valid install type.
    """

    def _sorted_names(keep=None) -> list[str]:
        # Reads the registry cache and returns the names of the connectors that
        # `keep` accepts (every connector when no predicate is given).
        registered = _get_registry_cache().values()
        return sorted(conn.name for conn in registered if keep is None or keep(conn))

    if install_type is None:
        # No install type specified. Filter for whatever is runnable.
        if not is_docker_installed():
            logger.info(
                "Docker was not detected. Returning only Python and Manifest-only connectors."
            )
            runnable_languages = {Language.PYTHON, Language.MANIFEST_ONLY}
            return _sorted_names(lambda conn: conn.language in runnable_languages)

        logger.info("Docker is detected. Returning all connectors.")
        return _sorted_names()

    if not isinstance(install_type, InstallType):
        install_type = InstallType(install_type)

    if install_type == InstallType.DOCKER:
        # Every registered connector is listed for Docker.
        return _sorted_names()

    if install_type == InstallType.PYTHON:
        return _sorted_names(lambda conn: conn.pypi_package_name is not None)

    if install_type == InstallType.YAML:
        return _sorted_names(
            lambda conn: InstallType.YAML in conn.install_types
            and conn.name not in _LOWCODE_CONNECTORS_EXCLUDED
        )

    if install_type == InstallType.JAVA:
        warnings.warn(
            message="Java connectors are not yet supported.",
            stacklevel=2,
        )
        return _sorted_names(lambda conn: conn.language == Language.JAVA)

    # pragma: no cover  # Should never be reached.
    raise exc.PyAirbyteInputError(
        message="Invalid install type.",
        context={
            "install_type": install_type,
        },
    )
logger = <Logger airbyte (INFO)>
class InstallType(builtins.str, enum.Enum):
56class InstallType(str, Enum):
57    """The type of installation for a connector."""
58
59    YAML = "yaml"
60    PYTHON = "python"
61    DOCKER = "docker"
62    JAVA = "java"

The type of installation for a connector.

YAML = <InstallType.YAML: 'yaml'>
PYTHON = <InstallType.PYTHON: 'python'>
DOCKER = <InstallType.DOCKER: 'docker'>
JAVA = <InstallType.JAVA: 'java'>
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
removeprefix
removesuffix
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
class Language(builtins.str, enum.Enum):
65class Language(str, Enum):
66    """The language of a connector."""
67
68    PYTHON = InstallType.PYTHON.value
69    JAVA = InstallType.JAVA.value
70    MANIFEST_ONLY = _MANIFEST_ONLY_LANGUAGE

The language of a connector.

PYTHON = <Language.PYTHON: 'python'>
JAVA = <Language.JAVA: 'java'>
MANIFEST_ONLY = <Language.MANIFEST_ONLY: 'manifest-only'>
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
removeprefix
removesuffix
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
class ConnectorMetadata(pydantic.main.BaseModel):
 73class ConnectorMetadata(BaseModel):
 74    """Metadata for a connector."""
 75
 76    name: str
 77    """Connector name. For example, "source-google-sheets"."""
 78
 79    latest_available_version: str | None
 80    """The latest available version of the connector."""
 81
 82    pypi_package_name: str | None
 83    """The name of the PyPI package for the connector, if it exists."""
 84
 85    language: Language | None
 86    """The language of the connector."""
 87
 88    install_types: set[InstallType]
 89    """The supported install types for the connector."""
 90
 91    suggested_streams: list[str] | None = None
 92    """A list of suggested streams for the connector, if available."""
 93
 94    @property
 95    def default_install_type(self) -> InstallType:
 96        """Return the default install type for the connector."""
 97        if self.language == Language.MANIFEST_ONLY and InstallType.YAML in self.install_types:
 98            return InstallType.YAML
 99
100        if InstallType.PYTHON in self.install_types:
101            return InstallType.PYTHON
102
103        # Else: Java or Docker
104        return InstallType.DOCKER

Metadata for a connector.

name: str

Connector name. For example, "source-google-sheets".

latest_available_version: str | None

The latest available version of the connector.

pypi_package_name: str | None

The name of the PyPI package for the connector, if it exists.

language: Language | None

The language of the connector.

install_types: set[InstallType]

The supported install types for the connector.

suggested_streams: list[str] | None

A list of suggested streams for the connector, if available.

default_install_type: InstallType
 94    @property
 95    def default_install_type(self) -> InstallType:
 96        """Return the default install type for the connector."""
 97        if self.language == Language.MANIFEST_ONLY and InstallType.YAML in self.install_types:
 98            return InstallType.YAML
 99
100        if InstallType.PYTHON in self.install_types:
101            return InstallType.PYTHON
102
103        # Else: Java or Docker
104        return InstallType.DOCKER

Return the default install type for the connector.

model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
model_fields
model_computed_fields
def get_connector_metadata(name: str) -> ConnectorMetadata | None:
207def get_connector_metadata(name: str) -> ConnectorMetadata | None:
208    """Check the cache for the connector.
209
210    If the cache is empty, populate by calling update_cache.
211    """
212    registry_url = _get_registry_url()
213
214    if _is_registry_disabled(registry_url):
215        return None
216
217    cache = copy(_get_registry_cache())
218
219    if not cache:
220        raise exc.PyAirbyteInternalError(
221            message="Connector registry could not be loaded.",
222            context={
223                "registry_url": _get_registry_url(),
224            },
225        )
226    if name not in cache:
227        raise exc.AirbyteConnectorNotRegisteredError(
228            connector_name=name,
229            context={
230                "registry_url": _get_registry_url(),
231                "available_connectors": get_available_connectors(),
232            },
233        )
234    return cache[name]

Check the cache for the connector.

If the registry has not been cached yet, it is loaded first. Returns None when the registry is disabled.

def get_available_connectors( install_type: InstallType | str | None = None) -> list[str]:
237def get_available_connectors(install_type: InstallType | str | None = None) -> list[str]:
238    """Return a list of all available connectors.
239
240    Connectors will be returned in alphabetical order, with the standard prefix "source-".
241    """
242    if install_type is None:
243        # No install type specified. Filter for whatever is runnable.
244        if is_docker_installed():
245            logger.info("Docker is detected. Returning all connectors.")
246            # If Docker is available, return all connectors.
247            return sorted(conn.name for conn in _get_registry_cache().values())
248
249        logger.info("Docker was not detected. Returning only Python and Manifest-only connectors.")
250
251        # If Docker is not available, return only Python and Manifest-based connectors.
252        return sorted(
253            conn.name
254            for conn in _get_registry_cache().values()
255            if conn.language in {Language.PYTHON, Language.MANIFEST_ONLY}
256        )
257
258    if not isinstance(install_type, InstallType):
259        install_type = InstallType(install_type)
260
261    if install_type == InstallType.PYTHON:
262        return sorted(
263            conn.name
264            for conn in _get_registry_cache().values()
265            if conn.pypi_package_name is not None
266        )
267
268    if install_type == InstallType.JAVA:
269        warnings.warn(
270            message="Java connectors are not yet supported.",
271            stacklevel=2,
272        )
273        return sorted(
274            conn.name for conn in _get_registry_cache().values() if conn.language == Language.JAVA
275        )
276
277    if install_type == InstallType.DOCKER:
278        return sorted(conn.name for conn in _get_registry_cache().values())
279
280    if install_type == InstallType.YAML:
281        return sorted(
282            conn.name
283            for conn in _get_registry_cache().values()
284            if InstallType.YAML in conn.install_types
285            and conn.name not in _LOWCODE_CONNECTORS_EXCLUDED
286        )
287
288    # pragma: no cover  # Should never be reached.
289    raise exc.PyAirbyteInputError(
290        message="Invalid install type.",
291        context={
292            "install_type": install_type,
293        },
294    )

Return a list of all available connectors.

Connectors will be returned in alphabetical order, using their full registry names (for example, "source-google-sheets").