airbyte.sources.registry

Connectivity to the connector catalog registry.

  1# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  2"""Connectivity to the connector catalog registry."""
  3
  4from __future__ import annotations
  5
  6import json
  7import logging
  8import os
  9import warnings
 10from copy import copy
 11from enum import Enum
 12from pathlib import Path
 13from typing import cast
 14
 15import requests
 16from pydantic import BaseModel
 17
 18from airbyte import exceptions as exc
 19from airbyte._util.meta import is_docker_installed
 20from airbyte.constants import AIRBYTE_OFFLINE_MODE
 21from airbyte.logs import warn_once
 22from airbyte.version import get_version
 23
 24
# Logger shared under the "airbyte" name; used below for informational messages.
logger = logging.getLogger("airbyte")


# In-process cache of registry entries keyed by connector name.
# Stays None until the registry has been loaded for the first time.
__cache: dict[str, ConnectorMetadata] | None = None


# Environment variable that, when set, replaces the default registry location.
# Its value is used as an http(s) URL or a local file path; "0", "F" or "FALSE"
# (case-insensitive) disables the registry instead.
_REGISTRY_ENV_VAR = "AIRBYTE_LOCAL_REGISTRY"
# Default location of the hosted registry JSON document.
_REGISTRY_URL = "https://connectors.airbyte.com/files/registries/v0/oss_registry.json"

# Language identifiers used to build the tag strings below.
_PYTHON_LANGUAGE = "python"
_MANIFEST_ONLY_LANGUAGE = "manifest-only"

# Tag values looked for in a registry entry's "tags" list when the entry does
# not yield a usable "language" value.
_PYTHON_LANGUAGE_TAG = f"language:{_PYTHON_LANGUAGE}"
_MANIFEST_ONLY_TAG = f"language:{_MANIFEST_ONLY_LANGUAGE}"
 39
 40
 41class InstallType(str, Enum):
 42    """The type of installation for a connector."""
 43
 44    YAML = "yaml"
 45    PYTHON = "python"
 46    DOCKER = "docker"
 47    JAVA = "java"
 48
 49
 50class Language(str, Enum):
 51    """The language of a connector."""
 52
 53    PYTHON = InstallType.PYTHON.value
 54    JAVA = InstallType.JAVA.value
 55    MANIFEST_ONLY = _MANIFEST_ONLY_LANGUAGE
 56
 57
 58class ConnectorMetadata(BaseModel):
 59    """Metadata for a connector."""
 60
 61    name: str
 62    """Connector name. For example, "source-google-sheets"."""
 63
 64    latest_available_version: str | None
 65    """The latest available version of the connector."""
 66
 67    pypi_package_name: str | None
 68    """The name of the PyPI package for the connector, if it exists."""
 69
 70    language: Language | None
 71    """The language of the connector."""
 72
 73    install_types: set[InstallType]
 74    """The supported install types for the connector."""
 75
 76    suggested_streams: list[str] | None = None
 77    """A list of suggested streams for the connector, if available."""
 78
 79    @property
 80    def default_install_type(self) -> InstallType:
 81        """Return the default install type for the connector."""
 82        if self.language == Language.MANIFEST_ONLY and InstallType.YAML in self.install_types:
 83            return InstallType.YAML
 84
 85        if InstallType.PYTHON in self.install_types:
 86            return InstallType.PYTHON
 87
 88        # Else: Java or Docker
 89        return InstallType.DOCKER
 90
 91
 92def _get_registry_url() -> str:
 93    if _REGISTRY_ENV_VAR in os.environ:
 94        return str(os.environ.get(_REGISTRY_ENV_VAR))
 95
 96    return _REGISTRY_URL
 97
 98
 99def _is_registry_disabled(url: str) -> bool:
