airbyte.registry

Connectivity to the connector catalog registry.

  1# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  2"""Connectivity to the connector catalog registry."""
  3
  4from __future__ import annotations
  5
  6import json
  7import logging
  8import os
  9import warnings
 10from copy import copy
 11from enum import Enum
 12from pathlib import Path
 13from typing import Any, cast
 14
 15import requests
 16import yaml
 17from pydantic import BaseModel, Field
 18from typing_extensions import Self
 19
 20from airbyte import exceptions as exc
 21from airbyte._registry_utils import fetch_registry_version_date, parse_changelog_html
 22from airbyte._util.meta import is_docker_installed
 23from airbyte.constants import AIRBYTE_OFFLINE_MODE
 24from airbyte.logs import warn_once
 25from airbyte.version import get_version
 26
 27
 28logger = logging.getLogger("airbyte")
 29
 30
# In-process cache of registry contents, keyed by connector name.
# Populated by `_get_registry_cache()`; `None` until the first successful load.
__cache: dict[str, ConnectorMetadata] | None = None


# Environment variable that overrides the registry location. The functions below
# treat its value as an HTTP(S) URL, a local file path, or one of "0"/"F"/"FALSE"
# (case-insensitive) to disable the registry.
_REGISTRY_ENV_VAR = "AIRBYTE_LOCAL_REGISTRY"
# Registry location used when the override variable is not set.
_REGISTRY_URL = "https://connectors.airbyte.com/files/registries/v0/oss_registry.json"

# Language identifiers as they appear in registry entries.
_PYTHON_LANGUAGE = "python"
_MANIFEST_ONLY_LANGUAGE = "manifest-only"

# Tag spellings used as a fallback when a registry entry has no usable `language` field.
_PYTHON_LANGUAGE_TAG = f"language:{_PYTHON_LANGUAGE}"
_MANIFEST_ONLY_TAG = f"language:{_MANIFEST_ONLY_LANGUAGE}"

# Template for a connector's manifest.yaml URL; formatted with `source_name` and `version`.
_DEFAULT_MANIFEST_URL = (
    "https://connectors.airbyte.com/files/metadata/airbyte/{source_name}/{version}/manifest.yaml"
)
 46
 47
class InstallType(str, Enum):
    """The type of installation for a connector.

    Members mix in `str`, so each member compares equal to its plain string
    value (for example, `InstallType.PYTHON == "python"`), and a member can be
    looked up from that string with `InstallType("python")`.
    """

    YAML = "yaml"
    """Manifest-only connectors that can be run without Docker."""
    PYTHON = "python"
    """Python-based connectors available via PyPI."""
    DOCKER = "docker"
    """Docker-based connectors (returns all connectors for backward compatibility)."""
    JAVA = "java"
    """Java-based connectors."""

    # The two members below are filter selectors rather than concrete install methods.
    INSTALLABLE = "installable"
    """Connectors installable in the current environment (environment-sensitive).

    Returns all connectors if Docker is installed, otherwise only Python and YAML.
    """
    ANY = "any"
    """All connectors in the registry (environment-independent)."""
 67
 68
class Language(str, Enum):
    """The language of a connector.

    `PYTHON` and `JAVA` reuse the string values of the matching `InstallType`
    members; `MANIFEST_ONLY` uses the module-level `_MANIFEST_ONLY_LANGUAGE` value.
    """

    PYTHON = InstallType.PYTHON.value
    JAVA = InstallType.JAVA.value
    MANIFEST_ONLY = _MANIFEST_ONLY_LANGUAGE
 75
 76
 77class ConnectorMetadata(BaseModel):
 78    """Metadata for a connector."""
 79
 80    name: str
 81    """Connector name. For example, "source-google-sheets"."""
 82
 83    display_name: str | None = None
 84    """Human-readable connector name."""
 85
 86    connector_type: str | None = None
 87    """Connector type: `source` or `destination`."""
 88
 89    definition_id: str | None = None
 90    """Source or destination definition ID."""
 91
 92    docker_repository: str | None = None
 93    """Docker repository for the connector image."""
 94
 95    latest_available_version: str | None
 96    """The latest available version of the connector."""
 97
 98    pypi_package_name: str | None
 99    """The name of the PyPI package for the connector, if it exists."""
100
101    language: Language | None
102    """The language of the connector."""
103
104    install_types: set[InstallType]
105    """The supported install types for the connector."""
106
107    suggested_streams: list[str] | None = None
108    """A list of suggested streams for the connector, if available."""
109
110    support_level: str | None = None
111    """Connector support level."""
112
113    release_stage: str | None = None
114    """Connector release stage."""
115
116    source_type: str | None = None
117    """Connector subtype."""
118
119    documentation_url: str | None = None
120    """Connector documentation URL."""
121
122    release_date: str | None = None
123    """Connector release date."""
124
125    github_issue_label: str | None = None
126    """GitHub issue label for the connector."""
127
128    @property
129    def default_install_type(self) -> InstallType:
130        """Return the default install type for the connector."""
131        if self.language == Language.MANIFEST_ONLY and InstallType.YAML in self.install_types:
132            return InstallType.YAML
133
134        if InstallType.PYTHON in self.install_types:
135            return InstallType.PYTHON
136
137        # Else: Java or Docker
138        return InstallType.DOCKER
139
140
def _get_registry_url() -> str:
    """Return the registry location to use.

    The value of the `_REGISTRY_ENV_VAR` environment variable wins whenever
    that variable is set (even to an empty string); otherwise the built-in
    `_REGISTRY_URL` is returned.
    """
    override = os.environ.get(_REGISTRY_ENV_VAR)
    if override is None:
        return _REGISTRY_URL

    return str(override)
