airbyte.sources.registry

Connectivity to the connector catalog registry.

  1# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  2"""Connectivity to the connector catalog registry."""
  3
  4from __future__ import annotations
  5
  6import json
  7import logging
  8import os
  9import warnings
 10from copy import copy
 11from dataclasses import dataclass
 12from enum import Enum
 13from pathlib import Path
 14
 15import requests
 16
 17from airbyte import exceptions as exc
 18from airbyte._util.meta import is_docker_installed
 19from airbyte.constants import AIRBYTE_OFFLINE_MODE
 20from airbyte.logs import warn_once
 21from airbyte.version import get_version
 22
 23
 24logger = logging.getLogger("airbyte")
 25
 26
 27__cache: dict[str, ConnectorMetadata] | None = None
 28
 29
 30_REGISTRY_ENV_VAR = "AIRBYTE_LOCAL_REGISTRY"
 31_REGISTRY_URL = "https://connectors.airbyte.com/files/registries/v0/oss_registry.json"
 32
 33_LOWCODE_CDK_TAG = "cdk:low-code"
 34
 35_PYTHON_LANGUAGE = "python"
 36_MANIFEST_ONLY_LANGUAGE = "manifest-only"
 37
 38_PYTHON_LANGUAGE_TAG = f"language:{_PYTHON_LANGUAGE}"
 39_MANIFEST_ONLY_TAG = f"language:{_MANIFEST_ONLY_LANGUAGE}"
 40
 41_LOWCODE_CONNECTORS_NEEDING_PYTHON: list[str] = [
 42    "source-adjust",
 43    "source-alpha-vantage",
 44    "source-amplitude",
 45    "source-apify-dataset",
 46    "source-asana",
 47    "source-avni",
 48    "source-aws-cloudtrail",
 49    "source-bamboo-hr",
 50    "source-braintree",
 51    "source-braze",
 52    "source-chargebee",
 53    "source-close-com",
 54    "source-commercetools",
 55    "source-facebook-pages",
 56    "source-fastbill",
 57    "source-freshdesk",
 58    "source-gitlab",
 59    "source-gnews",
 60    "source-greenhouse",
 61    "source-instagram",
 62    "source-instatus",
 63    "source-intercom",
 64    "source-iterable",
 65    "source-jina-ai-reader",
 66    "source-jira",
 67    "source-klaviyo",
 68    "source-looker",
 69    "source-mailchimp",
 70    "source-mixpanel",
 71    "source-monday",
 72    "source-my-hours",
 73    "source-notion",
 74    "source-okta",
 75    "source-orb",
 76    "source-outreach",
 77    "source-partnerstack",
 78    "source-paypal-transaction",
 79    "source-pinterest",
 80    "source-pipedrive",
 81    "source-pocket",
 82    "source-posthog",
 83    "source-prestashop",
 84    "source-public-apis",
 85    "source-qualaroo",
 86    "source-quickbooks",
 87    "source-railz",
 88    "source-recharge",
 89    "source-recurly",
 90    "source-retently",
 91    "source-rss",
 92    "source-salesloft",
 93    "source-slack",
 94    "source-surveymonkey",
 95    "source-tiktok-marketing",
 96    "source-the-guardian-api",
 97    "source-trello",
 98    "source-typeform",
 99    "source-us-census",
100    "source-xero",
101    "source-younium",
102    "source-zendesk-chat",
103    "source-zendesk-sunshine",
104    "source-zendesk-support",
105    "source-zendesk-talk",
106    "source-zenloop",
107    "source-zoom",
108]
109_LOWCODE_CONNECTORS_FAILING_VALIDATION = [
110    "source-amazon-ads",
111]
112# Connectors that return 404 or some other misc exception.
113_LOWCODE_CONNECTORS_UNEXPECTED_ERRORS: list[str] = []
114# (CDK) FileNotFoundError: Unable to find spec.yaml or spec.json in the package.
115_LOWCODE_CDK_FILE_NOT_FOUND_ERRORS: list[str] = [
116    "source-apple-search-ads",
117    "source-marketo",
118    "source-n8n",
119    "source-onesignal",
120    "source-postmarkapp",
121    "source-sentry",
122    "source-unleash",
123]
124_LOWCODE_CONNECTORS_EXCLUDED: list[str] = [
125    *_LOWCODE_CONNECTORS_FAILING_VALIDATION,
126    *_LOWCODE_CONNECTORS_UNEXPECTED_ERRORS,
127    *_LOWCODE_CONNECTORS_NEEDING_PYTHON,
128    *_LOWCODE_CDK_FILE_NOT_FOUND_ERRORS,
129]
130
131
class InstallType(str, Enum):
    """Ways in which a connector can be installed and run.

    Because the enum also subclasses ``str``, each member compares equal to its
    lowercase string value (``InstallType.DOCKER == "docker"``).
    """

    YAML = "yaml"  # declarative manifest, run without a dedicated package
    PYTHON = "python"  # installed from a Python package
    DOCKER = "docker"  # run from the connector's container image
    JAVA = "java"