100    return url.upper() in {"0", "F", "FALSE"} or AIRBYTE_OFFLINE_MODE
101
102
def _registry_entry_to_connector_metadata(entry: dict) -> ConnectorMetadata:
    """Convert one raw registry entry into a ``ConnectorMetadata`` object.

    Args:
        entry: A single connector definition from the registry document. It must
            contain "dockerRepository"; every other key read here is optional and
            may be missing or null.

    Returns:
        The metadata for the connector described by ``entry``.

    Raises:
        KeyError: If ``entry`` has no "dockerRepository" key.
    """
    name = entry["dockerRepository"].replace("airbyte/", "")
    latest_version: str | None = entry.get("dockerImageTag")
    # `or` (instead of a .get() default) also covers keys that are present but null.
    tags = entry.get("tags") or []
    language: Language | None = None

    raw_language = entry.get("language")
    if raw_language is not None:
        try:
            language = Language(raw_language)
        except ValueError:
            # An unrecognized language is reported but not fatal; the tags
            # checked below may still identify the language.
            warnings.warn(
                message=f"Invalid language for connector {name}: {raw_language}",
                stacklevel=2,
            )
    if language is None and _PYTHON_LANGUAGE_TAG in tags:
        language = Language.PYTHON
    if language is None and _MANIFEST_ONLY_TAG in tags:
        language = Language.MANIFEST_ONLY

    remote_registries: dict = entry.get("remoteRegistries") or {}
    pypi_registry: dict = remote_registries.get("pypi") or {}
    pypi_package_name = cast(
        "str | None",
        pypi_registry.get("packageName", None),
    )
    pypi_enabled: bool = bool(pypi_registry.get("enabled", False))

    # Docker is always offered; the other install types depend on the language.
    install_types: set[InstallType] = {InstallType.DOCKER}
    if language == Language.PYTHON and pypi_enabled:
        install_types.add(InstallType.PYTHON)
    if language == Language.JAVA:
        install_types.add(InstallType.JAVA)
    if language == Language.MANIFEST_ONLY:
        install_types.add(InstallType.YAML)

    suggested_streams = (entry.get("suggestedStreams") or {}).get("streams", None)

    return ConnectorMetadata(
        name=name,
        latest_available_version=latest_version,
        # The package name is only exposed when PyPI publishing is enabled.
        pypi_package_name=pypi_package_name if pypi_enabled else None,
        language=language,
        install_types=install_types,
        suggested_streams=suggested_streams,
    )
148
149
# Upper bound, in seconds, on how long to wait for the hosted registry to respond.
_REGISTRY_REQUEST_TIMEOUT_SECONDS = 30.0


def _get_registry_cache(*, force_refresh: bool = False) -> dict[str, ConnectorMetadata]:
    """Return the connector registry, loading it on first use.

    The registry is read from the configured location (an http(s) URL or a local
    JSON file), converted to ``ConnectorMetadata`` objects keyed by connector
    name, and stored in the module-level cache.

    Args:
        force_refresh: When true, reload the registry even if a cached copy exists.

    Returns:
        A mapping of connector name to metadata. An empty mapping is returned,
        without touching the cache, when the registry is disabled.

    Raises:
        requests.HTTPError: If the registry URL responds with an error status.
        requests.Timeout: If the registry URL does not respond within
            ``_REGISTRY_REQUEST_TIMEOUT_SECONDS``.
        KeyError: If the registry document lacks a "sources" or "destinations" key.
    """
    global __cache
    # An empty cache is treated like a missing one, so it is reloaded on the next call.
    if __cache and not force_refresh:
        return __cache

    registry_url = _get_registry_url()

    if _is_registry_disabled(registry_url):
        return {}

    if registry_url.startswith("http"):
        response = requests.get(
            registry_url,
            headers={"User-Agent": f"PyAirbyte/{get_version()}"},
            # Without a timeout, requests waits indefinitely on an unresponsive host.
            timeout=_REGISTRY_REQUEST_TIMEOUT_SECONDS,
        )
        response.raise_for_status()
        data = response.json()
    else:
        # Assume local file
        with Path(registry_url).open(encoding="utf-8") as f:
            data = json.load(f)

    new_cache: dict[str, ConnectorMetadata] = {}

    # Sources are processed first; a destination with the same name would overwrite it.
    for section in ("sources", "destinations"):
        for connector in data[section]:
            connector_metadata = _registry_entry_to_connector_metadata(connector)
            new_cache[connector_metadata.name] = connector_metadata

    if not new_cache:
        # This isn't necessarily fatal, since users can bring their own
        # connector definitions.
        warn_once(
            message=f"Connector registry is empty: {registry_url}",
            with_stack=False,
        )

    __cache = new_cache
    return __cache
193
194
def get_connector_metadata(name: str) -> ConnectorMetadata | None:
    """Look up the registry metadata for a connector by name.

    The registry is loaded on first use and served from the in-process cache
    afterwards.

    Args:
        name: Connector name as listed in the registry.

    Returns:
        The connector's metadata, or ``None`` when the registry is disabled.

    Raises:
        PyAirbyteInternalError: If the registry contains no connectors.
        AirbyteConnectorNotRegisteredError: If ``name`` is not in the registry.
    """
    registry_url = _get_registry_url()
    if _is_registry_disabled(registry_url):
        return None

    registry = copy(_get_registry_cache())
    if not registry:
        raise exc.PyAirbyteInternalError(
            message="Connector registry could not be loaded.",
            context={"registry_url": registry_url},
        )

    metadata = registry.get(name)
    if metadata is not None:
        return metadata

    # Unknown connector: include the known names to help the caller spot a typo.
    raise exc.AirbyteConnectorNotRegisteredError(
        connector_name=name,
        context={
            "registry_url": registry_url,
            "available_connectors": get_available_connectors(),
        },
    )
