airbyte.registry

Connectivity to the connector catalog registry.

  1# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  2"""Connectivity to the connector catalog registry."""
  3
  4from __future__ import annotations
  5
  6import json
  7import logging
  8import os
  9import warnings
 10from copy import copy
 11from enum import Enum
 12from pathlib import Path
 13from typing import Any, cast
 14
 15import requests
 16import yaml
 17from pydantic import BaseModel, Field
 18from typing_extensions import Self
 19
 20from airbyte import exceptions as exc
 21from airbyte._registry_utils import fetch_registry_version_date, parse_changelog_html
 22from airbyte._util.meta import is_docker_installed
 23from airbyte.constants import AIRBYTE_OFFLINE_MODE
 24from airbyte.logs import warn_once
 25from airbyte.version import get_version
 26
 27
 28logger = logging.getLogger("airbyte")
 29
 30
 31__cache: dict[str, ConnectorMetadata] | None = None
 32
 33
 34_REGISTRY_ENV_VAR = "AIRBYTE_LOCAL_REGISTRY"
 35_REGISTRY_URL = "https://connectors.airbyte.com/files/registries/v0/oss_registry.json"
 36
 37_PYTHON_LANGUAGE = "python"
 38_MANIFEST_ONLY_LANGUAGE = "manifest-only"
 39
 40_PYTHON_LANGUAGE_TAG = f"language:{_PYTHON_LANGUAGE}"
 41_MANIFEST_ONLY_TAG = f"language:{_MANIFEST_ONLY_LANGUAGE}"
 42
 43_DEFAULT_MANIFEST_URL = (
 44    "https://connectors.airbyte.com/files/metadata/airbyte/{source_name}/{version}/manifest.yaml"
 45)
 46
 47
class InstallType(str, Enum):
    """The type of installation for a connector.

    Members subclass `str`, so each member compares equal to its raw string value.
    """

    # Assigned to connectors whose language is manifest-only.
    YAML = "yaml"
    # Assigned to Python connectors whose PyPI registry entry is enabled.
    PYTHON = "python"
    # Assigned to every connector parsed from the registry.
    DOCKER = "docker"
    # Assigned to connectors whose language is Java.
    JAVA = "java"
 55
 56
class Language(str, Enum):
    """The language of a connector.

    Members subclass `str`, so each member compares equal to its raw string value.
    """

    # The Python and Java values are shared with the matching InstallType members.
    PYTHON = InstallType.PYTHON.value
    JAVA = InstallType.JAVA.value
    # Connectors defined only by a manifest (value "manifest-only").
    MANIFEST_ONLY = _MANIFEST_ONLY_LANGUAGE
 63
 64
 65class ConnectorMetadata(BaseModel):
 66    """Metadata for a connector."""
 67
 68    name: str
 69    """Connector name. For example, "source-google-sheets"."""
 70
 71    latest_available_version: str | None
 72    """The latest available version of the connector."""
 73
 74    pypi_package_name: str | None
 75    """The name of the PyPI package for the connector, if it exists."""
 76
 77    language: Language | None
 78    """The language of the connector."""
 79
 80    install_types: set[InstallType]
 81    """The supported install types for the connector."""
 82
 83    suggested_streams: list[str] | None = None
 84    """A list of suggested streams for the connector, if available."""
 85
 86    @property
 87    def default_install_type(self) -> InstallType:
 88        """Return the default install type for the connector."""
 89        if self.language == Language.MANIFEST_ONLY and InstallType.YAML in self.install_types:
 90            return InstallType.YAML
 91
 92        if InstallType.PYTHON in self.install_types:
 93            return InstallType.PYTHON
 94
 95        # Else: Java or Docker
 96        return InstallType.DOCKER
 97
 98
 99def _get_registry_url() -> str:
