airbyte.sources.registry

Connectivity to the connector catalog registry.

  1# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  2"""Connectivity to the connector catalog registry."""
  3
  4from __future__ import annotations
  5
  6import json
  7import logging
  8import os
  9import warnings
 10from copy import copy
 11from dataclasses import dataclass
 12from enum import Enum
 13from pathlib import Path
 14
 15import requests
 16
 17from airbyte import exceptions as exc
 18from airbyte._util.meta import is_docker_installed
 19from airbyte.constants import AIRBYTE_OFFLINE_MODE
 20from airbyte.logs import warn_once
 21from airbyte.version import get_version
 22
 23
# Logger shared under the "airbyte" logger name.
logger = logging.getLogger("airbyte")


# In-memory registry cache, keyed by connector name. `None` until the registry is first loaded.
__cache: dict[str, ConnectorMetadata] | None = None


# Environment variable that, when set, overrides the registry location.
_REGISTRY_ENV_VAR = "AIRBYTE_LOCAL_REGISTRY"
# Default registry location, used when the override variable is not set.
_REGISTRY_URL = "https://connectors.airbyte.com/files/registries/v0/oss_registry.json"

# Language identifiers as they are matched against registry entries.
_PYTHON_LANGUAGE = "python"
_MANIFEST_ONLY_LANGUAGE = "manifest-only"

# Tag spellings of the language identifiers; consulted when an entry has no usable `language`.
_PYTHON_LANGUAGE_TAG = f"language:{_PYTHON_LANGUAGE}"
_MANIFEST_ONLY_TAG = f"language:{_MANIFEST_ONLY_LANGUAGE}"
 38
# Connector names that are folded into `_LOWCODE_CONNECTORS_EXCLUDED` and therefore
# omitted from the YAML install-type listing.
# NOTE(review): judging by the name, these are low-code connectors that still depend on
# custom Python components - confirm before adding or removing entries.
_LOWCODE_CONNECTORS_NEEDING_PYTHON: list[str] = [
    "source-alpha-vantage",
    "source-amplitude",
    "source-apify-dataset",
    "source-asana",
    "source-avni",
    "source-aws-cloudtrail",
    "source-bamboo-hr",
    "source-braintree",
    "source-braze",
    "source-chargebee",
    "source-commercetools",
    "source-eventbrite",
    "source-facebook-pages",
    "source-fastbill",
    "source-freshdesk",
    "source-gitlab",
    "source-gnews",
    "source-gong",
    "source-greenhouse",
    "source-instagram",
    "source-instatus",
    "source-intercom",
    "source-iterable",
    "source-jina-ai-reader",
    "source-jira",
    "source-klaviyo",
    "source-looker",
    "source-mailchimp",
    "source-mixpanel",
    "source-monday",
    "source-my-hours",
    "source-notion",
    "source-okta",
    "source-orb",
    "source-outreach",
    "source-paypal-transaction",
    "source-pinterest",
    "source-pipedrive",
    "source-pocket",
    "source-posthog",
    "source-prestashop",
    "source-public-apis",
    "source-qualaroo",
    "source-railz",
    "source-recharge",
    "source-recurly",
    "source-retently",
    "source-rss",
    "source-salesloft",
    "source-service-now",
    "source-slack",
    "source-surveymonkey",
    "source-the-guardian-api",
    "source-tiktok-marketing",
    "source-trello",
    "source-typeform",
    "source-us-census",
    "source-xero",
    "source-younium",
    "source-zendesk-chat",
    "source-zendesk-sunshine",
    "source-zendesk-support",
    "source-zendesk-talk",
    "source-zenloop",
    "source-zoom",
]
# Currently empty.
# NOTE(review): presumably reserved for connectors whose manifests fail validation - confirm.
_LOWCODE_CONNECTORS_FAILING_VALIDATION: list[str] = []
# Connectors that return 404 or some other misc exception.
_LOWCODE_CONNECTORS_UNEXPECTED_ERRORS: list[str] = [
    "source-adjust",
    "source-amazon-ads",
    "source-marketo",
]
# (CDK) FileNotFoundError: Unable to find spec.yaml or spec.json in the package.
_LOWCODE_CDK_FILE_NOT_FOUND_ERRORS: list[str] = []
# Concatenation of every exclusion list. `get_available_connectors` drops these names
# from its results when the YAML install type is requested.
_LOWCODE_CONNECTORS_EXCLUDED: list[str] = [
    *_LOWCODE_CONNECTORS_FAILING_VALIDATION,
    *_LOWCODE_CONNECTORS_UNEXPECTED_ERRORS,
    *_LOWCODE_CONNECTORS_NEEDING_PYTHON,
    *_LOWCODE_CDK_FILE_NOT_FOUND_ERRORS,
]
121
122
class InstallType(str, Enum):
    """The type of installation for a connector.

    Members are also `str` instances, so each one compares equal to its plain
    string value (for example, `InstallType.YAML == "yaml"`).
    """

    # Listed for connectors whose language is manifest-only.
    YAML = "yaml"
    # Listed for Python connectors whose PyPI registry entry is enabled.
    PYTHON = "python"
    # Listed for every connector parsed from the registry.
    DOCKER = "docker"
    # Listed for connectors whose language is Java.
    JAVA = "java"