140
class Language(str, Enum):
    """Implementation language of a connector, as reported by the registry.

    The Python and Java values are borrowed from the matching ``InstallType``
    members so the two enums stay in sync; manifest-only connectors use the
    registry's ``"manifest-only"`` language string.
    """

    PYTHON = InstallType.PYTHON.value
    JAVA = InstallType.JAVA.value
    MANIFEST_ONLY = _MANIFEST_ONLY_LANGUAGE
147
148
@dataclass
class ConnectorMetadata:
    """Registry information describing a single connector."""

    name: str
    """Connector name, such as "source-google-sheets"."""

    latest_available_version: str | None
    """Newest version the registry lists for the connector, if any."""

    pypi_package_name: str | None
    """PyPI package providing the connector, or None when there is none."""

    language: Language | None
    """Implementation language of the connector, when known."""

    install_types: set[InstallType]
    """Every install type this connector supports."""

    @property
    def default_install_type(self) -> InstallType:
        """Choose the install type to use when the caller does not specify one.

        Manifest-only connectors that support YAML are installed as YAML.
        Otherwise Python is preferred when supported, and Docker is the
        fallback for everything else (Java connectors included).
        """
        supported = self.install_types
        if self.language == Language.MANIFEST_ONLY and InstallType.YAML in supported:
            return InstallType.YAML
        return InstallType.PYTHON if InstallType.PYTHON in supported else InstallType.DOCKER
179
180
def _get_registry_url() -> str:
    """Return the registry location to load connector metadata from.

    The value of the override environment variable wins whenever it is set,
    even to an empty string. Otherwise the default public registry URL is used.
    """
    override = os.environ.get(_REGISTRY_ENV_VAR)
    return _REGISTRY_URL if override is None else str(override)
186
187
def _is_registry_disabled(url: str) -> bool:
    """Report whether registry lookups should be skipped.

    Lookups are off when ``url`` is one of the case-insensitive sentinels
    "0", "F" or "FALSE", or when ``AIRBYTE_OFFLINE_MODE`` is truthy.
    """
    is_disable_sentinel = url.upper() in {"0", "F", "FALSE"}
    return is_disable_sentinel or AIRBYTE_OFFLINE_MODE
190
191
def _registry_entry_to_connector_metadata(entry: dict) -> ConnectorMetadata:
    """Convert one raw registry entry into a ``ConnectorMetadata``.

    Args:
        entry: A single source or destination object from the registry JSON.
            ``dockerRepository`` is required. ``dockerImageTag``, ``language``,
            ``tags`` and ``remoteRegistries.pypi`` are optional and may be
            missing or explicitly ``null``.

    Returns:
        The parsed metadata. Docker is always listed as an install type. Python,
        Java and YAML are added based on the detected language, whether PyPI
        publishing is enabled, and the low-code CDK tag.

    Raises:
        KeyError: If ``dockerRepository`` is missing from ``entry``.
    """
    name = entry["dockerRepository"].replace("airbyte/", "")
    latest_version: str | None = entry.get("dockerImageTag")
    # ``or`` also covers keys that are present but null in the registry JSON.
    tags: list[str] = entry.get("tags") or []

    language: Language | None = None
    if entry.get("language") is not None:
        try:
            language = Language(entry["language"])
        except ValueError:
            # Unknown language strings are not fatal; fall back to the tags below.
            warnings.warn(
                message=f"Invalid language for connector {name}: {entry['language']}",
                stacklevel=2,
            )
    # Fall back to ``language:*`` tags when the field is absent or unrecognized.
    if not language and _PYTHON_LANGUAGE_TAG in tags:
        language = Language.PYTHON
    if not language and _MANIFEST_ONLY_TAG in tags:
        language = Language.MANIFEST_ONLY

    remote_registries: dict = entry.get("remoteRegistries") or {}
    pypi_registry: dict = remote_registries.get("pypi") or {}
    pypi_package_name: str | None = pypi_registry.get("packageName")
    pypi_enabled: bool = pypi_registry.get("enabled", False)

    install_types: set[InstallType] = {InstallType.DOCKER}  # Always available.
    if language == Language.PYTHON and pypi_enabled:
        install_types.add(InstallType.PYTHON)
    if language == Language.JAVA:
        install_types.add(InstallType.JAVA)
    # TODO: Remove 'cdk:low-code' check once all connectors are migrated to manifest-only.
    # https://github.com/airbytehq/PyAirbyte/issues/348
    if language == Language.MANIFEST_ONLY or _LOWCODE_CDK_TAG in tags:
        install_types.add(InstallType.YAML)

    return ConnectorMetadata(
        name=name,
        latest_available_version=latest_version,
        # Only expose the package name when PyPI publishing is actually enabled.
        pypi_package_name=pypi_package_name if pypi_enabled else None,
        language=language,
        install_types=install_types,
    )