146
147
148def _is_registry_disabled(url: str) -> bool:
149    return url.upper() in {"0", "F", "FALSE"} or AIRBYTE_OFFLINE_MODE
150
151
def _registry_entry_to_connector_metadata(entry: dict) -> ConnectorMetadata:
    """Convert one raw registry entry into a `ConnectorMetadata` object.

    Args:
        entry: A single item from the registry's `sources` or `destinations`
            list. It must contain `dockerRepository`; all other keys are optional.

    Returns:
        The metadata built from the entry.

    Raises:
        KeyError: If `dockerRepository` is missing from the entry.
    """
    name = entry["dockerRepository"].replace("airbyte/", "")
    latest_version: str | None = entry.get("dockerImageTag")
    connector_type = "source" if name.startswith("source-") else "destination"
    definition_id = entry.get("sourceDefinitionId") or entry.get("destinationDefinitionId")

    # Use `or` fallbacks rather than `.get(key, default)`: a key that is present with
    # an explicit JSON `null` would otherwise yield `None` and break the lookups below.
    tags = entry.get("tags") or []

    language: Language | None = None
    raw_language = entry.get("language")
    if raw_language is not None:
        try:
            language = Language(raw_language)
        except ValueError:
            # Unknown language values are reported but do not abort loading the registry.
            warnings.warn(
                message=f"Invalid language for connector {name}: {raw_language}",
                stacklevel=2,
            )

    # Fall back to the `language:*` tags when the `language` field is absent or invalid.
    if not language and _PYTHON_LANGUAGE_TAG in tags:
        language = Language.PYTHON
    if not language and _MANIFEST_ONLY_TAG in tags:
        language = Language.MANIFEST_ONLY

    remote_registries: dict = entry.get("remoteRegistries") or {}
    pypi_registry: dict = remote_registries.get("pypi") or {}
    pypi_package_name = cast(
        "str | None",
        pypi_registry.get("packageName", None),
    )
    pypi_enabled: bool = bool(pypi_registry.get("enabled", False))

    # Docker is always considered available; the others depend on the language.
    install_types: set[InstallType] = {InstallType.DOCKER}
    if language == Language.PYTHON and pypi_enabled:
        install_types.add(InstallType.PYTHON)
    elif language == Language.JAVA:
        install_types.add(InstallType.JAVA)
    elif language == Language.MANIFEST_ONLY:
        install_types.add(InstallType.YAML)

    suggested_streams = (entry.get("suggestedStreams") or {}).get("streams", None)

    return ConnectorMetadata(
        name=name,
        display_name=entry.get("name"),
        connector_type=connector_type,
        definition_id=definition_id,
        docker_repository=entry.get("dockerRepository"),
        latest_available_version=latest_version,
        # The package name is only exposed when the PyPI registry is enabled.
        pypi_package_name=pypi_package_name if pypi_enabled else None,
        language=language,
        install_types=install_types,
        suggested_streams=suggested_streams,
        support_level=entry.get("supportLevel"),
        release_stage=entry.get("releaseStage"),
        source_type=entry.get("sourceType"),
        documentation_url=entry.get("documentationUrl"),
        release_date=entry.get("releaseDate"),
        github_issue_label=entry.get("githubIssueLabel"),
    )
209
210
def _get_registry_cache(
    *,
    force_refresh: bool = False,
) -> dict[str, ConnectorMetadata]:
    """Return the registry cache, loading it first when necessary.

    The registry is read over HTTP when the configured location starts with
    "http", and from a local JSON file otherwise. A non-empty cached result is
    reused on later calls unless `force_refresh` is set.

    Args:
        force_refresh: Reload the registry even if a cached copy exists.

    Returns:
        A mapping of connector name to `ConnectorMetadata`. An empty dict is
        returned (and not cached) when the registry is disabled.

    Raises:
        requests.exceptions.RequestException: If the HTTP download fails or times out.
        OSError: If the local registry file cannot be opened.
    """
    global __cache
    if __cache and not force_refresh:
        return __cache

    registry_url = _get_registry_url()

    if _is_registry_disabled(registry_url):
        return {}

    if registry_url.startswith("http"):
        response = requests.get(
            registry_url,
            headers={"User-Agent": f"PyAirbyte/{get_version()}"},
            # Without a timeout an unresponsive server would block the caller forever.
            timeout=10,
        )
        response.raise_for_status()
        data = response.json()
    else:
        # Assume local file
        with Path(registry_url).open(encoding="utf-8") as f:
            data = json.load(f)

    new_cache: dict[str, ConnectorMetadata] = {}

    # A registry may legitimately omit (or null out) either section, e.g. a local
    # registry that only defines sources; treat a missing section as empty.
    for section in ("sources", "destinations"):
        for connector in data.get(section) or []:
            connector_metadata = _registry_entry_to_connector_metadata(connector)
            new_cache[connector_metadata.name] = connector_metadata

    if not new_cache:
        # This isn't necessarily fatal, since users can bring their own
        # connector definitions.
        warn_once(
            message=f"Connector registry is empty: {registry_url}",
            with_stack=False,
        )

    __cache = new_cache
    return __cache