100    if _REGISTRY_ENV_VAR in os.environ:
101        return str(os.environ.get(_REGISTRY_ENV_VAR))
102
103    return _REGISTRY_URL
104
105
106def _is_registry_disabled(url: str) -> bool:
107    return url.upper() in {"0", "F", "FALSE"} or AIRBYTE_OFFLINE_MODE
108
109
def _registry_entry_to_connector_metadata(entry: dict) -> ConnectorMetadata:
    """Convert one raw registry entry into a `ConnectorMetadata` object.

    Optional sections of the entry (`tags`, `remoteRegistries`, its `pypi`
    sub-section and `suggestedStreams`) may be missing or explicitly null; both
    cases are treated as "not provided".

    Args:
        entry: A single connector definition from the registry JSON. It must
            contain the `dockerRepository` key.

    Returns:
        The metadata derived from the entry.

    Raises:
        KeyError: If the entry has no `dockerRepository` key.
    """
    name = entry["dockerRepository"].replace("airbyte/", "")
    latest_version: str | None = entry.get("dockerImageTag")
    # `or []` also covers an explicit null, which `.get(key, [])` would pass through.
    tags = entry.get("tags") or []
    language: Language | None = None

    raw_language = entry.get("language")
    if raw_language is not None:
        try:
            language = Language(raw_language)
        except ValueError:
            # Unknown languages are not fatal; fall back to the tag-based detection below.
            warnings.warn(
                message=f"Invalid language for connector {name}: {raw_language}",
                stacklevel=2,
            )
    if language is None and _PYTHON_LANGUAGE_TAG in tags:
        language = Language.PYTHON
    if language is None and _MANIFEST_ONLY_TAG in tags:
        language = Language.MANIFEST_ONLY

    remote_registries: dict = entry.get("remoteRegistries") or {}
    pypi_registry: dict = remote_registries.get("pypi") or {}
    pypi_package_name = cast(
        "str | None",
        pypi_registry.get("packageName", None),
    )
    pypi_enabled: bool = bool(pypi_registry.get("enabled", False))

    # Docker is always available; the other install types depend on the language.
    install_types: set[InstallType] = {InstallType.DOCKER}
    if language == Language.PYTHON and pypi_enabled:
        install_types.add(InstallType.PYTHON)
    elif language == Language.JAVA:
        install_types.add(InstallType.JAVA)
    elif language == Language.MANIFEST_ONLY:
        install_types.add(InstallType.YAML)

    suggested_streams_section: dict = entry.get("suggestedStreams") or {}

    return ConnectorMetadata(
        name=name,
        latest_available_version=latest_version,
        # Only advertise the PyPI package when the PyPI registry entry is enabled.
        pypi_package_name=pypi_package_name if pypi_enabled else None,
        language=language,
        install_types=install_types,
        suggested_streams=suggested_streams_section.get("streams", None),
    )
155
156
def _get_registry_cache(*, force_refresh: bool = False) -> dict[str, ConnectorMetadata]:
    """Return the registry cache, loading it first when necessary.

    The registry is read from an HTTP(S) URL or, for any other value, from a
    local JSON file. A non-empty cache is reused unless `force_refresh` is set.

    Args:
        force_refresh: Reload the registry even if a cache is already populated.

    Returns:
        Mapping of connector name to its metadata. An empty dict is returned
        (and nothing is cached) when the registry is disabled.

    Raises:
        requests.exceptions.RequestException: If the remote registry cannot be
            fetched, including when the request times out.
        OSError: If the local registry file cannot be opened.
    """
    global __cache
    if __cache and not force_refresh:
        return __cache

    registry_url = _get_registry_url()

    if _is_registry_disabled(registry_url):
        return {}

    if registry_url.startswith("http"):
        response = requests.get(
            registry_url,
            headers={"User-Agent": f"PyAirbyte/{get_version()}"},
            # Without a timeout a stalled connection would block the caller forever.
            timeout=10,
        )
        response.raise_for_status()
        data = response.json()
    else:
        # Assume local file
        with Path(registry_url).open(encoding="utf-8") as f:
            data = json.load(f)

    new_cache: dict[str, ConnectorMetadata] = {}

    # Sources first, then destinations; a missing or null section counts as empty.
    for section in ("sources", "destinations"):
        for connector in data.get(section) or []:
            connector_metadata = _registry_entry_to_connector_metadata(connector)
            new_cache[connector_metadata.name] = connector_metadata

    if not new_cache:
        # This isn't necessarily fatal, since users can bring their own
        # connector definitions.
        warn_once(
            message=f"Connector registry is empty: {registry_url}",
            with_stack=False,
        )

    __cache = new_cache
    return __cache
200
201
def get_connector_metadata(name: str) -> ConnectorMetadata | None:
    """Look up the registry metadata for a single connector.

    The registry cache is loaded on demand if it has not been populated yet.

    Args:
        name: The connector name, e.g. "source-google-sheets".

    Returns:
        The connector's metadata, or None when the registry is disabled.

    Raises:
        PyAirbyteInternalError: If the registry could not be loaded (it is empty).
        AirbyteConnectorNotRegisteredError: If the connector is not in the registry.
    """
    if _is_registry_disabled(_get_registry_url()):
        return None

    registry_snapshot = copy(_get_registry_cache())

    if not registry_snapshot:
        raise exc.PyAirbyteInternalError(
            message="Connector registry could not be loaded.",
            context={
                "registry_url": _get_registry_url(),
            },
        )

    if name in registry_snapshot:
        return registry_snapshot[name]

    raise exc.AirbyteConnectorNotRegisteredError(
        connector_name=name,
        context={
            "registry_url": _get_registry_url(),
            "available_connectors": get_available_connectors(),
        },
    )
