airbyte.registry

Connectivity to the connector catalog registry.

  1# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  2"""Connectivity to the connector catalog registry."""
  3
  4from __future__ import annotations
  5
  6import json
  7import logging
  8import os
  9import warnings
 10from copy import copy
 11from enum import Enum
 12from pathlib import Path
 13from typing import Any, cast
 14
 15import requests
 16import yaml
 17from pydantic import BaseModel, Field
 18from typing_extensions import Self
 19
 20from airbyte import exceptions as exc
 21from airbyte._registry_utils import fetch_registry_version_date, parse_changelog_html
 22from airbyte._util.meta import is_docker_installed
 23from airbyte.constants import AIRBYTE_OFFLINE_MODE
 24from airbyte.logs import warn_once
 25from airbyte.version import get_version
 26
 27
# All log output from this module goes through the package-wide "airbyte" logger.
logger = logging.getLogger("airbyte")


# Registry cache, keyed by connector name. Stays `None` until a registry load succeeds.
__cache: dict[str, ConnectorMetadata] | None = None


# Environment variable that overrides the registry location. Its value is read as an
# http(s) URL, a local file path, or one of "0"/"F"/"FALSE" to disable the registry.
_REGISTRY_ENV_VAR = "AIRBYTE_LOCAL_REGISTRY"
# Registry location used when the environment variable is not set.
_REGISTRY_URL = "https://connectors.airbyte.com/files/registries/v0/oss_registry.json"

# Language identifiers as they appear in registry entries.
_PYTHON_LANGUAGE = "python"
_MANIFEST_ONLY_LANGUAGE = "manifest-only"

# Registry tags used as a fallback to detect the language when an entry has no usable
# "language" value.
_PYTHON_LANGUAGE_TAG = f"language:{_PYTHON_LANGUAGE}"
_MANIFEST_ONLY_TAG = f"language:{_MANIFEST_ONLY_LANGUAGE}"

# URL template for a connector's manifest.yaml; formatted with `source_name` and `version`.
_DEFAULT_MANIFEST_URL = (
    "https://connectors.airbyte.com/files/metadata/airbyte/{source_name}/{version}/manifest.yaml"
)
 46
 47
class InstallType(str, Enum):
    """The type of installation for a connector.

    Used both as a per-connector capability (`ConnectorMetadata.install_types`) and as the
    filter argument of `get_available_connectors()`. Because the enum also subclasses `str`,
    each member compares equal to its plain string value (e.g. `InstallType.YAML == "yaml"`).
    """

    YAML = "yaml"
    """Manifest-only connectors that can be run without Docker."""
    PYTHON = "python"
    """Python-based connectors available via PyPI."""
    DOCKER = "docker"
    """Docker-based connectors (returns all connectors for backward compatibility)."""
    JAVA = "java"
    """Java-based connectors."""

    # The two members below are only meaningful as filters; they are never added to a
    # connector's `install_types` set by the registry parser in this module.
    INSTALLABLE = "installable"
    """Connectors installable in the current environment (environment-sensitive).

    Returns all connectors if Docker is installed, otherwise only Python and YAML.
    """
    ANY = "any"
    """All connectors in the registry (environment-independent)."""
 67
 68
class Language(str, Enum):
    """The language of a connector.

    Members are constructed from the `language` value of a registry entry. `PYTHON` and
    `JAVA` reuse the string values of the matching `InstallType` members.
    """

    PYTHON = InstallType.PYTHON.value
    JAVA = InstallType.JAVA.value
    MANIFEST_ONLY = _MANIFEST_ONLY_LANGUAGE
 75
 76
 77class ConnectorMetadata(BaseModel):
 78    """Metadata for a connector."""
 79
 80    name: str
 81    """Connector name. For example, "source-google-sheets"."""
 82
 83    latest_available_version: str | None
 84    """The latest available version of the connector."""
 85
 86    pypi_package_name: str | None
 87    """The name of the PyPI package for the connector, if it exists."""
 88
 89    language: Language | None
 90    """The language of the connector."""
 91
 92    install_types: set[InstallType]
 93    """The supported install types for the connector."""
 94
 95    suggested_streams: list[str] | None = None
 96    """A list of suggested streams for the connector, if available."""
 97
 98    @property
 99    def default_install_type(self) -> InstallType:
100        """Return the default install type for the connector."""
101        if self.language == Language.MANIFEST_ONLY and InstallType.YAML in self.install_types:
102            return InstallType.YAML
103
104        if InstallType.PYTHON in self.install_types:
105            return InstallType.PYTHON
106
107        # Else: Java or Docker
108        return InstallType.DOCKER
109
110
def _get_registry_url() -> str:
    """Return the registry location: the environment override if set, else the default."""
    override = os.environ.get(_REGISTRY_ENV_VAR)
    if override is None:
        return _REGISTRY_URL

    return str(override)
116
117
def _is_registry_disabled(url: str) -> bool:
    """Tell whether registry access is switched off.

    The registry is disabled when the configured location is one of the opt-out values
    "0", "F" or "FALSE" (case-insensitive), or when offline mode is enabled.
    """
    if url.upper() in {"0", "F", "FALSE"}:
        return True

    return AIRBYTE_OFFLINE_MODE
120
121
def _registry_entry_to_connector_metadata(entry: dict) -> ConnectorMetadata:
    """Convert one raw registry entry into a `ConnectorMetadata` object.

    The language is taken from the entry's "language" value when it is a known `Language`;
    otherwise it is inferred from the "language:..." tags. Docker is always listed as a
    supported install type; Python, Java and YAML are added based on the language (Python
    additionally requires the PyPI registry to be enabled for the connector).

    Args:
        entry: One element of the registry's "sources" or "destinations" list. It must
            contain "dockerRepository"; every other key is optional and may be null.

    Returns:
        The metadata record for the connector described by `entry`.

    Raises:
        KeyError: If `entry` has no "dockerRepository" key.
    """
    name = entry["dockerRepository"].replace("airbyte/", "")
    latest_version: str | None = entry.get("dockerImageTag")
    # The `or` fallbacks below treat an explicit JSON null the same as a missing key.
    tags = entry.get("tags") or []
    language: Language | None = None

    raw_language = entry.get("language")
    if raw_language is not None:
        try:
            language = Language(raw_language)
        except ValueError:
            # Unknown language values are reported but do not abort registry loading.
            warnings.warn(
                message=f"Invalid language for connector {name}: {raw_language}",
                stacklevel=2,
            )
    if not language and _PYTHON_LANGUAGE_TAG in tags:
        language = Language.PYTHON
    if not language and _MANIFEST_ONLY_TAG in tags:
        language = Language.MANIFEST_ONLY

    remote_registries: dict = entry.get("remoteRegistries") or {}
    pypi_registry: dict = remote_registries.get("pypi") or {}
    pypi_package_name = cast(
        "str | None",
        pypi_registry.get("packageName", None),
    )
    pypi_enabled: bool = pypi_registry.get("enabled", False)

    install_types: set[InstallType] = {InstallType.DOCKER}  # Docker is always supported.
    if language == Language.PYTHON and pypi_enabled:
        install_types.add(InstallType.PYTHON)
    if language == Language.JAVA:
        install_types.add(InstallType.JAVA)
    if language == Language.MANIFEST_ONLY:
        install_types.add(InstallType.YAML)

    suggested_streams_section: dict = entry.get("suggestedStreams") or {}

    return ConnectorMetadata(
        name=name,
        latest_available_version=latest_version,
        # The package name is only exposed when publishing to PyPI is enabled.
        pypi_package_name=pypi_package_name if pypi_enabled else None,
        language=language,
        install_types=install_types,
        suggested_streams=suggested_streams_section.get("streams", None),
    )
167
168
def _get_registry_cache(
    *,
    force_refresh: bool = False,
) -> dict[str, ConnectorMetadata]:
    """Return the registry cache, loading the registry first if needed.

    The registry is read from the configured location: fetched over HTTP when the location
    starts with "http", otherwise opened as a local JSON file. An empty cache is not treated
    as a cache hit, so an empty registry is re-read on every call.

    Args:
        force_refresh: If true, reload the registry even when a cached copy exists.

    Returns:
        A mapping of connector name to `ConnectorMetadata`. Empty when the registry is
        disabled or contains no connectors.

    Raises:
        requests.exceptions.RequestException: If the HTTP fetch fails, times out, or
            returns an error status.
        OSError: If the local registry file cannot be opened.
    """
    global __cache
    if __cache and not force_refresh:
        return __cache

    registry_url = _get_registry_url()

    if _is_registry_disabled(registry_url):
        return {}

    if registry_url.startswith("http"):
        response = requests.get(
            registry_url,
            headers={"User-Agent": f"PyAirbyte/{get_version()}"},
            # Without a timeout, an unresponsive server would block the caller forever.
            timeout=10,
        )
        response.raise_for_status()
        data = response.json()
    else:
        # Assume local file
        with Path(registry_url).open(encoding="utf-8") as f:
            data = json.load(f)

    new_cache: dict[str, ConnectorMetadata] = {}

    # A missing or null section is treated as empty so that it ends in the warning below
    # instead of a KeyError.
    for section in ("sources", "destinations"):
        for connector in data.get(section) or []:
            connector_metadata = _registry_entry_to_connector_metadata(connector)
            new_cache[connector_metadata.name] = connector_metadata

    if not new_cache:
        # This isn't necessarily fatal, since users can bring their own
        # connector definitions.
        warn_once(
            message=f"Connector registry is empty: {registry_url}",
            with_stack=False,
        )

    __cache = new_cache
    return __cache
218
219
def get_connector_metadata(name: str) -> ConnectorMetadata | None:
    """Return the registry metadata for the named connector.

    The registry is loaded on first use and served from the module-level cache afterwards.

    Args:
        name: The connector name, for example "source-google-sheets".

    Returns:
        The connector's metadata, or `None` if the registry is disabled.

    Raises:
        PyAirbyteInternalError: If the registry is enabled but no connectors were loaded.
        AirbyteConnectorNotRegisteredError: If `name` is not present in the registry.
    """
    registry_url = _get_registry_url()

    if _is_registry_disabled(registry_url):
        return None

    # Lookups are read-only, so the shared cache mapping is used directly.
    cache = _get_registry_cache()

    if not cache:
        raise exc.PyAirbyteInternalError(
            message="Connector registry could not be loaded.",
            context={
                "registry_url": registry_url,
            },
        )
    if name not in cache:
        raise exc.AirbyteConnectorNotRegisteredError(
            connector_name=name,
            context={
                "registry_url": registry_url,
                "available_connectors": get_available_connectors(),
            },
        )
    return cache[name]
248
249
def get_available_connectors(
    install_type: InstallType | str | None = InstallType.INSTALLABLE,
) -> list[str]:
    """Return the names of available connectors in alphabetical order.

    Names are the full registry names, including their "source-" or "destination-" prefix.

    Args:
        install_type: Which connectors to include, as an `InstallType` or its string value.
            `None` is treated like `InstallType.INSTALLABLE`, which is also the default.

            - `INSTALLABLE`: all connectors if Docker is installed, otherwise only
              connectors whose language is Python or manifest-only.
            - `DOCKER` / `ANY`: every connector in the registry.
            - `PYTHON`: connectors that have a PyPI package name.
            - `JAVA`: connectors whose language is Java (emits a warning).
            - `YAML`: connectors that support the YAML install type.

    Returns:
        A sorted list of connector names.

    Raises:
        PyAirbyteInputError: If `install_type` is not a valid `InstallType` value.
    """
    if install_type is None:
        install_type = InstallType.INSTALLABLE
    elif not isinstance(install_type, InstallType):
        try:
            install_type = InstallType(install_type)
        except ValueError as ex:
            # NOTE(review): callers that caught the bare ValueError raised here before keep
            # working only if PyAirbyteInputError subclasses ValueError - confirm.
            raise exc.PyAirbyteInputError(
                message="Invalid install type.",
                context={
                    "install_type": install_type,
                },
            ) from ex

    if install_type == InstallType.INSTALLABLE:
        # Filter for installable connectors (default behavior).
        if is_docker_installed():
            logger.info("Docker is detected. Returning all connectors.")
            return sorted(_get_registry_cache().keys())

        logger.info("Docker was not detected. Returning only Python and Manifest-only connectors.")
        return sorted(
            connector_name
            for connector_name, conn_info in _get_registry_cache().items()
            if conn_info.language in {Language.PYTHON, Language.MANIFEST_ONLY}
        )

    if install_type == InstallType.JAVA:
        warnings.warn(
            message="Java connectors are not yet supported.",
            stacklevel=2,
        )

    registry = _get_registry_cache()

    if install_type in {InstallType.DOCKER, InstallType.ANY}:
        return sorted(registry)

    if install_type == InstallType.PYTHON:
        return sorted(
            connector_name
            for connector_name, conn_info in registry.items()
            if conn_info.pypi_package_name is not None
        )

    if install_type == InstallType.JAVA:
        return sorted(
            connector_name
            for connector_name, conn_info in registry.items()
            if conn_info.language == Language.JAVA
        )

    if install_type == InstallType.YAML:
        return sorted(
            conn.name for conn in registry.values() if InstallType.YAML in conn.install_types
        )

    # Defensive fallback: only reachable if a new InstallType member is added without a
    # matching branch above.
    raise exc.PyAirbyteInputError(  # pragma: no cover
        message="Invalid install type.",
        context={
            "install_type": install_type,
        },
    )
314
315
class ConnectorVersionInfo(BaseModel):
    """Information about a specific connector version.

    Instances are created by `get_connector_version_history()` from the dictionaries
    produced by the changelog parser.
    """

    version: str
    # `get_connector_version_history()` may overwrite this with the registry's date for
    # the most recent versions.
    release_date: str | None = None
    docker_image_url: str
    changelog_url: str
    pr_url: str | None = None
    pr_title: str | None = None
    # presumably per-entry problems recorded by the changelog parser - TODO confirm
    parsing_errors: list[str] = Field(default_factory=list)
326
327
class ApiDocsUrl(BaseModel):
    """A single API documentation link together with where it was found.

    `doc_type` and `requires_login` can be populated either by field name or by their
    aliases ("type" and "requiresLogin").
    """

    title: str
    url: str
    source: str
    doc_type: str = Field(default="other", alias="type")
    requires_login: bool = Field(default=False, alias="requiresLogin")

    model_config = {"populate_by_name": True}

    @classmethod
    def from_manifest_dict(cls, manifest_data: dict[str, Any]) -> list[Self]:
        """Build documentation links from the `data.externalDocumentationUrls` of a manifest.

        Args:
            manifest_data: The parsed manifest.yaml content as a dictionary.

        Returns:
            One object per listed entry, in manifest order. The list is empty when the
            manifest has no "data" mapping or no "externalDocumentationUrls" list.
        """
        data_section = manifest_data.get("data")
        if not isinstance(data_section, dict):
            return []

        external_docs = data_section.get("externalDocumentationUrls")
        if not isinstance(external_docs, list):
            return []

        parsed: list[Self] = []
        for entry in external_docs:
            parsed.append(
                cls(
                    title=entry["title"],
                    url=entry["url"],
                    source="data_external_docs",
                    doc_type=entry.get("type", "other"),
                    requires_login=entry.get("requiresLogin", False),
                )
            )

        return parsed
367
368
def _manifest_url_for(connector_name: str) -> str:
    """Build the URL where the connector's latest manifest.yaml is expected to live.

    Args:
        connector_name: The canonical connector name (e.g., "source-facebook-marketing")

    Returns:
        The manifest URL template filled in with the connector name and version "latest".
    """
    url_fields = {"source_name": connector_name, "version": "latest"}
    return _DEFAULT_MANIFEST_URL.format(**url_fields)
382
383
def _fetch_manifest_dict(url: str) -> dict[str, Any]:
    """Fetch and parse a manifest.yaml file from a URL.

    Args:
        url: The URL to fetch the manifest from

    Returns:
        The parsed manifest data as a dictionary. An empty dict is returned if the manifest
        is not found (404), if the document is empty, or if its top level is not a mapping.

    Raises:
        HTTPError: If the request fails with a non-404 status code
        yaml.YAMLError: If the response body is not valid YAML
    """
    http_not_found = 404

    response = requests.get(url, timeout=10)
    if response.status_code == http_not_found:
        return {}

    response.raise_for_status()
    manifest = yaml.safe_load(response.text)
    if isinstance(manifest, dict):
        return manifest

    if manifest is not None:
        # A list or scalar at the top level is not a usable manifest. Returning it would
        # break callers that rely on the declared dict return type.
        logger.warning(
            "Ignoring manifest at %s: expected a mapping, got %s",
            url,
            type(manifest).__name__,
        )
    return {}
404
405
def _extract_docs_from_registry(connector_name: str) -> list[ApiDocsUrl]:
    """Extract documentation URLs from connector registry metadata.

    The raw registry is re-read from the configured location because the documentation
    fields used here are not part of the cached `ConnectorMetadata` objects.

    Args:
        connector_name: The canonical connector name (e.g., "source-facebook-marketing")

    Returns:
        List of ApiDocsUrl objects extracted from the registry. Empty if the connector has
        no registry entry or the entry has no documentation fields.

    Raises:
        requests.exceptions.RequestException: If the HTTP fetch fails or returns an error
            status.
        OSError: If a local registry file cannot be opened.
    """
    registry_url = _get_registry_url()
    if registry_url.startswith("http"):
        response = requests.get(registry_url, timeout=10)
        response.raise_for_status()
        registry_data = response.json()
    else:
        # A non-HTTP location is a local registry file, as in `_get_registry_cache()`.
        with Path(registry_url).open(encoding="utf-8") as f:
            registry_data = json.load(f)

    connector_list = registry_data.get("sources", []) + registry_data.get("destinations", [])
    repository_suffix = f"/{connector_name}"
    connector_entry = next(
        (
            entry
            for entry in connector_list
            if entry.get("dockerRepository", "").endswith(repository_suffix)
        ),
        None,
    )
    if not connector_entry:
        return []

    docs_urls: list[ApiDocsUrl] = []

    if "documentationUrl" in connector_entry:
        docs_urls.append(
            ApiDocsUrl(
                title="Airbyte Documentation",
                url=connector_entry["documentationUrl"],
                source="registry",
            )
        )

    external_docs = connector_entry.get("externalDocumentationUrls")
    if isinstance(external_docs, list):
        docs_urls.extend(
            ApiDocsUrl(
                title=doc["title"],
                url=doc["url"],
                source="registry_external_docs",
                doc_type=doc.get("type", "other"),
                requires_login=doc.get("requiresLogin", False),
            )
            for doc in external_docs
        )

    return docs_urls
455
456
def get_connector_api_docs_urls(connector_name: str) -> list[ApiDocsUrl]:
    """Collect API documentation links for a connector.

    Links for the connector's upstream API are gathered from two places, in this order:
    - Registry metadata (documentationUrl, externalDocumentationUrls)
    - Connector manifest.yaml file (data.externalDocumentationUrls)

    Args:
        connector_name: The canonical connector name (e.g., "source-facebook-marketing")

    Returns:
        List of ApiDocsUrl objects, deduplicated by URL; the first occurrence of each URL
        is the one that is kept.

    Raises:
        AirbyteConnectorNotRegisteredError: If the connector is not found in the registry.
    """
    if connector_name not in get_available_connectors(InstallType.ANY):
        error_context = {
            "registry_url": _get_registry_url(),
            "available_connectors": get_available_connectors(InstallType.ANY),
        }
        raise exc.AirbyteConnectorNotRegisteredError(
            connector_name=connector_name,
            context=error_context,
        )

    from_registry = _extract_docs_from_registry(connector_name)
    manifest_data = _fetch_manifest_dict(_manifest_url_for(connector_name))
    from_manifest = ApiDocsUrl.from_manifest_dict(manifest_data)

    # Dicts keep insertion order and `setdefault` never overwrites, so the first link seen
    # for each URL wins and the original ordering is preserved.
    first_by_url: dict[str, ApiDocsUrl] = {}
    for doc in [*from_registry, *from_manifest]:
        first_by_url.setdefault(doc.url, doc)

    return list(first_by_url.values())
500
501
def get_connector_version_history(
    connector_name: str,
    *,
    num_versions_to_validate: int = 5,
    timeout: int = 30,
) -> list[ConnectorVersionInfo]:
    """Get version history for a connector.

    This function retrieves the version history for a connector by:
    1. Scraping the changelog HTML from docs.airbyte.com
    2. Parsing version information including PR URLs and titles
    3. Overriding release dates for the most recent N versions with accurate
       registry data

    Args:
        connector_name: Name of the connector (e.g., 'source-faker', 'destination-postgres')
        num_versions_to_validate: Number of most recent versions to override with
            registry release dates for accuracy. Defaults to 5. Zero or a negative
            value skips the override step.
        timeout: Timeout in seconds for the changelog fetch. Defaults to 30.

    Returns:
        List of ConnectorVersionInfo objects, in the order produced by the changelog
        parser (expected to be most recent first). The list is empty if the changelog
        cannot be fetched or contains no versions.

    Raises:
        AirbyteConnectorNotRegisteredError: If the connector is not found in the registry.

    Example:
        >>> versions = get_connector_version_history("source-faker", num_versions_to_validate=3)
        >>> for v in versions[:5]:
        ...     print(f"{v.version}: {v.release_date}")
    """
    if connector_name not in get_available_connectors(InstallType.ANY):
        raise exc.AirbyteConnectorNotRegisteredError(
            connector_name=connector_name,
            context={
                "registry_url": _get_registry_url(),
                "available_connectors": get_available_connectors(InstallType.ANY),
            },
        )

    # Strip only the leading type prefix. A plain `str.replace` would also remove
    # "source-" or "destination-" from the middle of a name and build a wrong docs URL.
    source_prefix = "source-"
    destination_prefix = "destination-"
    if connector_name.startswith(source_prefix):
        connector_type = "sources"
        connector_short_name = connector_name[len(source_prefix) :]
    else:
        connector_type = "destinations"
        if connector_name.startswith(destination_prefix):
            connector_short_name = connector_name[len(destination_prefix) :]
        else:
            connector_short_name = connector_name

    changelog_url = f"https://docs.airbyte.com/integrations/{connector_type}/{connector_short_name}"

    try:
        response = requests.get(
            changelog_url,
            headers={"User-Agent": f"PyAirbyte/{get_version()}"},
            timeout=timeout,
        )
        response.raise_for_status()
        html_content = response.text
    except requests.exceptions.RequestException as e:
        # Best effort: an unreachable changelog yields an empty history, not an error.
        logger.warning("Failed to fetch changelog for %s: %s", connector_name, e)
        return []

    version_dicts = parse_changelog_html(html_content, connector_name)

    if not version_dicts:
        logger.warning("No versions found in changelog for %s", connector_name)
        return []

    versions = [ConnectorVersionInfo(**version_dict) for version_dict in version_dicts]

    # Clamp at zero: a negative count would slice from the end and validate all but the
    # oldest versions.
    validate_count = max(num_versions_to_validate, 0)
    for version_info in versions[:validate_count]:
        registry_date = fetch_registry_version_date(connector_name, version_info.version)
        if registry_date:
            version_info.release_date = registry_date
            logger.debug(
                "Updated release date for %s v%s from registry: %s",
                connector_name,
                version_info.version,
                registry_date,
            )

    return versions
logger = <Logger airbyte (INFO)>
class InstallType(builtins.str, enum.Enum):
49class InstallType(str, Enum):
50    """The type of installation for a connector."""
51
52    YAML = "yaml"
53    """Manifest-only connectors that can be run without Docker."""
54    PYTHON = "python"
55    """Python-based connectors available via PyPI."""
56    DOCKER = "docker"
57    """Docker-based connectors (returns all connectors for backward compatibility)."""
58    JAVA = "java"
59    """Java-based connectors."""
60
61    INSTALLABLE = "installable"
62    """Connectors installable in the current environment (environment-sensitive).
63
64    Returns all connectors if Docker is installed, otherwise only Python and YAML.
65    """
66    ANY = "any"
67    """All connectors in the registry (environment-independent)."""

The type of installation for a connector.

YAML = <InstallType.YAML: 'yaml'>

Manifest-only connectors that can be run without Docker.

PYTHON = <InstallType.PYTHON: 'python'>

Python-based connectors available via PyPI.

DOCKER = <InstallType.DOCKER: 'docker'>

Docker-based connectors (returns all connectors for backward compatibility).

JAVA = <InstallType.JAVA: 'java'>

Java-based connectors.

INSTALLABLE = <InstallType.INSTALLABLE: 'installable'>

Connectors installable in the current environment (environment-sensitive).

Returns all connectors if Docker is installed, otherwise only Python and YAML.

ANY = <InstallType.ANY: 'any'>

All connectors in the registry (environment-independent).

class Language(builtins.str, enum.Enum):
70class Language(str, Enum):
71    """The language of a connector."""
72
73    PYTHON = InstallType.PYTHON.value
74    JAVA = InstallType.JAVA.value
75    MANIFEST_ONLY = _MANIFEST_ONLY_LANGUAGE

The language of a connector.

PYTHON = <Language.PYTHON: 'python'>
JAVA = <Language.JAVA: 'java'>
MANIFEST_ONLY = <Language.MANIFEST_ONLY: 'manifest-only'>
class ConnectorMetadata(pydantic.main.BaseModel):
 78class ConnectorMetadata(BaseModel):
 79    """Metadata for a connector."""
 80
 81    name: str
 82    """Connector name. For example, "source-google-sheets"."""
 83
 84    latest_available_version: str | None
 85    """The latest available version of the connector."""
 86
 87    pypi_package_name: str | None
 88    """The name of the PyPI package for the connector, if it exists."""
 89
 90    language: Language | None
 91    """The language of the connector."""
 92
 93    install_types: set[InstallType]
 94    """The supported install types for the connector."""
 95
 96    suggested_streams: list[str] | None = None
 97    """A list of suggested streams for the connector, if available."""
 98
 99    @property
100    def default_install_type(self) -> InstallType:
101        """Return the default install type for the connector."""
102        if self.language == Language.MANIFEST_ONLY and InstallType.YAML in self.install_types:
103            return InstallType.YAML
104
105        if InstallType.PYTHON in self.install_types:
106            return InstallType.PYTHON
107
108        # Else: Java or Docker
109        return InstallType.DOCKER

Metadata for a connector.

name: str = PydanticUndefined

Connector name. For example, "source-google-sheets".

latest_available_version: str | None = PydanticUndefined

The latest available version of the connector.

pypi_package_name: str | None = PydanticUndefined

The name of the PyPI package for the connector, if it exists.

language: Language | None = PydanticUndefined

The language of the connector.

install_types: set[InstallType] = PydanticUndefined

The supported install types for the connector.

suggested_streams: list[str] | None = None

A list of suggested streams for the connector, if available.

default_install_type: InstallType
 99    @property
100    def default_install_type(self) -> InstallType:
101        """Return the default install type for the connector."""
102        if self.language == Language.MANIFEST_ONLY and InstallType.YAML in self.install_types:
103            return InstallType.YAML
104
105        if InstallType.PYTHON in self.install_types:
106            return InstallType.PYTHON
107
108        # Else: Java or Docker
109        return InstallType.DOCKER

Return the default install type for the connector.

def get_connector_metadata(name: str) -> ConnectorMetadata | None:
221def get_connector_metadata(name: str) -> ConnectorMetadata | None:
222    """Check the cache for the connector.
223
224    If the cache is empty, populate by calling update_cache.
225    """
226    registry_url = _get_registry_url()
227
228    if _is_registry_disabled(registry_url):
229        return None
230
231    cache = copy(_get_registry_cache())
232
233    if not cache:
234        raise exc.PyAirbyteInternalError(
235            message="Connector registry could not be loaded.",
236            context={
237                "registry_url": _get_registry_url(),
238            },
239        )
240    if name not in cache:
241        raise exc.AirbyteConnectorNotRegisteredError(
242            connector_name=name,
243            context={
244                "registry_url": _get_registry_url(),
245                "available_connectors": get_available_connectors(),
246            },
247        )
248    return cache[name]

Check the cache for the connector.

If the cache is empty, it is populated by loading the connector registry.

def get_available_connectors( install_type: InstallType | str | None = <InstallType.INSTALLABLE: 'installable'>) -> list[str]:
251def get_available_connectors(
252    install_type: InstallType | str | None = InstallType.INSTALLABLE,
253) -> list[str]:
254    """Return a list of all available connectors.
255
256    Connectors will be returned in alphabetical order, with the standard prefix "source-".
257
258    Args:
259        install_type: The type of installation for the connector.
260            Defaults to `InstallType.INSTALLABLE`.
261    """
262    if install_type is None or install_type == InstallType.INSTALLABLE:
263        # Filter for installable connectors (default behavior).
264        if is_docker_installed():
265            logger.info("Docker is detected. Returning all connectors.")
266            return sorted(_get_registry_cache().keys())
267
268        logger.info("Docker was not detected. Returning only Python and Manifest-only connectors.")
269        return sorted(
270            [
271                connector_name
272                for connector_name, conn_info in _get_registry_cache().items()
273                if conn_info.language in {Language.PYTHON, Language.MANIFEST_ONLY}
274            ]
275        )
276
277    if not isinstance(install_type, InstallType):
278        install_type = InstallType(install_type)
279
280    if install_type == InstallType.PYTHON:
281        return sorted(
282            connector_name
283            for connector_name, conn_info in _get_registry_cache().items()
284            if conn_info.pypi_package_name is not None
285        )
286
287    if install_type == InstallType.JAVA:
288        warnings.warn(
289            message="Java connectors are not yet supported.",
290            stacklevel=2,
291        )
292        return sorted(
293            connector_name
294            for connector_name, conn_info in _get_registry_cache().items()
295            if conn_info.language == Language.JAVA
296        )
297
298    if install_type in {InstallType.DOCKER, InstallType.ANY}:
299        return sorted(_get_registry_cache().keys())
300
301    if install_type == InstallType.YAML:
302        return sorted(
303            conn.name
304            for conn in _get_registry_cache().values()
305            if InstallType.YAML in conn.install_types
306        )
307
308    # pragma: no cover  # Should never be reached.
309    raise exc.PyAirbyteInputError(
310        message="Invalid install type.",
311        context={
312            "install_type": install_type,
313        },
314    )

Return a list of all available connectors.

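Connectors will be returned in alphabetical order, using their full registry names (including the "source-" or "destination-" prefix).

Arguments:
  • install_type: The type of installation for the connector. Defaults to `InstallType.INSTALLABLE`.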
class ConnectorVersionInfo(pydantic.main.BaseModel):
317class ConnectorVersionInfo(BaseModel):
318    """Information about a specific connector version."""
319
320    version: str
321    release_date: str | None = None
322    docker_image_url: str
323    changelog_url: str
324    pr_url: str | None = None
325    pr_title: str | None = None
326    parsing_errors: list[str] = Field(default_factory=list)

Information about a specific connector version.

version: str = PydanticUndefined
release_date: str | None = None
docker_image_url: str = PydanticUndefined
changelog_url: str = PydanticUndefined
pr_url: str | None = None
pr_title: str | None = None
parsing_errors: list[str] = PydanticUndefined
class ApiDocsUrl(pydantic.main.BaseModel):
329class ApiDocsUrl(BaseModel):
330    """API documentation URL information."""
331
332    title: str
333    url: str
334    source: str
335    doc_type: str = Field(default="other", alias="type")
336    requires_login: bool = Field(default=False, alias="requiresLogin")
337
338    model_config = {"populate_by_name": True}
339
340    @classmethod
341    def from_manifest_dict(cls, manifest_data: dict[str, Any]) -> list[Self]:
342        """Extract documentation URLs from parsed manifest data.
343
344        Args:
345            manifest_data: The parsed manifest.yaml data as a dictionary
346
347        Returns:
348            List of ApiDocsUrl objects extracted from the manifest
349        """
350        results: list[Self] = []
351
352        data_section = manifest_data.get("data")
353        if isinstance(data_section, dict):
354            external_docs = data_section.get("externalDocumentationUrls")
355            if isinstance(external_docs, list):
356                results = [
357                    cls(
358                        title=doc["title"],
359                        url=doc["url"],
360                        source="data_external_docs",
361                        doc_type=doc.get("type", "other"),
362                        requires_login=doc.get("requiresLogin", False),
363                    )
364                    for doc in external_docs
365                ]
366
367        return results

API documentation URL information.

title: str = PydanticUndefined
url: str = PydanticUndefined
source: str = PydanticUndefined
doc_type: str = 'other'
requires_login: bool = False
@classmethod
def from_manifest_dict( cls, manifest_data: dict[str, typing.Any]) -> list[typing_extensions.Self]:
340    @classmethod
341    def from_manifest_dict(cls, manifest_data: dict[str, Any]) -> list[Self]:
342        """Extract documentation URLs from parsed manifest data.
343
344        Args:
345            manifest_data: The parsed manifest.yaml data as a dictionary
346
347        Returns:
348            List of ApiDocsUrl objects extracted from the manifest
349        """
350        results: list[Self] = []
351
352        data_section = manifest_data.get("data")
353        if isinstance(data_section, dict):
354            external_docs = data_section.get("externalDocumentationUrls")
355            if isinstance(external_docs, list):
356                results = [
357                    cls(
358                        title=doc["title"],
359                        url=doc["url"],
360                        source="data_external_docs",
361                        doc_type=doc.get("type", "other"),
362                        requires_login=doc.get("requiresLogin", False),
363                    )
364                    for doc in external_docs
365                ]
366
367        return results

Extract documentation URLs from parsed manifest data.

Arguments:
  • manifest_data: The parsed manifest.yaml data as a dictionary
Returns:

List of ApiDocsUrl objects extracted from the manifest

def get_connector_api_docs_urls(connector_name: str) -> list[ApiDocsUrl]:
def get_connector_api_docs_urls(connector_name: str) -> list[ApiDocsUrl]:
    """Collect the upstream API documentation links known for a connector.

    Links are gathered from two places, in this order:
    - Registry metadata (documentationUrl, externalDocumentationUrls)
    - Connector manifest.yaml file (data.externalDocumentationUrls)

    When the same URL appears more than once, only its first occurrence is kept.

    Args:
        connector_name: The canonical connector name (e.g., "source-facebook-marketing")

    Returns:
        List of ApiDocsUrl objects with documentation URLs, deduplicated by URL.

    Raises:
        AirbyteConnectorNotRegisteredError: If the connector is not found in the registry.
    """
    is_registered = connector_name in get_available_connectors(InstallType.ANY)
    if not is_registered:
        raise exc.AirbyteConnectorNotRegisteredError(
            connector_name=connector_name,
            context={
                "registry_url": _get_registry_url(),
                "available_connectors": get_available_connectors(InstallType.ANY),
            },
        )

    # Registry entries come first so they win over manifest entries on duplicate URLs.
    from_registry = _extract_docs_from_registry(connector_name)
    from_manifest = ApiDocsUrl.from_manifest_dict(
        _fetch_manifest_dict(_manifest_url_for(connector_name))
    )

    # A dict keeps insertion order, and setdefault retains the first entry per URL.
    first_by_url: dict[str, ApiDocsUrl] = {}
    for candidate in [*from_registry, *from_manifest]:
        first_by_url.setdefault(candidate.url, candidate)

    return list(first_by_url.values())

Get API documentation URLs for a connector.

This function retrieves documentation URLs for a connector's upstream API from multiple sources:

  • Registry metadata (documentationUrl, externalDocumentationUrls)
  • Connector manifest.yaml file (data.externalDocumentationUrls)
Arguments:
  • connector_name: The canonical connector name (e.g., "source-facebook-marketing")
Returns:

List of ApiDocsUrl objects with documentation URLs, deduplicated by URL.

Raises:
  • AirbyteConnectorNotRegisteredError: If the connector is not found in the registry.
def get_connector_version_history(connector_name: str, *, num_versions_to_validate: int = 5, timeout: int = 30) -> list[ConnectorVersionInfo]:
def get_connector_version_history(
    connector_name: str,
    *,
    num_versions_to_validate: int = 5,
    timeout: int = 30,
) -> list[ConnectorVersionInfo]:
    """Get version history for a connector.

    This function retrieves the version history for a connector by:
    1. Scraping the changelog HTML from docs.airbyte.com
    2. Parsing version information including PR URLs and titles
    3. Overriding release dates for the most recent N versions with accurate
       registry data

    Fetch and parse problems are treated as best-effort: if the changelog page
    cannot be fetched, or no versions can be parsed from it, a warning is logged
    and an empty list is returned instead of raising.

    Args:
        connector_name: Name of the connector (e.g., 'source-faker', 'destination-postgres')
        num_versions_to_validate: Number of most recent versions to override with
            registry release dates for accuracy. Defaults to 5. Zero or negative
            values disable the override.
        timeout: Timeout in seconds for the changelog fetch. Defaults to 30.

    Returns:
        List of ConnectorVersionInfo objects, in the order produced by the changelog
        parser (expected to be most recent first).

    Raises:
        AirbyteConnectorNotRegisteredError: If the connector is not found in the registry.

    Example:
        >>> versions = get_connector_version_history("source-faker", num_versions_to_validate=3)
        >>> for v in versions[:5]:
        ...     print(f"{v.version}: {v.release_date}")
    """
    if connector_name not in get_available_connectors(InstallType.ANY):
        raise exc.AirbyteConnectorNotRegisteredError(
            connector_name=connector_name,
            context={
                "registry_url": _get_registry_url(),
                "available_connectors": get_available_connectors(InstallType.ANY),
            },
        )

    # Strip only the leading type prefix. A blanket str.replace() would also remove
    # "source-" / "destination-" from the middle of a name and yield a wrong docs slug.
    source_prefix = "source-"
    destination_prefix = "destination-"
    if connector_name.startswith(source_prefix):
        connector_type = "sources"
        connector_short_name = connector_name[len(source_prefix) :]
    else:
        connector_type = "destinations"
        connector_short_name = connector_name
        if connector_name.startswith(destination_prefix):
            connector_short_name = connector_name[len(destination_prefix) :]

    changelog_url = f"https://docs.airbyte.com/integrations/{connector_type}/{connector_short_name}"

    try:
        response = requests.get(
            changelog_url,
            headers={"User-Agent": f"PyAirbyte/{get_version()}"},
            timeout=timeout,
        )
        response.raise_for_status()
        html_content = response.text
    except requests.exceptions.RequestException as e:
        logger.warning(f"Failed to fetch changelog for {connector_name}: {e}")
        return []

    version_dicts = parse_changelog_html(html_content, connector_name)

    if not version_dicts:
        logger.warning(f"No versions found in changelog for {connector_name}")
        return []

    versions = [ConnectorVersionInfo(**version_dict) for version_dict in version_dicts]

    # Clamp at zero: a negative count would otherwise slice from the end of the list
    # (e.g. versions[:-1]) and validate almost every version instead of none.
    validate_count = max(num_versions_to_validate, 0)
    for version_info in versions[:validate_count]:
        registry_date = fetch_registry_version_date(connector_name, version_info.version)
        if registry_date:
            version_info.release_date = registry_date
            logger.debug(
                f"Updated release date for {connector_name} v{version_info.version} "
                f"from registry: {registry_date}"
            )

    return versions

Get version history for a connector.

This function retrieves the version history for a connector by:

  1. Scraping the changelog HTML from docs.airbyte.com
  2. Parsing version information including PR URLs and titles
  3. Overriding release dates for the most recent N versions with accurate registry data
Arguments:
  • connector_name: Name of the connector (e.g., 'source-faker', 'destination-postgres')
  • num_versions_to_validate: Number of most recent versions to override with registry release dates for accuracy. Defaults to 5.
  • timeout: Timeout in seconds for the changelog fetch. Defaults to 30.
Returns:

List of ConnectorVersionInfo objects, sorted by most recent first.

Raises:
  • AirbyteConnectorNotRegisteredError: If the connector is not found in the registry.
Example:
>>> versions = get_connector_version_history("source-faker", num_versions_to_validate=3)
>>> for v in versions[:5]:
...     print(f"{v.version}: {v.release_date}")