130
131
class Language(str, Enum):
    """The language of a connector.

    Values are the strings that registry entries are matched against when an
    entry's `language` field is converted into this enum.
    """

    # Reuses the install-type spelling so the two enums share the same string.
    PYTHON = InstallType.PYTHON.value
    JAVA = InstallType.JAVA.value
    # Uses the module-level "manifest-only" identifier.
    MANIFEST_ONLY = _MANIFEST_ONLY_LANGUAGE
138
139
@dataclass
class ConnectorMetadata:
    """Registry information describing a single connector."""

    name: str
    """Name of the connector, e.g. "source-google-sheets"."""

    latest_available_version: str | None
    """Most recent version published for the connector, when known."""

    pypi_package_name: str | None
    """PyPI package name for the connector, or `None` when there is none."""

    language: Language | None
    """Implementation language of the connector, when known."""

    install_types: set[InstallType]
    """Every install type the connector supports."""

    @property
    def default_install_type(self) -> InstallType:
        """Choose the preferred install type from those the connector supports.

        Manifest-only connectors that support YAML use YAML. Otherwise Python is
        chosen when supported, and Docker is the fallback (this covers Java too).
        """
        supported = self.install_types
        is_manifest_only = self.language == Language.MANIFEST_ONLY

        if is_manifest_only and InstallType.YAML in supported:
            return InstallType.YAML

        return InstallType.PYTHON if InstallType.PYTHON in supported else InstallType.DOCKER
170
171
def _get_registry_url() -> str:
    """Return the registry location: the environment override if set, else the default URL."""
    # Environment values are always strings, so no conversion is needed.
    return os.environ.get(_REGISTRY_ENV_VAR, _REGISTRY_URL)
177
178
def _is_registry_disabled(url: str) -> bool:
    """Report whether registry lookups are switched off.

    The registry counts as disabled when `url` is one of the "false" markers
    ("0", "F", "FALSE", case-insensitive); otherwise the offline-mode setting decides.
    """
    disabling_values = {"0", "F", "FALSE"}
    if url.upper() in disabling_values:
        return True

    return AIRBYTE_OFFLINE_MODE