260
261
def get_connector_metadata(name: str) -> ConnectorMetadata | None:
    """Look up a connector's metadata in the registry cache.

    The cache is loaded on demand via `_get_registry_cache()`.

    Args:
        name: The connector name, for example "source-google-sheets".

    Returns:
        The connector's metadata, or `None` when the registry is disabled.

    Raises:
        PyAirbyteInternalError: If the registry could not be loaded (empty cache).
        AirbyteConnectorNotRegisteredError: If the connector is not in the registry.
    """
    if _is_registry_disabled(_get_registry_url()):
        return None

    registry = copy(_get_registry_cache())

    if registry and name in registry:
        return registry[name]

    if not registry:
        raise exc.PyAirbyteInternalError(
            message="Connector registry could not be loaded.",
            context={
                "registry_url": _get_registry_url(),
            },
        )

    # The registry loaded fine, but it has no entry under this name.
    raise exc.AirbyteConnectorNotRegisteredError(
        connector_name=name,
        context={
            "registry_url": _get_registry_url(),
            "available_connectors": get_available_connectors(),
        },
    )
290
291
def get_available_connectors(
    install_type: InstallType | str | None = InstallType.INSTALLABLE,
) -> list[str]:
    """List the names of the connectors available for an install type.

    Names are returned in alphabetical order and keep their standard
    "source-" / "destination-" prefix.

    Args:
        install_type: The type of installation to filter by, as an
            `InstallType` or its string value. `None` is treated like
            `InstallType.INSTALLABLE`, which is also the default.

    Returns:
        The sorted connector names matching the requested install type.
    """
    if install_type is None or install_type == InstallType.INSTALLABLE:
        # Default behavior: what can be installed depends on whether Docker is present.
        if is_docker_installed():
            logger.info("Docker is detected. Returning all connectors.")
            return sorted(_get_registry_cache().keys())

        logger.info("Docker was not detected. Returning only Python and Manifest-only connectors.")
        dockerless_languages = {Language.PYTHON, Language.MANIFEST_ONLY}
        return sorted(
            registered_name
            for registered_name, metadata in _get_registry_cache().items()
            if metadata.language in dockerless_languages
        )

    # Normalize plain strings; an unknown value raises ValueError here.
    requested = install_type if isinstance(install_type, InstallType) else InstallType(install_type)

    if requested in {InstallType.DOCKER, InstallType.ANY}:
        return sorted(_get_registry_cache().keys())

    if requested == InstallType.YAML:
        yaml_capable = [
            metadata.name
            for metadata in _get_registry_cache().values()
            if InstallType.YAML in metadata.install_types
        ]
        return sorted(yaml_capable)

    if requested == InstallType.PYTHON:
        with_pypi_package = [
            registered_name
            for registered_name, metadata in _get_registry_cache().items()
            if metadata.pypi_package_name is not None
        ]
        return sorted(with_pypi_package)

    if requested == InstallType.JAVA:
        warnings.warn(
            message="Java connectors are not yet supported.",
            stacklevel=2,
        )
        java_based = [
            registered_name
            for registered_name, metadata in _get_registry_cache().items()
            if metadata.language == Language.JAVA
        ]
        return sorted(java_based)

    # pragma: no cover  # Should never be reached.
    raise exc.PyAirbyteInputError(
        message="Invalid install type.",
        context={
            "install_type": requested,
        },
    )
356
357
class ConnectorVersionInfo(BaseModel):
    """Information about a specific connector version.

    Instances are created by `get_connector_version_history` from the
    dictionaries returned by `parse_changelog_html`.
    """

    # Version identifier; also passed to `fetch_registry_version_date` when
    # looking up the release date in the registry.
    version: str
    # Release date; `get_connector_version_history` overwrites it with the
    # registry's date for the most recent versions when one is available.
    release_date: str | None = None
    docker_image_url: str
    changelog_url: str
    # NOTE(review): presumably the pull request linked from the changelog row — confirm
    # against `parse_changelog_html`.
    pr_url: str | None = None
    pr_title: str | None = None
    # NOTE(review): presumably problems hit while parsing this changelog row — confirm
    # against `parse_changelog_html`. Defaults to a fresh empty list per instance.
    parsing_errors: list[str] = Field(default_factory=list)
368
369
class ApiDocsUrl(BaseModel):
    """A single API documentation link for a connector.

    `doc_type` and `requires_login` can be populated either by field name or
    by their aliases (`type` and `requiresLogin`).
    """

    title: str
    url: str
    source: str
    doc_type: str = Field(default="other", alias="type")
    requires_login: bool = Field(default=False, alias="requiresLogin")

    model_config = {"populate_by_name": True}

    @classmethod
    def from_manifest_dict(cls, manifest_data: dict[str, Any]) -> list[Self]:
        """Build documentation links from parsed manifest data.

        Only `data.externalDocumentationUrls` is consulted; anything else in
        the manifest is ignored.

        Args:
            manifest_data: The parsed manifest.yaml content as a dictionary.

        Returns:
            One `ApiDocsUrl` per listed entry, or an empty list when the
            `data` section or its URL list is missing or of the wrong type.
        """
        data_section = manifest_data.get("data")
        if not isinstance(data_section, dict):
            return []

        external_docs = data_section.get("externalDocumentationUrls")
        if not isinstance(external_docs, list):
            return []

        return [
            cls(
                title=doc["title"],
                url=doc["url"],
                source="data_external_docs",
                doc_type=doc.get("type", "other"),
                requires_login=doc.get("requiresLogin", False),
            )
            for doc in external_docs
        ]