230
231
def get_available_connectors(install_type: InstallType | str | None = None) -> list[str]:
    """List the names of the connectors available for an install type.

    Names are returned in alphabetical order and keep their standard
    "source-" / "destination-" prefix.

    Args:
        install_type: Restrict the result to connectors supporting this install
            type. When omitted, everything is returned if Docker is installed,
            otherwise only Python and manifest-only connectors.

    Returns:
        Sorted list of connector names.

    Raises:
        ValueError: If `install_type` is a string that is not a valid install type.
    """
    if install_type is None:
        # No install type specified. Filter for whatever is runnable.
        if is_docker_installed():
            logger.info("Docker is detected. Returning all connectors.")
            return sorted(meta.name for meta in _get_registry_cache().values())

        logger.info("Docker was not detected. Returning only Python and Manifest-only connectors.")
        runnable_languages = {Language.PYTHON, Language.MANIFEST_ONLY}
        return sorted(
            meta.name
            for meta in _get_registry_cache().values()
            if meta.language in runnable_languages
        )

    requested = install_type if isinstance(install_type, InstallType) else InstallType(install_type)

    if requested == InstallType.PYTHON:
        python_names = [
            meta.name
            for meta in _get_registry_cache().values()
            if meta.pypi_package_name is not None
        ]
        return sorted(python_names)

    if requested == InstallType.JAVA:
        warnings.warn(
            message="Java connectors are not yet supported.",
            stacklevel=2,
        )
        java_names = [
            meta.name for meta in _get_registry_cache().values() if meta.language == Language.JAVA
        ]
        return sorted(java_names)

    if requested == InstallType.DOCKER:
        # Every registered connector can run in Docker.
        return sorted(meta.name for meta in _get_registry_cache().values())

    if requested == InstallType.YAML:
        yaml_names = [
            meta.name
            for meta in _get_registry_cache().values()
            if InstallType.YAML in meta.install_types
        ]
        return sorted(yaml_names)

    # pragma: no cover  # Should never be reached.
    raise exc.PyAirbyteInputError(
        message="Invalid install type.",
        context={
            "install_type": requested,
        },
    )
289
290
class ConnectorVersionInfo(BaseModel):
    """Information about a specific connector version.

    Instances are built by `get_connector_version_history` from the dictionaries
    returned by `parse_changelog_html`.
    """

    # Version identifier of the connector release.
    version: str
    # Release date; may be overwritten with the date reported by the registry.
    release_date: str | None = None
    # NOTE(review): presumably the Docker image reference for this version - confirm
    # against parse_changelog_html.
    docker_image_url: str
    # NOTE(review): presumably the docs page the changelog entry came from - confirm.
    changelog_url: str
    # Pull request URL and title for the release, when available.
    pr_url: str | None = None
    pr_title: str | None = None
    # Messages describing problems found while parsing this entry; defaults to empty.
    parsing_errors: list[str] = Field(default_factory=list)
301
302
class ApiDocsUrl(BaseModel):
    """A single API documentation link and where it was found."""

    title: str
    url: str
    source: str
    doc_type: str = Field(default="other", alias="type")
    requires_login: bool = Field(default=False, alias="requiresLogin")

    model_config = {"populate_by_name": True}

    @classmethod
    def from_manifest_dict(cls, manifest_data: dict[str, Any]) -> list[Self]:
        """Build documentation links from a parsed manifest.

        Only the `data.externalDocumentationUrls` list of the manifest is read.

        Args:
            manifest_data: The parsed manifest.yaml content as a dictionary.

        Returns:
            One object per listed documentation entry, or an empty list when the
            manifest has no usable `data.externalDocumentationUrls` list.
        """
        data_section = manifest_data.get("data")
        if not isinstance(data_section, dict):
            return []

        external_docs = data_section.get("externalDocumentationUrls")
        if not isinstance(external_docs, list):
            return []

        return [
            cls(
                title=entry["title"],
                url=entry["url"],
                source="data_external_docs",
                doc_type=entry.get("type", "other"),
                requires_login=entry.get("requiresLogin", False),
            )
            for entry in external_docs
        ]
342
343
def _manifest_url_for(connector_name: str) -> str:
    """Build the URL where a connector's latest manifest.yaml is expected.

    Args:
        connector_name: The canonical connector name (e.g., "source-facebook-marketing")

    Returns:
        The manifest URL for the "latest" version of the connector.
    """
    placeholders = {
        "source_name": connector_name,
        "version": "latest",
    }
    return _DEFAULT_MANIFEST_URL.format_map(placeholders)
