airbyte.sources.registry

Connectivity to the connector catalog registry.

  1# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  2"""Connectivity to the connector catalog registry."""
  3
  4from __future__ import annotations
  5
  6import json
  7import logging
  8import os
  9import warnings
 10from copy import copy
 11from enum import Enum
 12from pathlib import Path
 13
 14import requests
 15from pydantic import BaseModel
 16
 17from airbyte import exceptions as exc
 18from airbyte._util.meta import is_docker_installed
 19from airbyte.constants import AIRBYTE_OFFLINE_MODE
 20from airbyte.logs import warn_once
 21from airbyte.version import get_version
 22
 23
 24logger = logging.getLogger("airbyte")
 25
 26
 27__cache: dict[str, ConnectorMetadata] | None = None
 28
 29
 30_REGISTRY_ENV_VAR = "AIRBYTE_LOCAL_REGISTRY"
 31_REGISTRY_URL = "https://connectors.airbyte.com/files/registries/v0/oss_registry.json"
 32
 33_PYTHON_LANGUAGE = "python"
 34_MANIFEST_ONLY_LANGUAGE = "manifest-only"
 35
 36_PYTHON_LANGUAGE_TAG = f"language:{_PYTHON_LANGUAGE}"
 37_MANIFEST_ONLY_TAG = f"language:{_MANIFEST_ONLY_LANGUAGE}"
 38
 39
 40class InstallType(str, Enum):
 41    """The type of installation for a connector."""
 42
 43    YAML = "yaml"
 44    PYTHON = "python"
 45    DOCKER = "docker"
 46    JAVA = "java"
 47
 48
 49class Language(str, Enum):
 50    """The language of a connector."""
 51
 52    PYTHON = InstallType.PYTHON.value
 53    JAVA = InstallType.JAVA.value
 54    MANIFEST_ONLY = _MANIFEST_ONLY_LANGUAGE
 55
 56
 57class ConnectorMetadata(BaseModel):
 58    """Metadata for a connector."""
 59
 60    name: str
 61    """Connector name. For example, "source-google-sheets"."""
 62
 63    latest_available_version: str | None
 64    """The latest available version of the connector."""
 65
 66    pypi_package_name: str | None
 67    """The name of the PyPI package for the connector, if it exists."""
 68
 69    language: Language | None
 70    """The language of the connector."""
 71
 72    install_types: set[InstallType]
 73    """The supported install types for the connector."""
 74
 75    suggested_streams: list[str] | None = None
 76    """A list of suggested streams for the connector, if available."""
 77
 78    @property
 79    def default_install_type(self) -> InstallType:
 80        """Return the default install type for the connector."""
 81        if self.language == Language.MANIFEST_ONLY and InstallType.YAML in self.install_types:
 82            return InstallType.YAML
 83
 84        if InstallType.PYTHON in self.install_types:
 85            return InstallType.PYTHON
 86
 87        # Else: Java or Docker
 88        return InstallType.DOCKER
 89
 90
 91def _get_registry_url() -> str:
 92    if _REGISTRY_ENV_VAR in os.environ:
 93        return str(os.environ.get(_REGISTRY_ENV_VAR))
 94
 95    return _REGISTRY_URL
 96
 97
 98def _is_registry_disabled(url: str) -> bool:
 99    return url.upper() in {"0", "F", "FALSE"} or AIRBYTE_OFFLINE_MODE