409
410
def _manifest_url_for(connector_name: str) -> str:
    """Build the expected manifest.yaml URL for a connector.

    The URL always points at the "latest" version.

    Args:
        connector_name: The canonical connector name (e.g., "source-facebook-marketing")

    Returns:
        The URL to the connector's manifest.yaml file
    """
    placeholders = {
        "source_name": connector_name,
        "version": "latest",
    }
    return _DEFAULT_MANIFEST_URL.format(**placeholders)
424
425
def _fetch_manifest_dict(url: str) -> dict[str, Any]:
    """Fetch and parse a manifest.yaml file from a URL.

    Args:
        url: The URL to fetch the manifest from

    Returns:
        The parsed manifest data as a dictionary. An empty dict is returned when
        the manifest is not found (404), is empty, or does not have a mapping at
        its root.

    Raises:
        HTTPError: If the request fails with a non-404 status code
    """
    http_not_found = 404

    response = requests.get(url, timeout=10)
    if response.status_code == http_not_found:
        return {}

    response.raise_for_status()
    parsed = yaml.safe_load(response.text)
    if isinstance(parsed, dict):
        return parsed

    if parsed:
        # `safe_load` returns whatever the document's root is (a list, a string, ...).
        # Callers rely on dict access, so a non-mapping root is treated as "no manifest".
        logger.warning(
            "Ignoring manifest at %s: expected a mapping at the document root, got %s.",
            url,
            type(parsed).__name__,
        )

    return {}
446
447
def _extract_docs_from_registry(connector_name: str) -> list[ApiDocsUrl]:
    """Extract documentation URLs from connector registry metadata.

    The registry is read from the configured location: over HTTP when it starts
    with "http", otherwise from a local JSON file (the same convention used when
    loading the registry cache).

    Args:
        connector_name: The canonical connector name (e.g., "source-facebook-marketing")

    Returns:
        List of ApiDocsUrl objects extracted from the registry. The list is empty
        when the connector has no registry entry or the entry has no documentation
        links.

    Raises:
        requests.exceptions.RequestException: If the HTTP download fails or times out.
        OSError: If the local registry file cannot be opened.
    """
    registry_url = _get_registry_url()
    if registry_url.startswith("http"):
        response = requests.get(
            registry_url,
            headers={"User-Agent": f"PyAirbyte/{get_version()}"},
            timeout=10,
        )
        response.raise_for_status()
        registry_data = response.json()
    else:
        # A local registry file cannot be fetched with `requests`; read it directly.
        with Path(registry_url).open(encoding="utf-8") as f:
            registry_data = json.load(f)

    # `or []` / `or ""` guard against keys that are present with an explicit null.
    connector_list = (registry_data.get("sources") or []) + (
        registry_data.get("destinations") or []
    )
    repository_suffix = f"/{connector_name}"
    connector_entry = next(
        (
            entry
            for entry in connector_list
            if (entry.get("dockerRepository") or "").endswith(repository_suffix)
        ),
        None,
    )

    docs_urls: list[ApiDocsUrl] = []
    if not connector_entry:
        return docs_urls

    # Skip a missing, null or empty URL rather than failing model validation.
    documentation_url = connector_entry.get("documentationUrl")
    if documentation_url:
        docs_urls.append(
            ApiDocsUrl(
                title="Airbyte Documentation",
                url=documentation_url,
                source="registry",
            )
        )

    external_docs = connector_entry.get("externalDocumentationUrls")
    if isinstance(external_docs, list):
        docs_urls.extend(
            ApiDocsUrl(
                title=doc["title"],
                url=doc["url"],
                source="registry_external_docs",
                doc_type=doc.get("type", "other"),
                requires_login=doc.get("requiresLogin", False),
            )
            for doc in external_docs
        )

    return docs_urls
497
498
def get_connector_api_docs_urls(connector_name: str) -> list[ApiDocsUrl]:
    """Collect API documentation URLs for a connector.

    Links are gathered from two places, in this order:
    - Registry metadata (documentationUrl, externalDocumentationUrls)
    - Connector manifest.yaml file (data.externalDocumentationUrls)

    Args:
        connector_name: The canonical connector name (e.g., "source-facebook-marketing")

    Returns:
        List of ApiDocsUrl objects, deduplicated by URL. When the same URL
        appears more than once, the first occurrence is kept.

    Raises:
        AirbyteConnectorNotRegisteredError: If the connector is not found in the registry.
    """
    known_connectors = get_available_connectors(InstallType.ANY)
    if connector_name not in known_connectors:
        raise exc.AirbyteConnectorNotRegisteredError(
            connector_name=connector_name,
            context={
                "registry_url": _get_registry_url(),
                "available_connectors": get_available_connectors(InstallType.ANY),
            },
        )

    from_registry = _extract_docs_from_registry(connector_name)
    from_manifest = ApiDocsUrl.from_manifest_dict(
        _fetch_manifest_dict(_manifest_url_for(connector_name)),
    )

    # Dicts keep insertion order, so `setdefault` retains the first entry seen per URL.
    first_by_url: dict[str, ApiDocsUrl] = {}
    for candidate in [*from_registry, *from_manifest]:
        first_by_url.setdefault(candidate.url, candidate)

    return list(first_by_url.values())