357
358
def _fetch_manifest_dict(url: str) -> dict[str, Any]:
    """Download a manifest.yaml file and parse it.

    Args:
        url: The URL to fetch the manifest from

    Returns:
        The parsed manifest content, or an empty dict when the manifest does not
        exist (HTTP 404) or parses to an empty document.

    Raises:
        HTTPError: If the request fails with a non-404 error status.
    """
    not_found_status = 404

    manifest_response = requests.get(url, timeout=10)

    # A missing manifest is an expected situation, not an error.
    if manifest_response.status_code == not_found_status:
        return {}

    manifest_response.raise_for_status()
    parsed = yaml.safe_load(manifest_response.text)
    return parsed if parsed else {}
379
380
def _extract_docs_from_registry(connector_name: str) -> list[ApiDocsUrl]:
    """Extract documentation URLs from connector registry metadata.

    The raw registry is read again here (rather than taken from the metadata
    cache) because the cached metadata does not keep documentation URLs. As in
    `_get_registry_cache`, a registry location that does not start with "http"
    is treated as a local JSON file.

    Args:
        connector_name: The canonical connector name (e.g., "source-facebook-marketing")

    Returns:
        List of ApiDocsUrl objects extracted from the registry. Empty when the
        connector has no registry entry or the entry lists no documentation.

    Raises:
        requests.exceptions.RequestException: If the remote registry cannot be fetched.
        OSError: If the local registry file cannot be opened.
    """
    registry_url = _get_registry_url()
    if registry_url.startswith("http"):
        response = requests.get(registry_url, timeout=10)
        response.raise_for_status()
        registry_data = response.json()
    else:
        # Local registry override: read the file instead of issuing an HTTP request.
        with Path(registry_url).open(encoding="utf-8") as f:
            registry_data = json.load(f)

    connector_list = registry_data.get("sources", []) + registry_data.get("destinations", [])
    repository_suffix = f"/{connector_name}"
    connector_entry = next(
        (
            entry
            for entry in connector_list
            if entry.get("dockerRepository", "").endswith(repository_suffix)
        ),
        None,
    )
    if not connector_entry:
        return []

    docs_urls: list[ApiDocsUrl] = []

    # A null or empty documentationUrl cannot be turned into a valid link; skip it.
    documentation_url = connector_entry.get("documentationUrl")
    if documentation_url:
        docs_urls.append(
            ApiDocsUrl(
                title="Airbyte Documentation",
                url=documentation_url,
                source="registry",
            )
        )

    external_docs = connector_entry.get("externalDocumentationUrls")
    if isinstance(external_docs, list):
        docs_urls.extend(
            ApiDocsUrl(
                title=doc["title"],
                url=doc["url"],
                source="registry_external_docs",
                doc_type=doc.get("type", "other"),
                requires_login=doc.get("requiresLogin", False),
            )
            for doc in external_docs
        )

    return docs_urls
430
431
def get_connector_api_docs_urls(connector_name: str) -> list[ApiDocsUrl]:
    """Collect API documentation URLs for a connector.

    Links for the connector's upstream API are gathered from two places, in this order:
    - Registry metadata (documentationUrl, externalDocumentationUrls)
    - Connector manifest.yaml file (data.externalDocumentationUrls)

    Args:
        connector_name: The canonical connector name (e.g., "source-facebook-marketing")

    Returns:
        List of ApiDocsUrl objects, deduplicated by URL. When a URL appears more
        than once, the first occurrence is kept.

    Raises:
        AirbyteConnectorNotRegisteredError: If the connector is not found in the registry.
    """
    if connector_name not in get_available_connectors(InstallType.DOCKER):
        raise exc.AirbyteConnectorNotRegisteredError(
            connector_name=connector_name,
            context={
                "registry_url": _get_registry_url(),
                "available_connectors": get_available_connectors(InstallType.DOCKER),
            },
        )

    from_registry = _extract_docs_from_registry(connector_name)
    manifest_data = _fetch_manifest_dict(_manifest_url_for(connector_name))
    from_manifest = ApiDocsUrl.from_manifest_dict(manifest_data)

    # Dicts preserve insertion order, and setdefault keeps the first entry per URL.
    first_by_url: dict[str, ApiDocsUrl] = {}
    for candidate in [*from_registry, *from_manifest]:
        first_by_url.setdefault(candidate.url, candidate)

    return list(first_by_url.values())