100
101
def _registry_entry_to_connector_metadata(entry: dict) -> ConnectorMetadata:
    """Convert one raw registry entry into a `ConnectorMetadata` object.

    Args:
        entry: A connector definition taken from the registry's "sources" or
            "destinations" list. It must contain "dockerRepository"; every other
            key read here is optional and may be missing or null.

    Returns:
        The metadata built from the entry. The PyPI package name is only kept when
        the entry's PyPI registry is marked as enabled.

    Raises:
        KeyError: If the entry has no "dockerRepository" key.
    """
    name = entry["dockerRepository"].replace("airbyte/", "")
    latest_version: str | None = entry.get("dockerImageTag")
    # `or` (instead of a `.get` default) also covers keys that are present but null.
    tags = entry.get("tags") or []
    language: Language | None = None

    raw_language = entry.get("language")
    if raw_language is not None:
        try:
            language = Language(raw_language)
        except ValueError:
            # An unknown language is not fatal; the tags below may still identify it.
            warnings.warn(
                message=f"Invalid language for connector {name}: {raw_language}",
                stacklevel=2,
            )
    if not language and _PYTHON_LANGUAGE_TAG in tags:
        language = Language.PYTHON
    if not language and _MANIFEST_ONLY_TAG in tags:
        language = Language.MANIFEST_ONLY

    remote_registries: dict = entry.get("remoteRegistries") or {}
    pypi_registry: dict = remote_registries.get("pypi") or {}
    pypi_package_name: str | None = pypi_registry.get("packageName")
    pypi_enabled: bool = bool(pypi_registry.get("enabled", False))

    # Docker is always offered; the other install types depend on the language.
    install_types: set[InstallType] = {InstallType.DOCKER}
    if language == Language.PYTHON and pypi_enabled:
        install_types.add(InstallType.PYTHON)
    if language == Language.JAVA:
        install_types.add(InstallType.JAVA)
    if language == Language.MANIFEST_ONLY:
        install_types.add(InstallType.YAML)

    suggested_streams_section: dict = entry.get("suggestedStreams") or {}

    return ConnectorMetadata(
        name=name,
        latest_available_version=latest_version,
        pypi_package_name=pypi_package_name if pypi_enabled else None,
        language=language,
        install_types=install_types,
        suggested_streams=suggested_streams_section.get("streams"),
    )
144
145
def _get_registry_cache(*, force_refresh: bool = False) -> dict[str, ConnectorMetadata]:
    """Return the connector registry, loading and caching it when needed.

    The registry is read over HTTP when its location starts with "http", and from
    a local JSON file otherwise.

    Args:
        force_refresh: When true, reload the registry even if a cache exists.

    Returns:
        A mapping of connector name to metadata covering both the "sources" and
        "destinations" sections of the registry. An empty dict is returned (and
        nothing is cached) when the registry is disabled.

    Raises:
        requests.HTTPError: If the registry URL answers with an error status.
        KeyError: If the registry data lacks a "sources" or "destinations" section.
    """
    global __cache
    # An empty cache counts as "not loaded", so an empty registry is re-read next time.
    if __cache and not force_refresh:
        return __cache

    registry_url = _get_registry_url()

    if _is_registry_disabled(registry_url):
        return {}

    if registry_url.startswith("http"):
        response = requests.get(
            registry_url,
            headers={"User-Agent": f"PyAirbyte/{get_version()}"},
            # Without a timeout, `requests` can wait indefinitely on a stalled server.
            timeout=30,
        )
        response.raise_for_status()
        data = response.json()
    else:
        # Assume local file
        with Path(registry_url).open(encoding="utf-8") as f:
            data = json.load(f)

    new_cache: dict[str, ConnectorMetadata] = {}

    # Sources are processed before destinations, so a destination wins a name clash.
    for section in ("sources", "destinations"):
        for connector in data[section]:
            connector_metadata = _registry_entry_to_connector_metadata(connector)
            new_cache[connector_metadata.name] = connector_metadata

    if not new_cache:
        # This isn't necessarily fatal, since users can bring their own
        # connector definitions.
        warn_once(
            message=f"Connector registry is empty: {registry_url}",
            with_stack=False,
        )

    __cache = new_cache
    return __cache
189
190
def get_connector_metadata(name: str) -> ConnectorMetadata | None:
    """Look up the registry metadata for a connector by name.

    The registry cache is loaded first if it has not been populated yet.

    Args:
        name: Connector name as stored in the registry, e.g. "source-google-sheets".

    Returns:
        The connector's metadata, or `None` when the registry is disabled.

    Raises:
        exc.PyAirbyteInternalError: If the registry cache is empty after loading.
        exc.AirbyteConnectorNotRegisteredError: If `name` is not in the registry.
    """
    registry_url = _get_registry_url()

    if _is_registry_disabled(registry_url):
        return None

    # The cache is only read here, so no defensive copy is needed.
    cache = _get_registry_cache()

    if not cache:
        raise exc.PyAirbyteInternalError(
            message="Connector registry could not be loaded.",
            context={
                "registry_url": registry_url,
            },
        )
    if name not in cache:
        raise exc.AirbyteConnectorNotRegisteredError(
            connector_name=name,
            context={
                "registry_url": registry_url,
                "available_connectors": get_available_connectors(),
            },
        )
    return cache[name]