542
543
def get_connector_version_history(
    connector_name: str,
    *,
    num_versions_to_validate: int = 5,
    timeout: int = 30,
) -> list[ConnectorVersionInfo]:
    """Get version history for a connector.

    This function retrieves the version history for a connector by:
    1. Scraping the changelog HTML from docs.airbyte.com
    2. Parsing version information including PR URLs and titles
    3. Overriding release dates for the most recent N versions with accurate
       registry data

    Args:
        connector_name: Name of the connector (e.g., 'source-faker', 'destination-postgres')
        num_versions_to_validate: Number of most recent versions to override with
            registry release dates for accuracy. Defaults to 5. Values of zero or
            less disable the override.
        timeout: Timeout in seconds for the changelog fetch. Defaults to 30.

    Returns:
        List of ConnectorVersionInfo objects, sorted by most recent first. An empty
        list is returned when the changelog cannot be fetched or contains no versions.

    Raises:
        AirbyteConnectorNotRegisteredError: If the connector is not found in the registry.

    Example:
        >>> versions = get_connector_version_history("source-faker", num_versions_to_validate=3)
        >>> for v in versions[:5]:
        ...     print(f"{v.version}: {v.release_date}")
    """
    if connector_name not in get_available_connectors(InstallType.ANY):
        raise exc.AirbyteConnectorNotRegisteredError(
            connector_name=connector_name,
            context={
                "registry_url": _get_registry_url(),
                "available_connectors": get_available_connectors(InstallType.ANY),
            },
        )

    connector_type = "sources" if connector_name.startswith("source-") else "destinations"

    # Strip only the leading type prefix. A blanket `str.replace` would also remove
    # "source-" / "destination-" from the middle of a name and produce a wrong URL.
    connector_short_name = connector_name
    for prefix in ("source-", "destination-"):
        if connector_name.startswith(prefix):
            connector_short_name = connector_name[len(prefix) :]
            break

    changelog_url = f"https://docs.airbyte.com/integrations/{connector_type}/{connector_short_name}"

    try:
        response = requests.get(
            changelog_url,
            headers={"User-Agent": f"PyAirbyte/{get_version()}"},
            timeout=timeout,
        )
        response.raise_for_status()
        html_content = response.text
    except requests.exceptions.RequestException as e:
        # Best effort: a missing or unreachable changelog yields no history, not an error.
        logger.warning(f"Failed to fetch changelog for {connector_name}: {e}")
        return []

    version_dicts = parse_changelog_html(html_content, connector_name)

    if not version_dicts:
        logger.warning(f"No versions found in changelog for {connector_name}")
        return []

    versions = [ConnectorVersionInfo(**version_dict) for version_dict in version_dicts]

    # Clamp at zero: a negative slice bound would select "all but the last N" versions
    # instead of none.
    validate_count = max(num_versions_to_validate, 0)
    for version_info in versions[:validate_count]:
        registry_date = fetch_registry_version_date(connector_name, version_info.version)
        if registry_date:
            version_info.release_date = registry_date
            logger.debug(
                f"Updated release date for {connector_name} v{version_info.version} "
                f"from registry: {registry_date}"
            )

    return versions
logger = <Logger airbyte (INFO)>
class InstallType(builtins.str, enum.Enum):
49class InstallType(str, Enum):
50    """The type of installation for a connector."""
51
52    YAML = "yaml"
53    """Manifest-only connectors that can be run without Docker."""
54    PYTHON = "python"
55    """Python-based connectors available via PyPI."""
56    DOCKER = "docker"
57    """Docker-based connectors (returns all connectors for backward compatibility)."""
58    JAVA = "java"
59    """Java-based connectors."""
60
61    INSTALLABLE = "installable"
62    """Connectors installable in the current environment (environment-sensitive).
63
64    Returns all connectors if Docker is installed, otherwise only Python and YAML.
65    """
66    ANY = "any"
67    """All connectors in the registry (environment-independent)."""

The type of installation for a connector.

YAML = <InstallType.YAML: 'yaml'>

Manifest-only connectors that can be run without Docker.

PYTHON = <InstallType.PYTHON: 'python'>

Python-based connectors available via PyPI.

DOCKER = <InstallType.DOCKER: 'docker'>

Docker-based connectors (returns all connectors for backward compatibility).

JAVA = <InstallType.JAVA: 'java'>

Java-based connectors.

INSTALLABLE = <InstallType.INSTALLABLE: 'installable'>

Connectors installable in the current environment (environment-sensitive).

Returns all connectors if Docker is installed, otherwise only Python and YAML.

ANY = <InstallType.ANY: 'any'>

All connectors in the registry (environment-independent).

class Language(builtins.str, enum.Enum):
70class Language(str, Enum):
71    """The language of a connector."""
72
73    PYTHON = InstallType.PYTHON.value
74    JAVA = InstallType.JAVA.value
75    MANIFEST_ONLY = _MANIFEST_ONLY_LANGUAGE

The language of a connector.

PYTHON = <Language.PYTHON: 'python'>
JAVA = <Language.JAVA: 'java'>
MANIFEST_ONLY = <Language.MANIFEST_ONLY: 'manifest-only'>
class ConnectorMetadata(pydantic.main.BaseModel):
 78class ConnectorMetadata(BaseModel):
 79    """Metadata for a connector."""
 80
 81    name: str
 82    """Connector name. For example, "source-google-sheets"."""
 83
 84    display_name: str | None = None
 85    """Human-readable connector name."""
 86
 87    connector_type: str | None = None
 88    """Connector type: `source` or `destination`."""
 89
 90    definition_id: str | None = None
 91    """Source or destination definition ID."""
 92
 93    docker_repository: str | None = None
 94    """Docker repository for the connector image."""
 95
 96    latest_available_version: str | None
 97    """The latest available version of the connector."""
 98
 99    pypi_package_name: str | None