181
182
def _registry_entry_to_connector_metadata(entry: dict) -> ConnectorMetadata:
    """Convert one raw registry entry into a `ConnectorMetadata` record.

    Args:
        entry: A single connector definition from the registry JSON. It must
            contain `dockerRepository`; every other field is optional and may
            be missing or explicitly null.

    Returns:
        The parsed metadata. Docker is always listed as an install type; Python
        is listed only when the language is Python and the PyPI entry is
        enabled; Java and YAML follow the detected language.

    Raises:
        KeyError: If `dockerRepository` is missing from the entry.
    """
    name = entry["dockerRepository"].replace("airbyte/", "")
    latest_version: str | None = entry.get("dockerImageTag")
    # `or` (rather than a `.get` default) also covers keys that are present but null.
    tags = entry.get("tags") or []
    language: Language | None = None

    raw_language = entry.get("language")
    if raw_language is not None:
        try:
            language = Language(raw_language)
        except (ValueError, TypeError):
            # An unknown language is not fatal: warn, then fall back to the tags below.
            warnings.warn(
                message=f"Invalid language for connector {name}: {raw_language}",
                stacklevel=2,
            )
    if language is None and _PYTHON_LANGUAGE_TAG in tags:
        language = Language.PYTHON
    if language is None and _MANIFEST_ONLY_TAG in tags:
        language = Language.MANIFEST_ONLY

    remote_registries: dict = entry.get("remoteRegistries") or {}
    pypi_registry: dict = remote_registries.get("pypi") or {}
    pypi_package_name: str | None = pypi_registry.get("packageName")
    pypi_enabled: bool = bool(pypi_registry.get("enabled", False))

    # Docker is always available; the other install types depend on the language.
    install_types: set[InstallType] = {InstallType.DOCKER}
    if language == Language.PYTHON and pypi_enabled:
        install_types.add(InstallType.PYTHON)
    if language == Language.JAVA:
        install_types.add(InstallType.JAVA)
    if language == Language.MANIFEST_ONLY:
        install_types.add(InstallType.YAML)

    return ConnectorMetadata(
        name=name,
        latest_available_version=latest_version,
        # Only expose the package name when the PyPI entry is enabled.
        pypi_package_name=pypi_package_name if pypi_enabled else None,
        language=language,
        install_types=install_types,
    )
224
225
def _get_registry_cache(*, force_refresh: bool = False) -> dict[str, ConnectorMetadata]:
    """Return the registry cache, loading it first when necessary.

    Args:
        force_refresh: When true, reload the registry even if a non-empty cache
            already exists.

    Returns:
        A mapping of connector name to metadata. An empty dict is returned
        (without touching the cache) when the registry is disabled.

    Raises:
        requests.RequestException: If the HTTP download fails, times out, or
            returns an error status.
        OSError: If the local registry file cannot be opened.
        KeyError: If the registry data lacks a "sources" or "destinations" key.
    """
    global __cache
    if __cache and not force_refresh:
        return __cache

    registry_url = _get_registry_url()

    if _is_registry_disabled(registry_url):
        return {}

    if registry_url.startswith("http"):
        response = requests.get(
            registry_url,
            headers={"User-Agent": f"PyAirbyte/{get_version()}"},
            # Without a timeout, a stalled server would block the caller forever.
            timeout=30,
        )
        response.raise_for_status()
        data = response.json()
    else:
        # Assume local file
        with Path(registry_url).open(encoding="utf-8") as f:
            data = json.load(f)

    # Sources first, then destinations, so a destination wins on a name clash.
    new_cache: dict[str, ConnectorMetadata] = {}
    for connector in [*data["sources"], *data["destinations"]]:
        connector_metadata = _registry_entry_to_connector_metadata(connector)
        new_cache[connector_metadata.name] = connector_metadata

    if not new_cache:
        # This isn't necessarily fatal, since users can bring their own
        # connector definitions.
        warn_once(
            message=f"Connector registry is empty: {registry_url}",
            with_stack=False,
        )

    __cache = new_cache
    return __cache
269
270
def get_connector_metadata(name: str) -> ConnectorMetadata | None:
    """Look up the registry metadata for a connector by name.

    The registry cache is loaded on demand by `_get_registry_cache` if it has
    not been populated yet.

    Args:
        name: The connector name, for example "source-google-sheets".

    Returns:
        The connector's metadata, or `None` when the registry is disabled.

    Raises:
        PyAirbyteInternalError: If the registry was loaded but contains no
            connectors.
        AirbyteConnectorNotRegisteredError: If `name` is not in the registry.
    """
    registry_url = _get_registry_url()

    if _is_registry_disabled(registry_url):
        return None

    # Read-only access, so the cache is used directly rather than copied.
    cache = _get_registry_cache()

    if not cache:
        raise exc.PyAirbyteInternalError(
            message="Connector registry could not be loaded.",
            context={
                "registry_url": registry_url,
            },
        )
    if name not in cache:
        raise exc.AirbyteConnectorNotRegisteredError(
            connector_name=name,
            context={
                "registry_url": registry_url,
                "available_connectors": get_available_connectors(),
            },
        )
    return cache[name]