236
237
def _get_registry_cache(
    *,
    force_refresh: bool = False,
    timeout: float | None = 60.0,
) -> dict[str, ConnectorMetadata]:
    """Return the connector registry, loading and memoizing it when needed.

    The parsed registry is stored in the module-level ``__cache``. An empty
    cache counts as "not loaded", so it is fetched again on the next call.

    Args:
        force_refresh: Reload the registry even if a cached copy exists.
        timeout: Seconds to wait for an HTTP registry to respond; forwarded to
            ``requests.get``. ``None`` disables the timeout.

    Returns:
        A mapping of connector name to ``ConnectorMetadata`` covering both
        sources and destinations. When the registry is disabled, an empty dict
        is returned and nothing is cached.

    Raises:
        requests.HTTPError: If the registry URL responds with an error status.
        requests.Timeout: If the registry does not respond within ``timeout``.
    """
    global __cache
    if __cache and not force_refresh:
        return __cache

    registry_url = _get_registry_url()

    if _is_registry_disabled(registry_url):
        return {}

    if registry_url.startswith("http"):
        # Always pass a timeout: without one, a stalled server blocks forever.
        response = requests.get(
            registry_url,
            headers={"User-Agent": f"PyAirbyte/{get_version()}"},
            timeout=timeout,
        )
        response.raise_for_status()
        data = response.json()
    else:
        # Assume local file
        with Path(registry_url).open(encoding="utf-8") as f:
            data = json.load(f)

    new_cache: dict[str, ConnectorMetadata] = {}

    # Sources and destinations share one namespace; a later entry with the same
    # name replaces an earlier one. Missing or null sections count as empty.
    for section in ("sources", "destinations"):
        for entry in data.get(section) or []:
            connector_metadata = _registry_entry_to_connector_metadata(entry)
            new_cache[connector_metadata.name] = connector_metadata

    if len(new_cache) == 0:
        # This isn't necessarily fatal, since users can bring their own
        # connector definitions.
        warn_once(
            message=f"Connector registry is empty: {registry_url}",
            with_stack=False,
        )

    __cache = new_cache
    return __cache
281
282
def get_connector_metadata(name: str) -> ConnectorMetadata | None:
    """Look up registry metadata for a single connector.

    The registry is loaded through ``_get_registry_cache()``, which fetches and
    memoizes it if it is not already cached.

    Args:
        name: Connector name, for example ``"source-google-sheets"``.

    Returns:
        The connector's ``ConnectorMetadata``, or ``None`` when the registry is
        disabled (see ``_is_registry_disabled``).

    Raises:
        PyAirbyteInternalError: If the registry yields no connectors at all.
        AirbyteConnectorNotRegisteredError: If ``name`` is not in the registry.
    """
    registry_url = _get_registry_url()

    if _is_registry_disabled(registry_url):
        return None

    cache = copy(_get_registry_cache())

    if not cache:
        raise exc.PyAirbyteInternalError(
            message="Connector registry could not be loaded.",
            context={
                "registry_url": registry_url,
            },
        )
    if name not in cache:
        raise exc.AirbyteConnectorNotRegisteredError(
            connector_name=name,
            context={
                "registry_url": registry_url,
                "available_connectors": get_available_connectors(),
            },
        )
    return cache[name]