219
220
def get_available_connectors(install_type: InstallType | str | None = None) -> list[str]:
    """List connector names from the registry in alphabetical order.

    Names are returned as stored in the registry cache, including their prefix
    (for example "source-").

    Args:
        install_type: Restricts the result to connectors for that install type,
            given as an `InstallType` or its string value. When omitted, all
            connectors are returned if Docker is detected; otherwise only Python
            and manifest-only connectors are.

    Returns:
        The sorted list of matching connector names.

    Raises:
        ValueError: If `install_type` is a string that is not an `InstallType` value.
    """

    def _sorted_names(keep=None) -> list[str]:
        # Read the registry and return the (optionally filtered) names, sorted.
        entries = _get_registry_cache().values()
        return sorted(entry.name for entry in entries if keep is None or keep(entry))

    if install_type is None:
        # Nothing requested: report whatever can run in this environment.
        if is_docker_installed():
            logger.info("Docker is detected. Returning all connectors.")
            return _sorted_names()

        logger.info("Docker was not detected. Returning only Python and Manifest-only connectors.")
        runnable_languages = {Language.PYTHON, Language.MANIFEST_ONLY}
        return _sorted_names(lambda entry: entry.language in runnable_languages)

    requested = install_type if isinstance(install_type, InstallType) else InstallType(install_type)

    if requested == InstallType.PYTHON:
        return _sorted_names(lambda entry: entry.pypi_package_name is not None)

    if requested == InstallType.JAVA:
        # Warn from this frame so `stacklevel=2` still points at the caller.
        warnings.warn(
            message="Java connectors are not yet supported.",
            stacklevel=2,
        )
        return _sorted_names(lambda entry: entry.language == Language.JAVA)

    if requested == InstallType.DOCKER:
        return _sorted_names()

    if requested == InstallType.YAML:
        return _sorted_names(lambda entry: InstallType.YAML in entry.install_types)

    # pragma: no cover  # Should never be reached.
    raise exc.PyAirbyteInputError(
        message="Invalid install type.",
        context={
            "install_type": requested,
        },
    )
logger = <Logger airbyte (INFO)>
class InstallType(builtins.str, enum.Enum):
41class InstallType(str, Enum):
42    """The type of installation for a connector."""
43
44    YAML = "yaml"
45    PYTHON = "python"
46    DOCKER = "docker"
47    JAVA = "java"

The type of installation for a connector.

YAML = <InstallType.YAML: 'yaml'>
PYTHON = <InstallType.PYTHON: 'python'>
DOCKER = <InstallType.DOCKER: 'docker'>
JAVA = <InstallType.JAVA: 'java'>
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
removeprefix
removesuffix
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
class Language(builtins.str, enum.Enum):
50class Language(str, Enum):
51    """The language of a connector."""
52
53    PYTHON = InstallType.PYTHON.value
54    JAVA = InstallType.JAVA.value
55    MANIFEST_ONLY = _MANIFEST_ONLY_LANGUAGE

The language of a connector.

PYTHON = <Language.PYTHON: 'python'>
JAVA = <Language.JAVA: 'java'>
MANIFEST_ONLY = <Language.MANIFEST_ONLY: 'manifest-only'>
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
removeprefix
removesuffix
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
class ConnectorMetadata(pydantic.main.BaseModel):
58class ConnectorMetadata(BaseModel):
59    """Metadata for a connector."""
60
61    name: str
62    """Connector name. For example, "source-google-sheets"."""
63
64    latest_available_version: str | None
65    """The latest available version of the connector."""
66
67    pypi_package_name: str | None
68    """The name of the PyPI package for the connector, if it exists."""
69
70    language: Language | None
71    """The language of the connector."""
72
73    install_types: set[InstallType]
74    """The supported install types for the connector."""
75
76    suggested_streams: list[str] | None = None
77    """A list of suggested streams for the connector, if available."""
78
79    @property
80    def default_install_type(self) -> InstallType:
81        """Return the default install type for the connector."""
82        if self.language == Language.MANIFEST_ONLY and InstallType.YAML in self.install_types:
83            return InstallType.YAML
84
85        if InstallType.PYTHON in self.install_types:
86            return InstallType.PYTHON
87
88        # Else: Java or Docker
89        return InstallType.DOCKER