299
300
def get_available_connectors(install_type: InstallType | str | None = None) -> list[str]:
    """List the names of the connectors known to the registry, sorted alphabetically.

    With no `install_type`, the result depends on whether Docker is detected:
    every connector if it is, otherwise only Python and manifest-only ones.
    With an `install_type` (enum member or its string value), only connectors
    matching that install type are listed.
    """

    def _sorted_names(keep=None) -> list[str]:
        # The registry is looked up here, at the point each branch needs it.
        connectors = _get_registry_cache().values()
        return sorted(conn.name for conn in connectors if keep is None or keep(conn))

    if install_type is None:
        # No install type specified. Filter for whatever is runnable.
        if is_docker_installed():
            logger.info("Docker is detected. Returning all connectors.")
            return _sorted_names()

        logger.info("Docker was not detected. Returning only Python and Manifest-only connectors.")
        runnable_languages = {Language.PYTHON, Language.MANIFEST_ONLY}
        return _sorted_names(lambda conn: conn.language in runnable_languages)

    # Accept plain strings by converting them to the enum.
    resolved = install_type if isinstance(install_type, InstallType) else InstallType(install_type)

    if resolved == InstallType.PYTHON:
        return _sorted_names(lambda conn: conn.pypi_package_name is not None)

    if resolved == InstallType.JAVA:
        # Warn from this frame so `stacklevel=2` still points at the caller.
        warnings.warn(
            message="Java connectors are not yet supported.",
            stacklevel=2,
        )
        return _sorted_names(lambda conn: conn.language == Language.JAVA)

    if resolved == InstallType.DOCKER:
        return _sorted_names()

    if resolved == InstallType.YAML:
        return _sorted_names(
            lambda conn: InstallType.YAML in conn.install_types
            and conn.name not in _LOWCODE_CONNECTORS_EXCLUDED
        )

    # pragma: no cover  # Should never be reached.
    raise exc.PyAirbyteInputError(
        message="Invalid install type.",
        context={
            "install_type": resolved,
        },
    )
logger = <Logger airbyte (INFO)>
class InstallType(builtins.str, enum.Enum):
124class InstallType(str, Enum):
125    """The type of installation for a connector."""
126
127    YAML = "yaml"
128    PYTHON = "python"
129    DOCKER = "docker"
130    JAVA = "java"

The type of installation for a connector.

YAML = <InstallType.YAML: 'yaml'>
PYTHON = <InstallType.PYTHON: 'python'>
DOCKER = <InstallType.DOCKER: 'docker'>
JAVA = <InstallType.JAVA: 'java'>
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
removeprefix
removesuffix
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
class Language(builtins.str, enum.Enum):
133class Language(str, Enum):
134    """The language of a connector."""
135
136    PYTHON = InstallType.PYTHON.value
137    JAVA = InstallType.JAVA.value
138    MANIFEST_ONLY = _MANIFEST_ONLY_LANGUAGE

The language of a connector.

PYTHON = <Language.PYTHON: 'python'>
JAVA = <Language.JAVA: 'java'>
MANIFEST_ONLY = <Language.MANIFEST_ONLY: 'manifest-only'>
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
removeprefix
removesuffix
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
@dataclass
class ConnectorMetadata:
141@dataclass
142class ConnectorMetadata:
143    """Metadata for a connector."""
144
145    name: str
146    """Connector name. For example, "source-google-sheets"."""
147
148    latest_available_version: str | None
149    """The latest available version of the connector."""
150
151    pypi_package_name: str | None
152    """The name of the PyPI package for the connector, if it exists."""
153
154    language: Language | None
155    """The language of the connector."""
156
157    install_types: set[InstallType]
158    """The supported install types for the connector."""
159
160    @property
161    def default_install_type(self) -> InstallType:
162        """Return the default install type for the connector."""
163        if self.language == Language.MANIFEST_ONLY and InstallType.YAML in self.install_types:
164            return InstallType.YAML
165
166        if InstallType.PYTHON in self.install_types:
167            return InstallType.PYTHON
168
169        # Else: Java or Docker
170        return InstallType.DOCKER