100    """The name of the PyPI package for the connector, if it exists."""
101
102    language: Language | None
103    """The language of the connector."""
104
105    install_types: set[InstallType]
106    """The supported install types for the connector."""
107
108    suggested_streams: list[str] | None = None
109    """A list of suggested streams for the connector, if available."""
110
111    support_level: str | None = None
112    """Connector support level."""
113
114    release_stage: str | None = None
115    """Connector release stage."""
116
117    source_type: str | None = None
118    """Connector subtype."""
119
120    documentation_url: str | None = None
121    """Connector documentation URL."""
122
123    release_date: str | None = None
124    """Connector release date."""
125
126    github_issue_label: str | None = None
127    """GitHub issue label for the connector."""
128
129    @property
130    def default_install_type(self) -> InstallType:
131        """Return the default install type for the connector."""
132        if self.language == Language.MANIFEST_ONLY and InstallType.YAML in self.install_types:
133            return InstallType.YAML
134
135        if InstallType.PYTHON in self.install_types:
136            return InstallType.PYTHON
137
138        # Else: Java or Docker
139        return InstallType.DOCKER

Metadata for a connector.

name: str = PydanticUndefined

Connector name. For example, "source-google-sheets".

display_name: str | None = None

Human-readable connector name.

connector_type: str | None = None

Connector type: source or destination.

definition_id: str | None = None

Source or destination definition ID.

docker_repository: str | None = None

Docker repository for the connector image.

latest_available_version: str | None = PydanticUndefined

The latest available version of the connector.

pypi_package_name: str | None = PydanticUndefined

The name of the PyPI package for the connector, if it exists.

language: Language | None = PydanticUndefined

The language of the connector.

install_types: set[InstallType] = PydanticUndefined

The supported install types for the connector.

suggested_streams: list[str] | None = None

A list of suggested streams for the connector, if available.

support_level: str | None = None

Connector support level.

release_stage: str | None = None

Connector release stage.

source_type: str | None = None

Connector subtype.

documentation_url: str | None = None

Connector documentation URL.

release_date: str | None = None

Connector release date.

github_issue_label: str | None = None

GitHub issue label for the connector.

default_install_type: InstallType
129    @property
130    def default_install_type(self) -> InstallType:
131        """Return the default install type for the connector."""
132        if self.language == Language.MANIFEST_ONLY and InstallType.YAML in self.install_types:
133            return InstallType.YAML
134
135        if InstallType.PYTHON in self.install_types:
136            return InstallType.PYTHON
137
138        # Else: Java or Docker
139        return InstallType.DOCKER

Return the default install type for the connector.

def get_connector_metadata(name: str) -> ConnectorMetadata | None:
263def get_connector_metadata(name: str) -> ConnectorMetadata | None:
264    """Check the cache for the connector.
265
266    If the cache is empty, populate by calling update_cache.
267    """
268    registry_url = _get_registry_url()
269
270    if _is_registry_disabled(registry_url):
271        return None
272
273    cache = copy(_get_registry_cache())
274
275    if not cache:
276        raise exc.PyAirbyteInternalError(
277            message="Connector registry could not be loaded.",
278            context={
279                "registry_url": _get_registry_url(),
280            },
281        )
282    if name not in cache:
283        raise exc.AirbyteConnectorNotRegisteredError(
284            connector_name=name,
285            context={
286                "registry_url": _get_registry_url(),
287                "available_connectors": get_available_connectors(),
288            },
289        )
290    return cache[name]

Check the cache for the connector.

If the cache is empty, it is populated by loading the registry first.

def get_available_connectors( install_type: InstallType | str | None = <InstallType.INSTALLABLE: 'installable'>) -> list[str]:
293def get_available_connectors(
294    install_type: InstallType | str | None = InstallType.INSTALLABLE,
295) -> list[str]:
296    """Return a list of all available connectors.
297
298    Connectors will be returned in alphabetical order, with the standard prefix "source-".
299
300    Args:
301        install_type: The type of installation for the connector.
302            Defaults to `InstallType.INSTALLABLE`.
303    """
304    if install_type is None or install_type == InstallType.INSTALLABLE:
305        # Filter for installable connectors (default behavior).
306        if is_docker_installed():
307            logger.info("Docker is detected. Returning all connectors.")
308            return sorted(_get_registry_cache().keys())
309
310        logger.info("Docker was not detected. Returning only Python and Manifest-only connectors.")
311        return sorted(
312            [
313                connector_name
314                for connector_name, conn_info in _get_registry_cache().items()
315                if conn_info.language in {Language.PYTHON, Language.MANIFEST_ONLY}
316            ]
317        )
318
319    if not isinstance(install_type, InstallType):
320        install_type = InstallType(install_type)
321
322    if install_type == InstallType.PYTHON:
323        return sorted(
324            connector_name
325            for connector_name, conn_info in _get_registry_cache().items()
326            if conn_info.pypi_package_name is not None
327        )
328
329    if install_type == InstallType.JAVA:
330        warnings.warn(
331            message="Java connectors are not yet supported.",
332            stacklevel=2,
333        )
334        return sorted(
335            connector_name
336            for connector_name, conn_info in _get_registry_cache().items()
337            if conn_info.language == Language.JAVA
338        )
339
340    if install_type in {InstallType.DOCKER, InstallType.ANY}:
341        return sorted(_get_registry_cache().keys())
342
343    if install_type == InstallType.YAML:
344        return sorted(
345            conn.name
346            for conn in _get_registry_cache().values()
347            if InstallType.YAML in conn.install_types
348        )
349
350    # pragma: no cover  # Should never be reached.
351    raise exc.PyAirbyteInputError(
352        message="Invalid install type.",
353        context={
354            "install_type": install_type,
355        },
356    )