475
476
def get_connector_version_history(
    connector_name: str,
    *,
    num_versions_to_validate: int = 5,
    timeout: int = 30,
) -> list[ConnectorVersionInfo]:
    """Get version history for a connector.

    This function retrieves the version history for a connector by:
    1. Scraping the changelog HTML from docs.airbyte.com
    2. Parsing version information including PR URLs and titles
    3. Overriding release dates for the most recent N versions with accurate
       registry data

    Args:
        connector_name: Name of the connector (e.g., 'source-faker', 'destination-postgres')
        num_versions_to_validate: Number of most recent versions to override with
            registry release dates for accuracy. Defaults to 5. Zero or negative
            values disable the override.
        timeout: Timeout in seconds for the changelog fetch. Defaults to 30.

    Returns:
        List of ConnectorVersionInfo objects, in the order produced by the
        changelog parser (expected to be most recent first). An empty list is
        returned when the changelog cannot be fetched or contains no versions.

    Raises:
        AirbyteConnectorNotRegisteredError: If the connector is not found in the registry.

    Example:
        >>> versions = get_connector_version_history("source-faker", num_versions_to_validate=3)
        >>> for v in versions[:5]:
        ...     print(f"{v.version}: {v.release_date}")
    """
    if connector_name not in get_available_connectors(InstallType.DOCKER):
        raise exc.AirbyteConnectorNotRegisteredError(
            connector_name=connector_name,
            context={
                "registry_url": _get_registry_url(),
                "available_connectors": get_available_connectors(InstallType.DOCKER),
            },
        )

    # Strip only the leading type prefix; a later "source-" or "destination-"
    # inside the name is part of the connector's short name and must be kept.
    if connector_name.startswith("source-"):
        connector_type = "sources"
        connector_short_name = connector_name.removeprefix("source-")
    else:
        connector_type = "destinations"
        connector_short_name = connector_name.removeprefix("destination-")

    changelog_url = f"https://docs.airbyte.com/integrations/{connector_type}/{connector_short_name}"

    try:
        response = requests.get(
            changelog_url,
            headers={"User-Agent": f"PyAirbyte/{get_version()}"},
            timeout=timeout,
        )
        response.raise_for_status()
        html_content = response.text
    except requests.exceptions.RequestException as e:
        # Best effort: a missing or unreachable changelog yields an empty history.
        logger.warning("Failed to fetch changelog for %s: %s", connector_name, e)
        return []

    version_dicts = parse_changelog_html(html_content, connector_name)

    if not version_dicts:
        logger.warning("No versions found in changelog for %s", connector_name)
        return []

    versions = [ConnectorVersionInfo(**version_dict) for version_dict in version_dicts]

    # Clamp at zero: a negative count would otherwise slice from the end of the list
    # and validate nearly every version instead of none.
    validate_count = max(num_versions_to_validate, 0)
    for version_info in versions[:validate_count]:
        registry_date = fetch_registry_version_date(connector_name, version_info.version)
        if registry_date:
            version_info.release_date = registry_date
            logger.debug(
                "Updated release date for %s v%s from registry: %s",
                connector_name,
                version_info.version,
                registry_date,
            )

    return versions
logger = <Logger airbyte (INFO)>
class InstallType(builtins.str, enum.Enum):
49class InstallType(str, Enum):
50    """The type of installation for a connector."""
51
52    YAML = "yaml"
53    PYTHON = "python"
54    DOCKER = "docker"
55    JAVA = "java"

The type of installation for a connector.

YAML = <InstallType.YAML: 'yaml'>
PYTHON = <InstallType.PYTHON: 'python'>
DOCKER = <InstallType.DOCKER: 'docker'>
JAVA = <InstallType.JAVA: 'java'>
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
removeprefix
removesuffix
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
class Language(builtins.str, enum.Enum):
58class Language(str, Enum):
59    """The language of a connector."""
60
61    PYTHON = InstallType.PYTHON.value
62    JAVA = InstallType.JAVA.value
63    MANIFEST_ONLY = _MANIFEST_ONLY_LANGUAGE

The language of a connector.

PYTHON = <Language.PYTHON: 'python'>
JAVA = <Language.JAVA: 'java'>
MANIFEST_ONLY = <Language.MANIFEST_ONLY: 'manifest-only'>
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
removeprefix
removesuffix
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
class ConnectorMetadata(pydantic.main.BaseModel):
66class ConnectorMetadata(BaseModel):
67    """Metadata for a connector."""
68
69    name: str
70    """Connector name. For example, "source-google-sheets"."""
71
72    latest_available_version: str | None
73    """The latest available version of the connector."""
74
75    pypi_package_name: str | None
76    """The name of the PyPI package for the connector, if it exists."""
77
78    language: Language | None
79    """The language of the connector."""
80
81    install_types: set[InstallType]
82    """The supported install types for the connector."""
83
84    suggested_streams: list[str] | None = None
85    """A list of suggested streams for the connector, if available."""
86
87    @property
88    def default_install_type(self) -> InstallType:
89        """Return the default install type for the connector."""
90        if self.language == Language.MANIFEST_ONLY and InstallType.YAML in self.install_types:
91            return InstallType.YAML
92
93        if InstallType.PYTHON in self.install_types:
94            return InstallType.PYTHON
95
96        # Else: Java or Docker
97        return InstallType.DOCKER