Metadata for a connector.

ConnectorMetadata( name: str, latest_available_version: str | None, pypi_package_name: str | None, language: Language | None, install_types: set[InstallType])
name: str

Connector name. For example, "source-google-sheets".

latest_available_version: str | None

The latest available version of the connector.

pypi_package_name: str | None

The name of the PyPI package for the connector, if it exists.

language: Language | None

The language of the connector.

install_types: set[InstallType]

The supported install types for the connector.

default_install_type: InstallType
160    @property
161    def default_install_type(self) -> InstallType:
162        """Return the default install type for the connector."""
163        if self.language == Language.MANIFEST_ONLY and InstallType.YAML in self.install_types:
164            return InstallType.YAML
165
166        if InstallType.PYTHON in self.install_types:
167            return InstallType.PYTHON
168
169        # Else: Java or Docker
170        return InstallType.DOCKER

Return the default install type for the connector.

def get_connector_metadata(name: str) -> ConnectorMetadata | None:
272def get_connector_metadata(name: str) -> ConnectorMetadata | None:
273    """Check the cache for the connector.
274
275    If the cache is empty, populate by calling update_cache.
276    """
277    registry_url = _get_registry_url()
278
279    if _is_registry_disabled(registry_url):
280        return None
281
282    cache = copy(_get_registry_cache())
283
284    if not cache:
285        raise exc.PyAirbyteInternalError(
286            message="Connector registry could not be loaded.",
287            context={
288                "registry_url": _get_registry_url(),
289            },
290        )
291    if name not in cache:
292        raise exc.AirbyteConnectorNotRegisteredError(
293            connector_name=name,
294            context={
295                "registry_url": _get_registry_url(),
296                "available_connectors": get_available_connectors(),
297            },
298        )
299    return cache[name]

Check the cache for the connector.

If the cache has not been populated yet, it is loaded from the registry first.

def get_available_connectors( install_type: InstallType | str | None = None) -> list[str]:
302def get_available_connectors(install_type: InstallType | str | None = None) -> list[str]:
303    """Return a list of all available connectors.
304
305    Connectors will be returned in alphabetical order, with the standard prefix "source-".
306    """
307    if install_type is None:
308        # No install type specified. Filter for whatever is runnable.
309        if is_docker_installed():
310            logger.info("Docker is detected. Returning all connectors.")
311            # If Docker is available, return all connectors.
312            return sorted(conn.name for conn in _get_registry_cache().values())
313
314        logger.info("Docker was not detected. Returning only Python and Manifest-only connectors.")
315
316        # If Docker is not available, return only Python and Manifest-based connectors.
317        return sorted(
318            conn.name
319            for conn in _get_registry_cache().values()
320            if conn.language in {Language.PYTHON, Language.MANIFEST_ONLY}
321        )
322
323    if not isinstance(install_type, InstallType):
324        install_type = InstallType(install_type)
325
326    if install_type == InstallType.PYTHON:
327        return sorted(
328            conn.name
329            for conn in _get_registry_cache().values()
330            if conn.pypi_package_name is not None
331        )
332
333    if install_type == InstallType.JAVA:
334        warnings.warn(
335            message="Java connectors are not yet supported.",
336            stacklevel=2,
337        )
338        return sorted(
339            conn.name for conn in _get_registry_cache().values() if conn.language == Language.JAVA
340        )
341
342    if install_type == InstallType.DOCKER:
343        return sorted(conn.name for conn in _get_registry_cache().values())
344
345    if install_type == InstallType.YAML:
346        return sorted(
347            conn.name
348            for conn in _get_registry_cache().values()
349            if InstallType.YAML in conn.install_types
350            and conn.name not in _LOWCODE_CONNECTORS_EXCLUDED
351        )
352
353    # pragma: no cover  # Should never be reached.
354    raise exc.PyAirbyteInputError(
355        message="Invalid install type.",
356        context={
357            "install_type": install_type,
358        },
359    )

Return a list of all available connectors.

Connector names are returned in alphabetical order and include their standard prefix (for example, "source-").