Return a list of all available connectors.

Connectors will be returned in alphabetical order, using their full names with the standard "source-" or "destination-" prefix.

Arguments:

install_type: The type of installation for the connector. Defaults to InstallType.INSTALLABLE.

class ConnectorVersionInfo(pydantic.main.BaseModel):
class ConnectorVersionInfo(BaseModel):
    """Information about a specific connector version.

    `get_connector_version_history` builds one instance per entry returned by
    `parse_changelog_html`, and may afterwards overwrite `release_date` with the
    date reported by the registry.
    """

    # Version string of the release; also used to look up the registry release date.
    version: str
    # Release date as a string, or None when unknown. Exact format is not visible here.
    release_date: str | None = None
    # NOTE(review): presumably the Docker image reference for this version - confirm
    # against `parse_changelog_html`.
    docker_image_url: str
    # NOTE(review): presumably the page the changelog entry was read from - confirm.
    changelog_url: str
    # Pull request URL and title parsed from the changelog, when available.
    pr_url: str | None = None
    pr_title: str | None = None
    # Each instance gets its own fresh empty list by default.
    # NOTE(review): presumably messages describing problems hit while parsing - confirm.
    parsing_errors: list[str] = Field(default_factory=list)

Information about a specific connector version.

version: str = PydanticUndefined
release_date: str | None = None
docker_image_url: str = PydanticUndefined
changelog_url: str = PydanticUndefined
pr_url: str | None = None
pr_title: str | None = None
parsing_errors: list[str] = PydanticUndefined
class ApiDocsUrl(pydantic.main.BaseModel):
class ApiDocsUrl(BaseModel):
    """A single API documentation link for a connector.

    `doc_type` and `requires_login` may be populated either by field name or by
    their aliases (`type` and `requiresLogin`), because `populate_by_name` is enabled.
    """

    title: str
    url: str
    source: str
    doc_type: str = Field(default="other", alias="type")
    requires_login: bool = Field(default=False, alias="requiresLogin")

    model_config = {"populate_by_name": True}

    @classmethod
    def from_manifest_dict(cls, manifest_data: dict[str, Any]) -> list[Self]:
        """Build documentation URL entries from parsed manifest data.

        Only `data.externalDocumentationUrls` is consulted. When `data` is not a
        dictionary, or `externalDocumentationUrls` is not a list, no entries are
        produced.

        Args:
            manifest_data: The parsed manifest.yaml data as a dictionary

        Returns:
            List of ApiDocsUrl objects extracted from the manifest, each tagged with
            the source "data_external_docs". Empty if the manifest has no usable section.
        """
        metadata = manifest_data.get("data")
        if not isinstance(metadata, dict):
            return []

        doc_entries = metadata.get("externalDocumentationUrls")
        if not isinstance(doc_entries, list):
            return []

        # "title" and "url" are required per entry; the remaining keys are optional.
        return [
            cls(
                title=entry["title"],
                url=entry["url"],
                source="data_external_docs",
                doc_type=entry.get("type", "other"),
                requires_login=entry.get("requiresLogin", False),
            )
            for entry in doc_entries
        ]

API documentation URL information.

title: str = PydanticUndefined
url: str = PydanticUndefined
source: str = PydanticUndefined
doc_type: str = 'other'
requires_login: bool = False
@classmethod
def from_manifest_dict( cls, manifest_data: dict[str, typing.Any]) -> list[typing_extensions.Self]:
382    @classmethod
383    def from_manifest_dict(cls, manifest_data: dict[str, Any]) -> list[Self]:
384        """Extract documentation URLs from parsed manifest data.
385
386        Args:
387            manifest_data: The parsed manifest.yaml data as a dictionary
388
389        Returns:
390            List of ApiDocsUrl objects extracted from the manifest
391        """
392        results: list[Self] = []
393
394        data_section = manifest_data.get("data")
395        if isinstance(data_section, dict):
396            external_docs = data_section.get("externalDocumentationUrls")
397            if isinstance(external_docs, list):
398                results = [
399                    cls(
400                        title=doc["title"],
401                        url=doc["url"],
402                        source="data_external_docs",
403                        doc_type=doc.get("type", "other"),
404                        requires_login=doc.get("requiresLogin", False),
405                    )
406                    for doc in external_docs
407                ]
408
409        return results

Extract documentation URLs from parsed manifest data.

Arguments:
  • manifest_data: The parsed manifest.yaml data as a dictionary
Returns:

List of ApiDocsUrl objects extracted from the manifest

def get_connector_api_docs_urls(connector_name: str) -> list[ApiDocsUrl]:
def get_connector_api_docs_urls(connector_name: str) -> list[ApiDocsUrl]:
    """Collect API documentation URLs for a connector.

    Documentation URLs for the connector's upstream API are gathered from two places,
    in this order:
    - Registry metadata (documentationUrl, externalDocumentationUrls)
    - Connector manifest.yaml file (data.externalDocumentationUrls)

    Args:
        connector_name: The canonical connector name (e.g., "source-facebook-marketing")

    Returns:
        List of ApiDocsUrl objects with documentation URLs, deduplicated by URL. When
        the same URL appears more than once, the first occurrence is kept.

    Raises:
        AirbyteConnectorNotRegisteredError: If the connector is not found in the registry.
    """
    registered_connectors = get_available_connectors(InstallType.ANY)
    if connector_name not in registered_connectors:
        raise exc.AirbyteConnectorNotRegisteredError(
            connector_name=connector_name,
            context={
                "registry_url": _get_registry_url(),
                "available_connectors": registered_connectors,
            },
        )

    # Registry entries come first so they win over manifest entries with the same URL.
    candidates: list[ApiDocsUrl] = list(_extract_docs_from_registry(connector_name))
    manifest_data = _fetch_manifest_dict(_manifest_url_for(connector_name))
    candidates.extend(ApiDocsUrl.from_manifest_dict(manifest_data))

    # Dicts keep insertion order, and `setdefault` never replaces an existing key,
    # so this retains the first entry seen for each URL.
    first_by_url: dict[str, ApiDocsUrl] = {}
    for candidate in candidates:
        first_by_url.setdefault(candidate.url, candidate)

    return list(first_by_url.values())