Metadata for a connector.

name: str

Connector name. For example, "source-google-sheets".

latest_available_version: str | None

The latest available version of the connector.

pypi_package_name: str | None

The name of the PyPI package for the connector, if it exists.

language: Language | None

The language of the connector.

install_types: set[InstallType]

The supported install types for the connector.

suggested_streams: list[str] | None

A list of suggested streams for the connector, if available.

default_install_type: InstallType
87    @property
88    def default_install_type(self) -> InstallType:
89        """Return the default install type for the connector."""
90        if self.language == Language.MANIFEST_ONLY and InstallType.YAML in self.install_types:
91            return InstallType.YAML
92
93        if InstallType.PYTHON in self.install_types:
94            return InstallType.PYTHON
95
96        # Else: Java or Docker
97        return InstallType.DOCKER

Return the default install type for the connector.

model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
def get_connector_metadata(name: str) -> ConnectorMetadata | None:
203def get_connector_metadata(name: str) -> ConnectorMetadata | None:
204    """Check the cache for the connector.
205
206    If the cache is empty, populate by calling update_cache.
207    """
208    registry_url = _get_registry_url()
209
210    if _is_registry_disabled(registry_url):
211        return None
212
213    cache = copy(_get_registry_cache())
214
215    if not cache:
216        raise exc.PyAirbyteInternalError(
217            message="Connector registry could not be loaded.",
218            context={
219                "registry_url": _get_registry_url(),
220            },
221        )
222    if name not in cache:
223        raise exc.AirbyteConnectorNotRegisteredError(
224            connector_name=name,
225            context={
226                "registry_url": _get_registry_url(),
227                "available_connectors": get_available_connectors(),
228            },
229        )
230    return cache[name]

Check the cache for the connector.

If the cache is empty, it is populated from the registry on first use. Returns None when the registry is disabled.

def get_available_connectors( install_type: InstallType | str | None = None) -> list[str]:
233def get_available_connectors(install_type: InstallType | str | None = None) -> list[str]:
234    """Return a list of all available connectors.
235
236    Connectors will be returned in alphabetical order, with the standard prefix "source-".
237    """
238    if install_type is None:
239        # No install type specified. Filter for whatever is runnable.
240        if is_docker_installed():
241            logger.info("Docker is detected. Returning all connectors.")
242            # If Docker is available, return all connectors.
243            return sorted(conn.name for conn in _get_registry_cache().values())
244
245        logger.info("Docker was not detected. Returning only Python and Manifest-only connectors.")
246
247        # If Docker is not available, return only Python and Manifest-based connectors.
248        return sorted(
249            conn.name
250            for conn in _get_registry_cache().values()
251            if conn.language in {Language.PYTHON, Language.MANIFEST_ONLY}
252        )
253
254    if not isinstance(install_type, InstallType):
255        install_type = InstallType(install_type)
256
257    if install_type == InstallType.PYTHON:
258        return sorted(
259            conn.name
260            for conn in _get_registry_cache().values()
261            if conn.pypi_package_name is not None
262        )
263
264    if install_type == InstallType.JAVA:
265        warnings.warn(
266            message="Java connectors are not yet supported.",
267            stacklevel=2,
268        )
269        return sorted(
270            conn.name for conn in _get_registry_cache().values() if conn.language == Language.JAVA
271        )
272
273    if install_type == InstallType.DOCKER:
274        return sorted(conn.name for conn in _get_registry_cache().values())
275
276    if install_type == InstallType.YAML:
277        return sorted(
278            conn.name
279            for conn in _get_registry_cache().values()
280            if InstallType.YAML in conn.install_types
281        )
282
283    # pragma: no cover  # Should never be reached.
284    raise exc.PyAirbyteInputError(
285        message="Invalid install type.",
286        context={
287            "install_type": install_type,
288        },
289    )

Return a list of all available connectors.

Connectors are returned in alphabetical order and keep their standard "source-" or "destination-" prefix.

class ConnectorVersionInfo(pydantic.main.BaseModel):
292class ConnectorVersionInfo(BaseModel):
293    """Information about a specific connector version."""
294
295    version: str
296    release_date: str | None = None
297    docker_image_url: str
298    changelog_url: str
299    pr_url: str | None = None
300    pr_title: str | None = None
301    parsing_errors: list[str] = Field(default_factory=list)

Information about a specific connector version.