Metadata for a connector.

name: str

Connector name. For example, "source-google-sheets".

latest_available_version: str | None

The latest available version of the connector.

pypi_package_name: str | None

The name of the PyPI package for the connector, if it exists.

language: Language | None

The language of the connector.

install_types: set[InstallType]

The supported install types for the connector.

suggested_streams: list[str] | None

A list of suggested streams for the connector, if available.

default_install_type: InstallType
79    @property
80    def default_install_type(self) -> InstallType:
81        """Return the default install type for the connector."""
82        if self.language == Language.MANIFEST_ONLY and InstallType.YAML in self.install_types:
83            return InstallType.YAML
84
85        if InstallType.PYTHON in self.install_types:
86            return InstallType.PYTHON
87
88        # Else: Java or Docker
89        return InstallType.DOCKER

Return the default install type for the connector.

model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
model_fields
model_computed_fields
def get_connector_metadata(name: str) -> ConnectorMetadata | None:
192def get_connector_metadata(name: str) -> ConnectorMetadata | None:
193    """Check the cache for the connector.
194
195    If the cache is empty, populate by calling update_cache.
196    """
197    registry_url = _get_registry_url()
198
199    if _is_registry_disabled(registry_url):
200        return None
201
202    cache = copy(_get_registry_cache())
203
204    if not cache:
205        raise exc.PyAirbyteInternalError(
206            message="Connector registry could not be loaded.",
207            context={
208                "registry_url": _get_registry_url(),
209            },
210        )
211    if name not in cache:
212        raise exc.AirbyteConnectorNotRegisteredError(
213            connector_name=name,
214            context={
215                "registry_url": _get_registry_url(),
216                "available_connectors": get_available_connectors(),
217            },
218        )
219    return cache[name]

Check the cache for the connector.

If the cache has not been populated yet, it is loaded from the registry first.

def get_available_connectors( install_type: InstallType | str | None = None) -> list[str]:
222def get_available_connectors(install_type: InstallType | str | None = None) -> list[str]:
223    """Return a list of all available connectors.
224
225    Connectors will be returned in alphabetical order, with the standard prefix "source-".
226    """
227    if install_type is None:
228        # No install type specified. Filter for whatever is runnable.
229        if is_docker_installed():
230            logger.info("Docker is detected. Returning all connectors.")
231            # If Docker is available, return all connectors.
232            return sorted(conn.name for conn in _get_registry_cache().values())
233
234        logger.info("Docker was not detected. Returning only Python and Manifest-only connectors.")
235
236        # If Docker is not available, return only Python and Manifest-based connectors.
237        return sorted(
238            conn.name
239            for conn in _get_registry_cache().values()
240            if conn.language in {Language.PYTHON, Language.MANIFEST_ONLY}
241        )
242
243    if not isinstance(install_type, InstallType):
244        install_type = InstallType(install_type)
245
246    if install_type == InstallType.PYTHON:
247        return sorted(
248            conn.name
249            for conn in _get_registry_cache().values()
250            if conn.pypi_package_name is not None
251        )
252
253    if install_type == InstallType.JAVA:
254        warnings.warn(
255            message="Java connectors are not yet supported.",
256            stacklevel=2,
257        )
258        return sorted(
259            conn.name for conn in _get_registry_cache().values() if conn.language == Language.JAVA
260        )
261
262    if install_type == InstallType.DOCKER:
263        return sorted(conn.name for conn in _get_registry_cache().values())
264
265    if install_type == InstallType.YAML:
266        return sorted(
267            conn.name
268            for conn in _get_registry_cache().values()
269            if InstallType.YAML in conn.install_types
270        )
271
272    # pragma: no cover  # Should never be reached.
273    raise exc.PyAirbyteInputError(
274        message="Invalid install type.",
275        context={
276            "install_type": install_type,
277        },
278    )

Return a list of all available connectors.

Connector names are returned in alphabetical order and include their standard prefix, e.g. "source-".