223
224
def get_available_connectors(install_type: InstallType | str | None = None) -> list[str]:
    """List the names of connectors in the registry, sorted alphabetically.

    Args:
        install_type: Restrict the result to one install type, given as an
            ``InstallType`` or its string value. When omitted, all connectors are
            returned if Docker is installed; otherwise only Python and
            manifest-only connectors are returned.

    Returns:
        The matching connector names in ascending order.

    Raises:
        ValueError: If ``install_type`` is not a valid ``InstallType`` value.
    """

    def _names_matching(keep) -> list[str]:
        # The registry is read only when a result is actually being built.
        return sorted(conn.name for conn in _get_registry_cache().values() if keep(conn))

    if install_type is None:
        # No install type specified. Filter for whatever is runnable.
        if is_docker_installed():
            logger.info("Docker is detected. Returning all connectors.")
            return _names_matching(lambda _conn: True)

        logger.info("Docker was not detected. Returning only Python and Manifest-only connectors.")
        runnable_languages = {Language.PYTHON, Language.MANIFEST_ONLY}
        return _names_matching(lambda conn: conn.language in runnable_languages)

    requested = install_type if isinstance(install_type, InstallType) else InstallType(install_type)

    if requested == InstallType.DOCKER:
        # Every registry entry can be run through Docker.
        return _names_matching(lambda _conn: True)

    if requested == InstallType.PYTHON:
        return _names_matching(lambda conn: conn.pypi_package_name is not None)

    if requested == InstallType.YAML:
        return _names_matching(lambda conn: InstallType.YAML in conn.install_types)

    if requested == InstallType.JAVA:
        # Warn from this frame so that stacklevel=2 points at the caller.
        warnings.warn(
            message="Java connectors are not yet supported.",
            stacklevel=2,
        )
        return _names_matching(lambda conn: conn.language == Language.JAVA)

    # pragma: no cover  # Should never be reached.
    raise exc.PyAirbyteInputError(
        message="Invalid install type.",
        context={
            "install_type": requested,
        },
    )
logger = <Logger airbyte (INFO)>
class InstallType(builtins.str, enum.Enum):
class InstallType(str, Enum):
    """Enumerates the ways a connector can be installed.

    Members are also plain strings, so each one compares equal to its value
    (for example, ``InstallType.DOCKER == "docker"``).
    """

    YAML = "yaml"
    PYTHON = "python"
    DOCKER = "docker"
    JAVA = "java"

The type of installation for a connector.

YAML = <InstallType.YAML: 'yaml'>
PYTHON = <InstallType.PYTHON: 'python'>
DOCKER = <InstallType.DOCKER: 'docker'>
JAVA = <InstallType.JAVA: 'java'>
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
removeprefix
removesuffix
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
class Language(builtins.str, enum.Enum):
class Language(str, Enum):
    """Enumerates the implementation languages a connector can declare.

    The Python and Java values are taken from ``InstallType`` so that both
    enums use the same strings for those entries.
    """

    PYTHON = InstallType.PYTHON.value
    JAVA = InstallType.JAVA.value
    MANIFEST_ONLY = _MANIFEST_ONLY_LANGUAGE

The language of a connector.

PYTHON = <Language.PYTHON: 'python'>
JAVA = <Language.JAVA: 'java'>
MANIFEST_ONLY = <Language.MANIFEST_ONLY: 'manifest-only'>
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
removeprefix
removesuffix
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
class ConnectorMetadata(pydantic.main.BaseModel):
class ConnectorMetadata(BaseModel):
    """Registry metadata describing a single connector."""

    name: str
    """Name of the connector, e.g. "source-google-sheets"."""

    latest_available_version: str | None
    """Most recent version of the connector that is available, if known."""

    pypi_package_name: str | None
    """PyPI package name for the connector, or None when there is none."""

    language: Language | None
    """Implementation language of the connector, if known."""

    install_types: set[InstallType]
    """Install types that the connector supports."""

    suggested_streams: list[str] | None = None
    """Streams suggested for the connector, when the registry provides them."""

    @property
    def default_install_type(self) -> InstallType:
        """Pick the install type to use when none was requested.

        Preference order: YAML for manifest-only connectors that support it,
        then Python when supported, otherwise Docker.
        """
        supported = self.install_types
        if InstallType.YAML in supported and self.language == Language.MANIFEST_ONLY:
            return InstallType.YAML

        # Anything that is neither YAML- nor Python-installable (e.g. Java) uses Docker.
        return InstallType.PYTHON if InstallType.PYTHON in supported else InstallType.DOCKER