version: str
release_date: str | None
docker_image_url: str
changelog_url: str
pr_url: str | None
pr_title: str | None
parsing_errors: list[str]
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
class ApiDocsUrl(pydantic.main.BaseModel):
304class ApiDocsUrl(BaseModel):
305    """API documentation URL information."""
306
307    title: str
308    url: str
309    source: str
310    doc_type: str = Field(default="other", alias="type")
311    requires_login: bool = Field(default=False, alias="requiresLogin")
312
313    model_config = {"populate_by_name": True}
314
315    @classmethod
316    def from_manifest_dict(cls, manifest_data: dict[str, Any]) -> list[Self]:
317        """Extract documentation URLs from parsed manifest data.
318
319        Args:
320            manifest_data: The parsed manifest.yaml data as a dictionary
321
322        Returns:
323            List of ApiDocsUrl objects extracted from the manifest
324        """
325        results: list[Self] = []
326
327        data_section = manifest_data.get("data")
328        if isinstance(data_section, dict):
329            external_docs = data_section.get("externalDocumentationUrls")
330            if isinstance(external_docs, list):
331                results = [
332                    cls(
333                        title=doc["title"],
334                        url=doc["url"],
335                        source="data_external_docs",
336                        doc_type=doc.get("type", "other"),
337                        requires_login=doc.get("requiresLogin", False),
338                    )
339                    for doc in external_docs
340                ]
341
342        return results

API documentation URL information.