311
312
def get_available_connectors(install_type: InstallType | str | None = None) -> list[str]:
    """Return the names of registry connectors, sorted alphabetically.

    Both source and destination connectors are included. Names keep their
    registry prefix, for example "source-google-sheets".

    Args:
        install_type: Restrict results to connectors supporting this install
            type, given as an ``InstallType`` or its string value. When
            ``None``, return whatever can run in this environment: every
            connector if Docker is installed, otherwise only Python and
            manifest-only connectors.

    Returns:
        Sorted connector names. The list is empty when the registry is disabled.

    Raises:
        PyAirbyteInputError: If ``install_type`` is not a valid install type.
    """
    if install_type is None:
        # No install type specified. Filter for whatever is runnable.
        if is_docker_installed():
            logger.info("Docker is detected. Returning all connectors.")
            # If Docker is available, return all connectors.
            return sorted(conn.name for conn in _get_registry_cache().values())

        logger.info("Docker was not detected. Returning only Python and Manifest-only connectors.")

        # If Docker is not available, return only Python and Manifest-based connectors.
        runnable_languages = {Language.PYTHON, Language.MANIFEST_ONLY}
        return sorted(
            conn.name
            for conn in _get_registry_cache().values()
            if conn.language in runnable_languages
        )

    if not isinstance(install_type, InstallType):
        try:
            install_type = InstallType(install_type)
        except ValueError as ex:
            # Surface bad input as the library's input error rather than a bare
            # enum ValueError.
            raise exc.PyAirbyteInputError(
                message="Invalid install type.",
                context={
                    "install_type": install_type,
                },
            ) from ex

    if install_type == InstallType.PYTHON:
        # "Python" means installable from PyPI, i.e. a package name is published.
        return sorted(
            conn.name
            for conn in _get_registry_cache().values()
            if conn.pypi_package_name is not None
        )

    if install_type == InstallType.JAVA:
        warnings.warn(
            message="Java connectors are not yet supported.",
            stacklevel=2,
        )
        return sorted(
            conn.name for conn in _get_registry_cache().values() if conn.language == Language.JAVA
        )

    if install_type == InstallType.DOCKER:
        return sorted(conn.name for conn in _get_registry_cache().values())

    if install_type == InstallType.YAML:
        # Skip low-code connectors known not to run from their manifest alone.
        excluded = set(_LOWCODE_CONNECTORS_EXCLUDED)
        return sorted(
            conn.name
            for conn in _get_registry_cache().values()
            if InstallType.YAML in conn.install_types and conn.name not in excluded
        )

    # Unreachable while every InstallType member is handled above.
    raise exc.PyAirbyteInputError(  # pragma: no cover
        message="Invalid install type.",
        context={
            "install_type": install_type,
        },
    )
logger = <Logger airbyte (INFO)>
class InstallType(builtins.str, enum.Enum):
133class InstallType(str, Enum):
134    """The type of installation for a connector."""
135
136    YAML = "yaml"
137    PYTHON = "python"
138    DOCKER = "docker"
139    JAVA = "java"

The type of installation for a connector.

YAML = <InstallType.YAML: 'yaml'>
PYTHON = <InstallType.PYTHON: 'python'>
DOCKER = <InstallType.DOCKER: 'docker'>
JAVA = <InstallType.JAVA: 'java'>
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
removeprefix
removesuffix
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
class Language(builtins.str, enum.Enum):
142class Language(str, Enum):
143    """The language of a connector."""
144
145    PYTHON = InstallType.PYTHON.value
146    JAVA = InstallType.JAVA.value
147    MANIFEST_ONLY = _MANIFEST_ONLY_LANGUAGE

The language of a connector.

PYTHON = <Language.PYTHON: 'python'>
JAVA = <Language.JAVA: 'java'>
MANIFEST_ONLY = <Language.MANIFEST_ONLY: 'manifest-only'>
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
removeprefix
removesuffix
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
@dataclass
class ConnectorMetadata:
150@dataclass
151class ConnectorMetadata:
152    """Metadata for a connector."""
153
154    name: str
155    """Connector name. For example, "source-google-sheets"."""
156
157    latest_available_version: str | None
158    """The latest available version of the connector."""
159
160    pypi_package_name: str | None
161    """The name of the PyPI package for the connector, if it exists."""
162
163    language: Language | None
164    """The language of the connector."""
165
166    install_types: set[InstallType]
167    """The supported install types for the connector."""
168
169    @property
170    def default_install_type(self) -> InstallType:
171        """Return the default install type for the connector."""
172        if self.language == Language.MANIFEST_ONLY and InstallType.YAML in self.install_types:
173            return InstallType.YAML
174
175        if InstallType.PYTHON in self.install_types:
176            return InstallType.PYTHON
177
178        # Else: Java or Docker
179        return InstallType.DOCKER