Metadata for a connector.

name: str

Connector name. For example, "source-google-sheets".

latest_available_version: str | None

The latest available version of the connector.

pypi_package_name: str | None

The name of the PyPI package for the connector, if it exists.

language: Language | None

The language of the connector.

install_types: set[InstallType]

The supported install types for the connector.

suggested_streams: list[str] | None

A list of suggested streams for the connector, if available.

default_install_type: InstallType
80    @property
81    def default_install_type(self) -> InstallType:
82        """Return the default install type for the connector."""
83        if self.language == Language.MANIFEST_ONLY and InstallType.YAML in self.install_types:
84            return InstallType.YAML
85
86        if InstallType.PYTHON in self.install_types:
87            return InstallType.PYTHON
88
89        # Else: Java or Docker
90        return InstallType.DOCKER

Return the default install type for the connector.

model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
def get_connector_metadata(name: str) -> ConnectorMetadata | None:
def get_connector_metadata(name: str) -> ConnectorMetadata | None:
    """Look up the registry metadata for a connector by name.

    The registry is loaded on first use and served from the in-process cache
    afterwards.

    Args:
        name: Connector name as listed in the registry.

    Returns:
        The connector's metadata, or ``None`` when the registry is disabled.

    Raises:
        PyAirbyteInternalError: If the registry contains no connectors.
        AirbyteConnectorNotRegisteredError: If ``name`` is not in the registry.
    """
    registry_url = _get_registry_url()
    if _is_registry_disabled(registry_url):
        return None

    registry = copy(_get_registry_cache())
    if not registry:
        raise exc.PyAirbyteInternalError(
            message="Connector registry could not be loaded.",
            context={"registry_url": registry_url},
        )

    metadata = registry.get(name)
    if metadata is not None:
        return metadata

    # Unknown connector: include the known names to help the caller spot a typo.
    raise exc.AirbyteConnectorNotRegisteredError(
        connector_name=name,
        context={
            "registry_url": registry_url,
            "available_connectors": get_available_connectors(),
        },
    )

Check the cache for the connector.

If the cache is empty, it is populated from the registry before the lookup.

def get_available_connectors(install_type: InstallType | str | None = None) -> list[str]:
def get_available_connectors(install_type: InstallType | str | None = None) -> list[str]:
    """List the names of connectors in the registry, sorted alphabetically.

    Args:
        install_type: Restrict the result to one install type, given as an
            ``InstallType`` or its string value. When omitted, all connectors are
            returned if Docker is installed; otherwise only Python and
            manifest-only connectors are returned.

    Returns:
        The matching connector names in ascending order.

    Raises:
        ValueError: If ``install_type`` is not a valid ``InstallType`` value.
    """

    def _names_matching(keep) -> list[str]:
        # The registry is read only when a result is actually being built.
        return sorted(conn.name for conn in _get_registry_cache().values() if keep(conn))

    if install_type is None:
        # No install type specified. Filter for whatever is runnable.
        if is_docker_installed():
            logger.info("Docker is detected. Returning all connectors.")
            return _names_matching(lambda _conn: True)

        logger.info("Docker was not detected. Returning only Python and Manifest-only connectors.")
        runnable_languages = {Language.PYTHON, Language.MANIFEST_ONLY}
        return _names_matching(lambda conn: conn.language in runnable_languages)

    requested = install_type if isinstance(install_type, InstallType) else InstallType(install_type)

    if requested == InstallType.DOCKER:
        # Every registry entry can be run through Docker.
        return _names_matching(lambda _conn: True)

    if requested == InstallType.PYTHON:
        return _names_matching(lambda conn: conn.pypi_package_name is not None)

    if requested == InstallType.YAML:
        return _names_matching(lambda conn: InstallType.YAML in conn.install_types)

    if requested == InstallType.JAVA:
        # Warn from this frame so that stacklevel=2 points at the caller.
        warnings.warn(
            message="Java connectors are not yet supported.",
            stacklevel=2,
        )
        return _names_matching(lambda conn: conn.language == Language.JAVA)

    # pragma: no cover  # Should never be reached.
    raise exc.PyAirbyteInputError(
        message="Invalid install type.",
        context={
            "install_type": requested,
        },
    )

Return a list of all available connectors.

Connector names are returned in alphabetical order and keep their standard prefix (for example, "source-").