title: str
url: str
source: str
doc_type: str
requires_login: bool
model_config = {'populate_by_name': True, 'validate_by_alias': True, 'validate_by_name': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

@classmethod
def from_manifest_dict( cls, manifest_data: dict[str, typing.Any]) -> list[typing_extensions.Self]:
315    @classmethod
316    def from_manifest_dict(cls, manifest_data: dict[str, Any]) -> list[Self]:
317        """Extract documentation URLs from parsed manifest data.
318
319        Args:
320            manifest_data: The parsed manifest.yaml data as a dictionary
321
322        Returns:
323            List of ApiDocsUrl objects extracted from the manifest
324        """
325        results: list[Self] = []
326
327        data_section = manifest_data.get("data")
328        if isinstance(data_section, dict):
329            external_docs = data_section.get("externalDocumentationUrls")
330            if isinstance(external_docs, list):
331                results = [
332                    cls(
333                        title=doc["title"],
334                        url=doc["url"],
335                        source="data_external_docs",
336                        doc_type=doc.get("type", "other"),
337                        requires_login=doc.get("requiresLogin", False),
338                    )
339                    for doc in external_docs
340                ]
341
342        return results

Extract documentation URLs from parsed manifest data.

Arguments:
  • manifest_data: The parsed manifest.yaml data as a dictionary
Returns:

List of ApiDocsUrl objects extracted from the manifest

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
def get_connector_api_docs_urls(connector_name: str) -> list[ApiDocsUrl]:
def get_connector_api_docs_urls(connector_name: str) -> list[ApiDocsUrl]:
    """Collect upstream-API documentation links for a connector.

    Links are gathered from two sources, in this order:
    - Registry metadata (documentationUrl, externalDocumentationUrls)
    - Connector manifest.yaml file (data.externalDocumentationUrls)

    Args:
        connector_name: The canonical connector name (e.g., "source-facebook-marketing")

    Returns:
        List of ApiDocsUrl objects with documentation URLs, deduplicated by URL.
        When the same URL appears more than once, the first occurrence is kept.

    Raises:
        AirbyteConnectorNotRegisteredError: If the connector is not found in the registry.
    """
    is_registered = connector_name in get_available_connectors(InstallType.DOCKER)
    if not is_registered:
        raise exc.AirbyteConnectorNotRegisteredError(
            connector_name=connector_name,
            context={
                "registry_url": _get_registry_url(),
                "available_connectors": get_available_connectors(InstallType.DOCKER),
            },
        )

    # Registry entries come first so they win over manifest entries with the same URL.
    from_registry = _extract_docs_from_registry(connector_name)
    from_manifest = ApiDocsUrl.from_manifest_dict(
        _fetch_manifest_dict(_manifest_url_for(connector_name))
    )

    # A dict keeps insertion order; `setdefault` retains the first entry seen per URL.
    first_by_url: dict[str, ApiDocsUrl] = {}
    for entry in [*from_registry, *from_manifest]:
        first_by_url.setdefault(entry.url, entry)

    return list(first_by_url.values())

Get API documentation URLs for a connector.

This function retrieves documentation URLs for a connector's upstream API from multiple sources:

  • Registry metadata (documentationUrl, externalDocumentationUrls)
  • Connector manifest.yaml file (data.externalDocumentationUrls)
Arguments:
  • connector_name: The canonical connector name (e.g., "source-facebook-marketing")
Returns:

List of ApiDocsUrl objects with documentation URLs, deduplicated by URL.

Raises:
  • AirbyteConnectorNotRegisteredError: If the connector is not found in the registry.
def get_connector_version_history( connector_name: str, *, num_versions_to_validate: int = 5, timeout: int = 30) -> list[ConnectorVersionInfo]:
def get_connector_version_history(
    connector_name: str,
    *,
    num_versions_to_validate: int = 5,
    timeout: int = 30,
) -> list[ConnectorVersionInfo]:
    """Get version history for a connector.

    This function retrieves the version history for a connector by:
    1. Scraping the changelog HTML from docs.airbyte.com
    2. Parsing version information including PR URLs and titles
    3. Overriding release dates for the most recent N versions with accurate
       registry data

    Args:
        connector_name: Name of the connector (e.g., 'source-faker', 'destination-postgres')
        num_versions_to_validate: Number of most recent versions to override with
            registry release dates for accuracy. Defaults to 5. Values below zero
            are treated as zero.
        timeout: Timeout in seconds for the changelog fetch. Defaults to 30.

    Returns:
        List of ConnectorVersionInfo objects, sorted by most recent first.
        An empty list is returned (and a warning logged) when the changelog page
        cannot be fetched or no versions can be parsed from it.

    Raises:
        AirbyteConnectorNotRegisteredError: If the connector is not found in the registry.

    Example:
        >>> versions = get_connector_version_history("source-faker", num_versions_to_validate=3)
        >>> for v in versions[:5]:
        ...     print(f"{v.version}: {v.release_date}")
    """
    if connector_name not in get_available_connectors(InstallType.DOCKER):
        raise exc.AirbyteConnectorNotRegisteredError(
            connector_name=connector_name,
            context={
                "registry_url": _get_registry_url(),
                "available_connectors": get_available_connectors(InstallType.DOCKER),
            },
        )

    # Strip only the leading type prefix. A blanket `str.replace` would also remove
    # "source-" / "destination-" occurring later in the name (for example inside
    # "destination-resource-x") and produce a wrong docs URL.
    source_prefix = "source-"
    destination_prefix = "destination-"
    if connector_name.startswith(source_prefix):
        connector_type = "sources"
        connector_short_name = connector_name[len(source_prefix) :]
    else:
        # Anything that is not a source is looked up under "destinations".
        connector_type = "destinations"
        connector_short_name = (
            connector_name[len(destination_prefix) :]
            if connector_name.startswith(destination_prefix)
            else connector_name
        )

    changelog_url = f"https://docs.airbyte.com/integrations/{connector_type}/{connector_short_name}"

    try:
        response = requests.get(
            changelog_url,
            headers={"User-Agent": f"PyAirbyte/{get_version()}"},
            timeout=timeout,
        )
        response.raise_for_status()
        html_content = response.text
    except requests.exceptions.RequestException as e:
        # Best-effort lookup: a failed fetch yields an empty history, not an error.
        logger.warning(f"Failed to fetch changelog for {connector_name}: {e}")
        return []

    version_dicts = parse_changelog_html(html_content, connector_name)

    if not version_dicts:
        logger.warning(f"No versions found in changelog for {connector_name}")
        return []

    versions = [ConnectorVersionInfo(**version_dict) for version_dict in version_dicts]

    # Clamp to zero: a negative slice bound would select "all but the last N"
    # entries instead of the N most recent ones.
    validate_count = max(num_versions_to_validate, 0)
    for version_info in versions[:validate_count]:
        registry_date = fetch_registry_version_date(connector_name, version_info.version)
        if registry_date:
            version_info.release_date = registry_date
            logger.debug(
                f"Updated release date for {connector_name} v{version_info.version} "
                f"from registry: {registry_date}"
            )

    return versions

Get version history for a connector.

This function retrieves the version history for a connector by:

  1. Scraping the changelog HTML from docs.airbyte.com
  2. Parsing version information including PR URLs and titles
  3. Overriding release dates for the most recent N versions with accurate registry data
Arguments:
  • connector_name: Name of the connector (e.g., 'source-faker', 'destination-postgres')
  • num_versions_to_validate: Number of most recent versions to override with registry release dates for accuracy. Defaults to 5.
  • timeout: Timeout in seconds for the changelog fetch. Defaults to 30.
Returns:

List of ConnectorVersionInfo objects, sorted by most recent first.

Raises:
  • AirbyteConnectorNotRegisteredError: If the connector is not found in the registry.
Example:
>>> versions = get_connector_version_history("source-faker", num_versions_to_validate=3)
>>> for v in versions[:5]:
...     print(f"{v.version}: {v.release_date}")