Get API documentation URLs for a connector.

This function retrieves documentation URLs for a connector's upstream API from multiple sources:

  • Registry metadata (documentationUrl, externalDocumentationUrls)
  • Connector manifest.yaml file (data.externalDocumentationUrls)
Arguments:
  • connector_name: The canonical connector name (e.g., "source-facebook-marketing")
Returns:

List of ApiDocsUrl objects with documentation URLs, deduplicated by URL.

Raises:
  • AirbyteConnectorNotRegisteredError: If the connector is not found in the registry.
def get_connector_version_history( connector_name: str, *, num_versions_to_validate: int = 5, timeout: int = 30) -> list[ConnectorVersionInfo]:
def get_connector_version_history(
    connector_name: str,
    *,
    num_versions_to_validate: int = 5,
    timeout: int = 30,
) -> list[ConnectorVersionInfo]:
    """Get version history for a connector.

    This function retrieves the version history for a connector by:
    1. Scraping the changelog HTML from docs.airbyte.com
    2. Parsing version information including PR URLs and titles
    3. Overriding release dates for the most recent N versions with accurate
       registry data

    Args:
        connector_name: Name of the connector (e.g., 'source-faker', 'destination-postgres')
        num_versions_to_validate: Number of most recent versions to override with
            registry release dates for accuracy. Values below zero are treated as
            zero. Defaults to 5.
        timeout: Timeout in seconds for the changelog fetch. Defaults to 30.

    Returns:
        List of ConnectorVersionInfo objects, sorted by most recent first. An empty
        list is returned (and a warning is logged) when the changelog page cannot be
        fetched or when no versions can be parsed from it.

    Raises:
        AirbyteConnectorNotRegisteredError: If the connector is not found in the registry.

    Example:
        >>> versions = get_connector_version_history("source-faker", num_versions_to_validate=3)
        >>> for v in versions[:5]:
        ...     print(f"{v.version}: {v.release_date}")
    """
    available_connectors = get_available_connectors(InstallType.ANY)
    if connector_name not in available_connectors:
        raise exc.AirbyteConnectorNotRegisteredError(
            connector_name=connector_name,
            context={
                "registry_url": _get_registry_url(),
                "available_connectors": available_connectors,
            },
        )

    # Strip only the leading type prefix. `str.replace` would also remove matching
    # text from the middle of the name (e.g. "source-open-source-db" -> "open-db"),
    # producing a wrong documentation URL.
    if connector_name.startswith("source-"):
        connector_type, type_prefix = "sources", "source-"
    else:
        connector_type, type_prefix = "destinations", "destination-"

    if connector_name.startswith(type_prefix):
        connector_short_name = connector_name[len(type_prefix) :]
    else:
        connector_short_name = connector_name

    changelog_url = f"https://docs.airbyte.com/integrations/{connector_type}/{connector_short_name}"

    try:
        response = requests.get(
            changelog_url,
            headers={"User-Agent": f"PyAirbyte/{get_version()}"},
            timeout=timeout,
        )
        response.raise_for_status()
        html_content = response.text
    except requests.exceptions.RequestException as e:
        # Best effort: a missing or unreachable docs page yields no history, not an error.
        logger.warning(f"Failed to fetch changelog for {connector_name}: {e}")
        return []

    version_dicts = parse_changelog_html(html_content, connector_name)

    if not version_dicts:
        logger.warning(f"No versions found in changelog for {connector_name}")
        return []

    versions = [ConnectorVersionInfo(**version_dict) for version_dict in version_dicts]

    # A negative count would otherwise slice "all but the last N" entries.
    validate_count = max(num_versions_to_validate, 0)
    for version_info in versions[:validate_count]:
        registry_date = fetch_registry_version_date(connector_name, version_info.version)
        if registry_date:
            # Dates from the registry replace the ones parsed from the changelog.
            version_info.release_date = registry_date
            logger.debug(
                f"Updated release date for {connector_name} v{version_info.version} "
                f"from registry: {registry_date}"
            )

    return versions

Get version history for a connector.

This function retrieves the version history for a connector by:

  1. Scraping the changelog HTML from docs.airbyte.com
  2. Parsing version information including PR URLs and titles
  3. Overriding release dates for the most recent N versions with accurate registry data
Arguments:
  • connector_name: Name of the connector (e.g., 'source-faker', 'destination-postgres')
  • num_versions_to_validate: Number of most recent versions to override with registry release dates for accuracy. Defaults to 5.
  • timeout: Timeout in seconds for the changelog fetch. Defaults to 30.
Returns:

List of ConnectorVersionInfo objects, sorted by most recent first.

Raises:
  • AirbyteConnectorNotRegisteredError: If the connector is not found in the registry.
Example:
>>> versions = get_connector_version_history("source-faker", num_versions_to_validate=3)
>>> for v in versions[:5]:
...     print(f"{v.version}: {v.release_date}")