Metadata for a connector.

ConnectorMetadata( name: str, latest_available_version: str | None, pypi_package_name: str | None, language: Language | None, install_types: set[InstallType])
name: str

Connector name. For example, "source-google-sheets".

latest_available_version: str | None

The latest available version of the connector.

pypi_package_name: str | None

The name of the PyPI package for the connector, if it exists.

language: Language | None

The language of the connector.

install_types: set[InstallType]

The supported install types for the connector.

default_install_type: InstallType
169    @property
170    def default_install_type(self) -> InstallType:
171        """Return the default install type for the connector."""
172        if self.language == Language.MANIFEST_ONLY and InstallType.YAML in self.install_types:
173            return InstallType.YAML
174
175        if InstallType.PYTHON in self.install_types:
176            return InstallType.PYTHON
177
178        # Else: Java or Docker
179        return InstallType.DOCKER

Return the default install type for the connector.

def get_connector_metadata(name: str) -> ConnectorMetadata | None:
284def get_connector_metadata(name: str) -> ConnectorMetadata | None:
285    """Check the cache for the connector.
286
287    If the cache is empty, populate by calling update_cache.
288    """
289    registry_url = _get_registry_url()
290
291    if _is_registry_disabled(registry_url):
292        return None
293
294    cache = copy(_get_registry_cache())
295
296    if not cache:
297        raise exc.PyAirbyteInternalError(
298            message="Connector registry could not be loaded.",
299            context={
300                "registry_url": _get_registry_url(),
301            },
302        )
303    if name not in cache:
304        raise exc.AirbyteConnectorNotRegisteredError(
305            connector_name=name,
306            context={
307                "registry_url": _get_registry_url(),
308                "available_connectors": get_available_connectors(),
309            },
310        )
311    return cache[name]

Check the cache for the connector.

If the registry cache is empty, the registry is loaded (and cached) first.

def get_available_connectors( install_type: InstallType | str | None = None) -> list[str]:
314def get_available_connectors(install_type: InstallType | str | None = None) -> list[str]:
315    """Return a list of all available connectors.
316
317    Connectors will be returned in alphabetical order, with the standard prefix "source-".
318    """
319    if install_type is None:
320        # No install type specified. Filter for whatever is runnable.
321        if is_docker_installed():
322            logger.info("Docker is detected. Returning all connectors.")
323            # If Docker is available, return all connectors.
324            return sorted(conn.name for conn in _get_registry_cache().values())
325
326        logger.info("Docker was not detected. Returning only Python and Manifest-only connectors.")
327
328        # If Docker is not available, return only Python and Manifest-based connectors.
329        return sorted(
330            conn.name
331            for conn in _get_registry_cache().values()
332            if conn.language in {Language.PYTHON, Language.MANIFEST_ONLY}
333        )
334
335    if not isinstance(install_type, InstallType):
336        install_type = InstallType(install_type)
337
338    if install_type == InstallType.PYTHON:
339        return sorted(
340            conn.name
341            for conn in _get_registry_cache().values()
342            if conn.pypi_package_name is not None
343        )
344
345    if install_type == InstallType.JAVA:
346        warnings.warn(
347            message="Java connectors are not yet supported.",
348            stacklevel=2,
349        )
350        return sorted(
351            conn.name for conn in _get_registry_cache().values() if conn.language == Language.JAVA
352        )
353
354    if install_type == InstallType.DOCKER:
355        return sorted(conn.name for conn in _get_registry_cache().values())
356
357    if install_type == InstallType.YAML:
358        return sorted(
359            conn.name
360            for conn in _get_registry_cache().values()
361            if InstallType.YAML in conn.install_types
362            and conn.name not in _LOWCODE_CONNECTORS_EXCLUDED
363        )
364
365    # pragma: no cover  # Should never be reached.
366    raise exc.PyAirbyteInputError(
367        message="Invalid install type.",
368        context={
369            "install_type": install_type,
370        },
371    )

Return a list of all available connectors.

Connectors are returned in alphabetical order. Both source and destination connectors are included, and each name keeps its registry prefix (for example "source-google-sheets").