airbyte.registry
Connectivity to the connector catalog registry.
1# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 2"""Connectivity to the connector catalog registry.""" 3 4from __future__ import annotations 5 6import json 7import logging 8import os 9import warnings 10from copy import copy 11from enum import Enum 12from pathlib import Path 13from typing import Any, cast 14 15import requests 16import yaml 17from pydantic import BaseModel, Field 18from typing_extensions import Self 19 20from airbyte import exceptions as exc 21from airbyte._registry_utils import fetch_registry_version_date, parse_changelog_html 22from airbyte._util.meta import is_docker_installed 23from airbyte.constants import AIRBYTE_OFFLINE_MODE 24from airbyte.logs import warn_once 25from airbyte.version import get_version 26 27 28logger = logging.getLogger("airbyte") 29 30 31__cache: dict[str, ConnectorMetadata] | None = None 32 33 34_REGISTRY_ENV_VAR = "AIRBYTE_LOCAL_REGISTRY" 35_REGISTRY_URL = "https://connectors.airbyte.com/files/registries/v0/oss_registry.json" 36 37_PYTHON_LANGUAGE = "python" 38_MANIFEST_ONLY_LANGUAGE = "manifest-only" 39 40_PYTHON_LANGUAGE_TAG = f"language:{_PYTHON_LANGUAGE}" 41_MANIFEST_ONLY_TAG = f"language:{_MANIFEST_ONLY_LANGUAGE}" 42 43_DEFAULT_MANIFEST_URL = ( 44 "https://connectors.airbyte.com/files/metadata/airbyte/{source_name}/{version}/manifest.yaml" 45) 46 47 48class InstallType(str, Enum): 49 """The type of installation for a connector.""" 50 51 YAML = "yaml" 52 """Manifest-only connectors that can be run without Docker.""" 53 PYTHON = "python" 54 """Python-based connectors available via PyPI.""" 55 DOCKER = "docker" 56 """Docker-based connectors (returns all connectors for backward compatibility).""" 57 JAVA = "java" 58 """Java-based connectors.""" 59 60 INSTALLABLE = "installable" 61 """Connectors installable in the current environment (environment-sensitive). 62 63 Returns all connectors if Docker is installed, otherwise only Python and YAML. 
64 """ 65 ANY = "any" 66 """All connectors in the registry (environment-independent).""" 67 68 69class Language(str, Enum): 70 """The language of a connector.""" 71 72 PYTHON = InstallType.PYTHON.value 73 JAVA = InstallType.JAVA.value 74 MANIFEST_ONLY = _MANIFEST_ONLY_LANGUAGE 75 76 77class ConnectorMetadata(BaseModel): 78 """Metadata for a connector.""" 79 80 name: str 81 """Connector name. For example, "source-google-sheets".""" 82 83 display_name: str | None = None 84 """Human-readable connector name.""" 85 86 connector_type: str | None = None 87 """Connector type: `source` or `destination`.""" 88 89 definition_id: str | None = None 90 """Source or destination definition ID.""" 91 92 docker_repository: str | None = None 93 """Docker repository for the connector image.""" 94 95 latest_available_version: str | None 96 """The latest available version of the connector.""" 97 98 pypi_package_name: str | None 99 """The name of the PyPI package for the connector, if it exists.""" 100 101 language: Language | None 102 """The language of the connector.""" 103 104 install_types: set[InstallType] 105 """The supported install types for the connector.""" 106 107 suggested_streams: list[str] | None = None 108 """A list of suggested streams for the connector, if available.""" 109 110 support_level: str | None = None 111 """Connector support level.""" 112 113 release_stage: str | None = None 114 """Connector release stage.""" 115 116 source_type: str | None = None 117 """Connector subtype.""" 118 119 documentation_url: str | None = None 120 """Connector documentation URL.""" 121 122 release_date: str | None = None 123 """Connector release date.""" 124 125 github_issue_label: str | None = None 126 """GitHub issue label for the connector.""" 127 128 @property 129 def default_install_type(self) -> InstallType: 130 """Return the default install type for the connector.""" 131 if self.language == Language.MANIFEST_ONLY and InstallType.YAML in self.install_types: 132 return 
InstallType.YAML 133 134 if InstallType.PYTHON in self.install_types: 135 return InstallType.PYTHON 136 137 # Else: Java or Docker 138 return InstallType.DOCKER 139 140 141def _get_registry_url() -> str: 142 if _REGISTRY_ENV_VAR in os.environ: 143 return str(os.environ.get(_REGISTRY_ENV_VAR)) 144 145 return _REGISTRY_URL 146 147 148def _is_registry_disabled(url: str) -> bool: 149 return url.upper() in {"0", "F", "FALSE"} or AIRBYTE_OFFLINE_MODE 150 151 152def _registry_entry_to_connector_metadata(entry: dict) -> ConnectorMetadata: 153 name = entry["dockerRepository"].replace("airbyte/", "") 154 latest_version: str | None = entry.get("dockerImageTag") 155 connector_type = "source" if name.startswith("source-") else "destination" 156 definition_id = entry.get("sourceDefinitionId") or entry.get("destinationDefinitionId") 157 tags = entry.get("tags", []) 158 language: Language | None = None 159 160 if "language" in entry and entry["language"] is not None: 161 try: 162 language = Language(entry["language"]) 163 except Exception: 164 warnings.warn( 165 message=f"Invalid language for connector {name}: {entry['language']}", 166 stacklevel=2, 167 ) 168 if not language and _PYTHON_LANGUAGE_TAG in tags: 169 language = Language.PYTHON 170 if not language and _MANIFEST_ONLY_TAG in tags: 171 language = Language.MANIFEST_ONLY 172 173 remote_registries: dict = entry.get("remoteRegistries", {}) 174 pypi_registry: dict = remote_registries.get("pypi", {}) 175 pypi_package_name = cast( 176 "str | None", 177 pypi_registry.get("packageName", None), 178 ) 179 pypi_enabled: bool = pypi_registry.get("enabled", False) 180 install_types: set[InstallType] = { 181 x 182 for x in [ 183 InstallType.DOCKER, # Always True 184 InstallType.PYTHON if language == Language.PYTHON and pypi_enabled else None, 185 InstallType.JAVA if language == Language.JAVA else None, 186 InstallType.YAML if language == Language.MANIFEST_ONLY else None, 187 ] 188 if x 189 } 190 191 return ConnectorMetadata( 192 
name=name, 193 display_name=entry.get("name"), 194 connector_type=connector_type, 195 definition_id=definition_id, 196 docker_repository=entry.get("dockerRepository"), 197 latest_available_version=latest_version, 198 pypi_package_name=pypi_package_name if pypi_enabled else None, 199 language=language, 200 install_types=install_types, 201 suggested_streams=entry.get("suggestedStreams", {}).get("streams", None), 202 support_level=entry.get("supportLevel"), 203 release_stage=entry.get("releaseStage"), 204 source_type=entry.get("sourceType"), 205 documentation_url=entry.get("documentationUrl"), 206 release_date=entry.get("releaseDate"), 207 github_issue_label=entry.get("githubIssueLabel"), 208 ) 209 210 211def _get_registry_cache( 212 *, 213 force_refresh: bool = False, 214) -> dict[str, ConnectorMetadata]: 215 """Return the registry cache. 216 217 Result is a mapping of connector name to ConnectorMetadata. 218 """ 219 global __cache 220 if __cache and not force_refresh: 221 return __cache 222 223 registry_url = _get_registry_url() 224 225 if _is_registry_disabled(registry_url): 226 return {} 227 228 if registry_url.startswith("http"): 229 response = requests.get( 230 registry_url, 231 headers={"User-Agent": f"PyAirbyte/{get_version()}"}, 232 ) 233 response.raise_for_status() 234 data = response.json() 235 else: 236 # Assume local file 237 with Path(registry_url).open(encoding="utf-8") as f: 238 data = json.load(f) 239 240 new_cache: dict[str, ConnectorMetadata] = {} 241 242 for connector in data["sources"]: 243 connector_metadata = _registry_entry_to_connector_metadata(connector) 244 new_cache[connector_metadata.name] = connector_metadata 245 246 for connector in data["destinations"]: 247 connector_metadata = _registry_entry_to_connector_metadata(connector) 248 new_cache[connector_metadata.name] = connector_metadata 249 250 if len(new_cache) == 0: 251 # This isn't necessarily fatal, since users can bring their own 252 # connector definitions. 
253 warn_once( 254 message=f"Connector registry is empty: {registry_url}", 255 with_stack=False, 256 ) 257 258 __cache = new_cache 259 return __cache 260 261 262def get_connector_metadata(name: str) -> ConnectorMetadata | None: 263 """Check the cache for the connector. 264 265 If the cache is empty, populate by calling update_cache. 266 """ 267 registry_url = _get_registry_url() 268 269 if _is_registry_disabled(registry_url): 270 return None 271 272 cache = copy(_get_registry_cache()) 273 274 if not cache: 275 raise exc.PyAirbyteInternalError( 276 message="Connector registry could not be loaded.", 277 context={ 278 "registry_url": _get_registry_url(), 279 }, 280 ) 281 if name not in cache: 282 raise exc.AirbyteConnectorNotRegisteredError( 283 connector_name=name, 284 context={ 285 "registry_url": _get_registry_url(), 286 "available_connectors": get_available_connectors(), 287 }, 288 ) 289 return cache[name] 290 291 292def get_available_connectors( 293 install_type: InstallType | str | None = InstallType.INSTALLABLE, 294) -> list[str]: 295 """Return a list of all available connectors. 296 297 Connectors will be returned in alphabetical order, with the standard prefix "source-". 298 299 Args: 300 install_type: The type of installation for the connector. 301 Defaults to `InstallType.INSTALLABLE`. 302 """ 303 if install_type is None or install_type == InstallType.INSTALLABLE: 304 # Filter for installable connectors (default behavior). 305 if is_docker_installed(): 306 logger.info("Docker is detected. Returning all connectors.") 307 return sorted(_get_registry_cache().keys()) 308 309 logger.info("Docker was not detected. 
Returning only Python and Manifest-only connectors.") 310 return sorted( 311 [ 312 connector_name 313 for connector_name, conn_info in _get_registry_cache().items() 314 if conn_info.language in {Language.PYTHON, Language.MANIFEST_ONLY} 315 ] 316 ) 317 318 if not isinstance(install_type, InstallType): 319 install_type = InstallType(install_type) 320 321 if install_type == InstallType.PYTHON: 322 return sorted( 323 connector_name 324 for connector_name, conn_info in _get_registry_cache().items() 325 if conn_info.pypi_package_name is not None 326 ) 327 328 if install_type == InstallType.JAVA: 329 warnings.warn( 330 message="Java connectors are not yet supported.", 331 stacklevel=2, 332 ) 333 return sorted( 334 connector_name 335 for connector_name, conn_info in _get_registry_cache().items() 336 if conn_info.language == Language.JAVA 337 ) 338 339 if install_type in {InstallType.DOCKER, InstallType.ANY}: 340 return sorted(_get_registry_cache().keys()) 341 342 if install_type == InstallType.YAML: 343 return sorted( 344 conn.name 345 for conn in _get_registry_cache().values() 346 if InstallType.YAML in conn.install_types 347 ) 348 349 # pragma: no cover # Should never be reached. 
350 raise exc.PyAirbyteInputError( 351 message="Invalid install type.", 352 context={ 353 "install_type": install_type, 354 }, 355 ) 356 357 358class ConnectorVersionInfo(BaseModel): 359 """Information about a specific connector version.""" 360 361 version: str 362 release_date: str | None = None 363 docker_image_url: str 364 changelog_url: str 365 pr_url: str | None = None 366 pr_title: str | None = None 367 parsing_errors: list[str] = Field(default_factory=list) 368 369 370class ApiDocsUrl(BaseModel): 371 """API documentation URL information.""" 372 373 title: str 374 url: str 375 source: str 376 doc_type: str = Field(default="other", alias="type") 377 requires_login: bool = Field(default=False, alias="requiresLogin") 378 379 model_config = {"populate_by_name": True} 380 381 @classmethod 382 def from_manifest_dict(cls, manifest_data: dict[str, Any]) -> list[Self]: 383 """Extract documentation URLs from parsed manifest data. 384 385 Args: 386 manifest_data: The parsed manifest.yaml data as a dictionary 387 388 Returns: 389 List of ApiDocsUrl objects extracted from the manifest 390 """ 391 results: list[Self] = [] 392 393 data_section = manifest_data.get("data") 394 if isinstance(data_section, dict): 395 external_docs = data_section.get("externalDocumentationUrls") 396 if isinstance(external_docs, list): 397 results = [ 398 cls( 399 title=doc["title"], 400 url=doc["url"], 401 source="data_external_docs", 402 doc_type=doc.get("type", "other"), 403 requires_login=doc.get("requiresLogin", False), 404 ) 405 for doc in external_docs 406 ] 407 408 return results 409 410 411def _manifest_url_for(connector_name: str) -> str: 412 """Get the expected URL of the manifest.yaml file for a connector. 
413 414 Args: 415 connector_name: The canonical connector name (e.g., "source-facebook-marketing") 416 417 Returns: 418 The URL to the connector's manifest.yaml file 419 """ 420 return _DEFAULT_MANIFEST_URL.format( 421 source_name=connector_name, 422 version="latest", 423 ) 424 425 426def _fetch_manifest_dict(url: str) -> dict[str, Any]: 427 """Fetch and parse a manifest.yaml file from a URL. 428 429 Args: 430 url: The URL to fetch the manifest from 431 432 Returns: 433 The parsed manifest data as a dictionary, or empty dict if manifest not found (404) 434 435 Raises: 436 HTTPError: If the request fails with a non-404 status code 437 """ 438 http_not_found = 404 439 440 response = requests.get(url, timeout=10) 441 if response.status_code == http_not_found: 442 return {} 443 444 response.raise_for_status() 445 return yaml.safe_load(response.text) or {} 446 447 448def _extract_docs_from_registry(connector_name: str) -> list[ApiDocsUrl]: 449 """Extract documentation URLs from connector registry metadata. 
450 451 Args: 452 connector_name: The canonical connector name (e.g., "source-facebook-marketing") 453 454 Returns: 455 List of ApiDocsUrl objects extracted from the registry 456 """ 457 registry_url = _get_registry_url() 458 response = requests.get(registry_url, timeout=10) 459 response.raise_for_status() 460 registry_data = response.json() 461 462 connector_list = registry_data.get("sources", []) + registry_data.get("destinations", []) 463 connector_entry = None 464 for entry in connector_list: 465 if entry.get("dockerRepository", "").endswith(f"/{connector_name}"): 466 connector_entry = entry 467 break 468 469 docs_urls = [] 470 471 if connector_entry and "documentationUrl" in connector_entry: 472 docs_urls.append( 473 ApiDocsUrl( 474 title="Airbyte Documentation", 475 url=connector_entry["documentationUrl"], 476 source="registry", 477 ) 478 ) 479 480 if connector_entry and "externalDocumentationUrls" in connector_entry: 481 external_docs = connector_entry["externalDocumentationUrls"] 482 if isinstance(external_docs, list): 483 docs_urls.extend( 484 [ 485 ApiDocsUrl( 486 title=doc["title"], 487 url=doc["url"], 488 source="registry_external_docs", 489 doc_type=doc.get("type", "other"), 490 requires_login=doc.get("requiresLogin", False), 491 ) 492 for doc in external_docs 493 ] 494 ) 495 496 return docs_urls 497 498 499def get_connector_api_docs_urls(connector_name: str) -> list[ApiDocsUrl]: 500 """Get API documentation URLs for a connector. 501 502 This function retrieves documentation URLs for a connector's upstream API from multiple sources: 503 - Registry metadata (documentationUrl, externalDocumentationUrls) 504 - Connector manifest.yaml file (data.externalDocumentationUrls) 505 506 Args: 507 connector_name: The canonical connector name (e.g., "source-facebook-marketing") 508 509 Returns: 510 List of ApiDocsUrl objects with documentation URLs, deduplicated by URL. 
511 512 Raises: 513 AirbyteConnectorNotRegisteredError: If the connector is not found in the registry. 514 """ 515 if connector_name not in get_available_connectors(InstallType.ANY): 516 raise exc.AirbyteConnectorNotRegisteredError( 517 connector_name=connector_name, 518 context={ 519 "registry_url": _get_registry_url(), 520 "available_connectors": get_available_connectors(InstallType.ANY), 521 }, 522 ) 523 524 docs_urls: list[ApiDocsUrl] = [] 525 526 registry_urls = _extract_docs_from_registry(connector_name) 527 docs_urls.extend(registry_urls) 528 529 manifest_url = _manifest_url_for(connector_name) 530 manifest_data = _fetch_manifest_dict(manifest_url) 531 manifest_urls = ApiDocsUrl.from_manifest_dict(manifest_data) 532 docs_urls.extend(manifest_urls) 533 534 seen_urls = set() 535 unique_docs_urls = [] 536 for doc_url in docs_urls: 537 if doc_url.url not in seen_urls: 538 seen_urls.add(doc_url.url) 539 unique_docs_urls.append(doc_url) 540 541 return unique_docs_urls 542 543 544def get_connector_version_history( 545 connector_name: str, 546 *, 547 num_versions_to_validate: int = 5, 548 timeout: int = 30, 549) -> list[ConnectorVersionInfo]: 550 """Get version history for a connector. 551 552 This function retrieves the version history for a connector by: 553 1. Scraping the changelog HTML from docs.airbyte.com 554 2. Parsing version information including PR URLs and titles 555 3. Overriding release dates for the most recent N versions with accurate 556 registry data 557 558 Args: 559 connector_name: Name of the connector (e.g., 'source-faker', 'destination-postgres') 560 num_versions_to_validate: Number of most recent versions to override with 561 registry release dates for accuracy. Defaults to 5. 562 timeout: Timeout in seconds for the changelog fetch. Defaults to 30. 563 564 Returns: 565 List of ConnectorVersionInfo objects, sorted by most recent first. 566 567 Raises: 568 AirbyteConnectorNotRegisteredError: If the connector is not found in the registry. 
569 570 Example: 571 >>> versions = get_connector_version_history("source-faker", num_versions_to_validate=3) 572 >>> for v in versions[:5]: 573 ... print(f"{v.version}: {v.release_date}") 574 """ 575 if connector_name not in get_available_connectors(InstallType.ANY): 576 raise exc.AirbyteConnectorNotRegisteredError( 577 connector_name=connector_name, 578 context={ 579 "registry_url": _get_registry_url(), 580 "available_connectors": get_available_connectors(InstallType.ANY), 581 }, 582 ) 583 584 connector_type = "sources" if connector_name.startswith("source-") else "destinations" 585 connector_short_name = connector_name.replace("source-", "").replace("destination-", "") 586 587 changelog_url = f"https://docs.airbyte.com/integrations/{connector_type}/{connector_short_name}" 588 589 try: 590 response = requests.get( 591 changelog_url, 592 headers={"User-Agent": f"PyAirbyte/{get_version()}"}, 593 timeout=timeout, 594 ) 595 response.raise_for_status() 596 html_content = response.text 597 except requests.exceptions.RequestException as e: 598 logger.warning(f"Failed to fetch changelog for {connector_name}: {e}") 599 return [] 600 601 version_dicts = parse_changelog_html(html_content, connector_name) 602 603 if not version_dicts: 604 logger.warning(f"No versions found in changelog for {connector_name}") 605 return [] 606 607 versions = [ConnectorVersionInfo(**version_dict) for version_dict in version_dicts] 608 609 for version_info in versions[:num_versions_to_validate]: 610 registry_date = fetch_registry_version_date(connector_name, version_info.version) 611 if registry_date: 612 version_info.release_date = registry_date 613 logger.debug( 614 f"Updated release date for {connector_name} v{version_info.version} " 615 f"from registry: {registry_date}" 616 ) 617 618 return versions
class InstallType(str, Enum):
    """Ways a connector can be installed, plus two filter-only pseudo types."""

    YAML = "yaml"
    """Manifest-only connector; runs without Docker."""
    PYTHON = "python"
    """Python connector distributed through PyPI."""
    DOCKER = "docker"
    """Docker image connector (kept as "all connectors" for backward compatibility)."""
    JAVA = "java"
    """Connector implemented in Java."""

    INSTALLABLE = "installable"
    """Whatever can be installed in the current environment (environment-sensitive).

    With Docker installed this means every connector; without it, only Python and YAML.
    """
    ANY = "any"
    """Every connector in the registry, regardless of the environment."""
The type of installation for a connector.
Docker-based connectors (returns all connectors for backward compatibility).
Connectors installable in the current environment (environment-sensitive).
Returns all connectors if Docker is installed, otherwise only Python and YAML.
class Language(str, Enum):
    """The language of a connector."""

    # The Python and Java values are taken from InstallType so the two enums share
    # the same strings for those languages.
    PYTHON = InstallType.PYTHON.value
    JAVA = InstallType.JAVA.value
    # Value comes from the module-level `_MANIFEST_ONLY_LANGUAGE` constant.
    MANIFEST_ONLY = _MANIFEST_ONLY_LANGUAGE
The language of a connector.
class ConnectorMetadata(BaseModel):
    """Registry metadata describing a single connector."""

    name: str
    """Connector name, e.g. "source-google-sheets"."""

    display_name: str | None = None
    """Human-friendly name of the connector."""

    connector_type: str | None = None
    """Either `source` or `destination`."""

    definition_id: str | None = None
    """ID of the source or destination definition."""

    docker_repository: str | None = None
    """Docker repository that hosts the connector image."""

    latest_available_version: str | None
    """Most recent version available for the connector."""

    pypi_package_name: str | None
    """PyPI package name for the connector, when one exists."""

    language: Language | None
    """Implementation language of the connector."""

    install_types: set[InstallType]
    """Install types the connector supports."""

    suggested_streams: list[str] | None = None
    """Suggested streams for the connector, when available."""

    support_level: str | None = None
    """Support level of the connector."""

    release_stage: str | None = None
    """Release stage of the connector."""

    source_type: str | None = None
    """Subtype of the connector."""

    documentation_url: str | None = None
    """URL of the connector documentation."""

    release_date: str | None = None
    """Release date of the connector."""

    github_issue_label: str | None = None
    """Label used for the connector's GitHub issues."""

    @property
    def default_install_type(self) -> InstallType:
        """Pick the preferred install type: YAML, then Python, then Docker."""
        supported = self.install_types
        is_manifest_only = self.language == Language.MANIFEST_ONLY

        if is_manifest_only and InstallType.YAML in supported:
            return InstallType.YAML

        # Java connectors and everything else fall back to Docker.
        return InstallType.PYTHON if InstallType.PYTHON in supported else InstallType.DOCKER
Metadata for a connector.
The latest available version of the connector.
The name of the PyPI package for the connector, if it exists.
A list of suggested streams for the connector, if available.
@property
def default_install_type(self) -> InstallType:
    """Pick the preferred install type: YAML, then Python, then Docker."""
    supported = self.install_types
    is_manifest_only = self.language == Language.MANIFEST_ONLY

    if is_manifest_only and InstallType.YAML in supported:
        return InstallType.YAML

    # Java connectors and everything else fall back to Docker.
    return InstallType.PYTHON if InstallType.PYTHON in supported else InstallType.DOCKER
Return the default install type for the connector.
def get_connector_metadata(name: str) -> ConnectorMetadata | None:
    """Look up a connector's metadata in the registry cache.

    The cache is loaded from the registry on first use.

    Args:
        name: The connector name, for example "source-google-sheets".

    Returns:
        The connector's metadata, or `None` if the registry is disabled.

    Raises:
        PyAirbyteInternalError: If the registry could not be loaded (empty cache).
        AirbyteConnectorNotRegisteredError: If the connector is not in the registry.
    """
    registry_url = _get_registry_url()

    if _is_registry_disabled(registry_url):
        return None

    cache = copy(_get_registry_cache())

    if not cache:
        raise exc.PyAirbyteInternalError(
            message="Connector registry could not be loaded.",
            context={
                # Report the URL that was actually used above instead of re-reading it.
                "registry_url": registry_url,
            },
        )
    if name not in cache:
        raise exc.AirbyteConnectorNotRegisteredError(
            connector_name=name,
            context={
                "registry_url": registry_url,
                "available_connectors": get_available_connectors(),
            },
        )
    return cache[name]
Check the cache for the connector.
If the cache is empty, it is populated from the registry first.
def get_available_connectors(
    install_type: InstallType | str | None = InstallType.INSTALLABLE,
) -> list[str]:
    """Return a list of all available connectors.

    Connector names are returned in alphabetical order and keep their standard
    prefix ("source-" or "destination-").

    Args:
        install_type: The type of installation for the connector.
            Defaults to `InstallType.INSTALLABLE`. `None` is treated the same way.

    Raises:
        ValueError: If `install_type` is a string that is not a valid `InstallType`.
    """
    if install_type is None:
        install_type = InstallType.INSTALLABLE
    elif not isinstance(install_type, InstallType):
        install_type = InstallType(install_type)

    # Read the cache once instead of once per branch.
    registry = _get_registry_cache()

    if install_type == InstallType.INSTALLABLE:
        # Filter for installable connectors (default behavior).
        if is_docker_installed():
            logger.info("Docker is detected. Returning all connectors.")
            return sorted(registry)

        logger.info("Docker was not detected. Returning only Python and Manifest-only connectors.")
        return sorted(
            connector_name
            for connector_name, conn_info in registry.items()
            if conn_info.language in {Language.PYTHON, Language.MANIFEST_ONLY}
        )

    if install_type == InstallType.PYTHON:
        return sorted(
            connector_name
            for connector_name, conn_info in registry.items()
            if conn_info.pypi_package_name is not None
        )

    if install_type == InstallType.JAVA:
        warnings.warn(
            message="Java connectors are not yet supported.",
            stacklevel=2,
        )
        return sorted(
            connector_name
            for connector_name, conn_info in registry.items()
            if conn_info.language == Language.JAVA
        )

    if install_type in {InstallType.DOCKER, InstallType.ANY}:
        return sorted(registry)

    if install_type == InstallType.YAML:
        return sorted(
            conn.name for conn in registry.values() if InstallType.YAML in conn.install_types
        )

    # Should never be reached: every InstallType member is handled above.
    raise exc.PyAirbyteInputError(  # pragma: no cover
        message="Invalid install type.",
        context={
            "install_type": install_type,
        },
    )
Return a list of all available connectors.
Connectors will be returned in alphabetical order, with their standard prefix ("source-" or "destination-").
Arguments:
- install_type: The type of installation for the connector. Defaults to InstallType.INSTALLABLE.
class ConnectorVersionInfo(BaseModel):
    """Information about a specific connector version."""

    # Required version identifier.
    version: str
    # Optional; `get_connector_version_history` overwrites it with the registry date
    # for the most recent versions when one is available.
    release_date: str | None = None
    docker_image_url: str
    changelog_url: str
    pr_url: str | None = None
    pr_title: str | None = None
    # Defaults to a fresh empty list per instance.
    # NOTE(review): presumably filled by the changelog parser - confirm in
    # `parse_changelog_html`.
    parsing_errors: list[str] = Field(default_factory=list)
Information about a specific connector version.
class ApiDocsUrl(BaseModel):
    """A single API documentation link together with where it was found."""

    title: str
    url: str
    source: str
    doc_type: str = Field(default="other", alias="type")
    requires_login: bool = Field(default=False, alias="requiresLogin")

    model_config = {"populate_by_name": True}

    @classmethod
    def from_manifest_dict(cls, manifest_data: dict[str, Any]) -> list[Self]:
        """Build ApiDocsUrl objects from a parsed manifest.

        Args:
            manifest_data: The parsed manifest.yaml content as a dictionary

        Returns:
            One ApiDocsUrl per entry of `data.externalDocumentationUrls`; an empty
            list when that section is missing or is not a list
        """
        section = manifest_data.get("data")
        if not isinstance(section, dict):
            return []

        entries = section.get("externalDocumentationUrls")
        if not isinstance(entries, list):
            return []

        docs: list[Self] = []
        for entry in entries:
            docs.append(
                cls(
                    title=entry["title"],
                    url=entry["url"],
                    source="data_external_docs",
                    doc_type=entry.get("type", "other"),
                    requires_login=entry.get("requiresLogin", False),
                )
            )
        return docs
API documentation URL information.
@classmethod
def from_manifest_dict(cls, manifest_data: dict[str, Any]) -> list[Self]:
    """Build ApiDocsUrl objects from a parsed manifest.

    Args:
        manifest_data: The parsed manifest.yaml content as a dictionary

    Returns:
        One ApiDocsUrl per entry of `data.externalDocumentationUrls`; an empty
        list when that section is missing or is not a list
    """
    section = manifest_data.get("data")
    if not isinstance(section, dict):
        return []

    entries = section.get("externalDocumentationUrls")
    if not isinstance(entries, list):
        return []

    docs: list[Self] = []
    for entry in entries:
        docs.append(
            cls(
                title=entry["title"],
                url=entry["url"],
                source="data_external_docs",
                doc_type=entry.get("type", "other"),
                requires_login=entry.get("requiresLogin", False),
            )
        )
    return docs
Extract documentation URLs from parsed manifest data.
Arguments:
- manifest_data: The parsed manifest.yaml data as a dictionary
Returns:
List of ApiDocsUrl objects extracted from the manifest
def get_connector_api_docs_urls(connector_name: str) -> list[ApiDocsUrl]:
    """Collect the API documentation URLs known for a connector.

    URLs for the connector's upstream API are gathered from two places, in this order:
    - Registry metadata (documentationUrl, externalDocumentationUrls)
    - Connector manifest.yaml file (data.externalDocumentationUrls)

    Args:
        connector_name: The canonical connector name (e.g., "source-facebook-marketing")

    Returns:
        List of ApiDocsUrl objects, deduplicated by URL (the first occurrence is kept).

    Raises:
        AirbyteConnectorNotRegisteredError: If the connector is not found in the registry.
    """
    is_registered = connector_name in get_available_connectors(InstallType.ANY)
    if not is_registered:
        raise exc.AirbyteConnectorNotRegisteredError(
            connector_name=connector_name,
            context={
                "registry_url": _get_registry_url(),
                "available_connectors": get_available_connectors(InstallType.ANY),
            },
        )

    from_registry = _extract_docs_from_registry(connector_name)
    manifest = _fetch_manifest_dict(_manifest_url_for(connector_name))
    from_manifest = ApiDocsUrl.from_manifest_dict(manifest)

    # Dicts keep insertion order, so `setdefault` retains the first entry per URL.
    first_by_url: dict[str, ApiDocsUrl] = {}
    for candidate in [*from_registry, *from_manifest]:
        first_by_url.setdefault(candidate.url, candidate)

    return list(first_by_url.values())
Get API documentation URLs for a connector.
This function retrieves documentation URLs for a connector's upstream API from multiple sources:
- Registry metadata (documentationUrl, externalDocumentationUrls)
- Connector manifest.yaml file (data.externalDocumentationUrls)
Arguments:
- connector_name: The canonical connector name (e.g., "source-facebook-marketing")
Returns:
List of ApiDocsUrl objects with documentation URLs, deduplicated by URL.
Raises:
- AirbyteConnectorNotRegisteredError: If the connector is not found in the registry.
def get_connector_version_history(
    connector_name: str,
    *,
    num_versions_to_validate: int = 5,
    timeout: int = 30,
) -> list[ConnectorVersionInfo]:
    """Get version history for a connector.

    This function retrieves the version history for a connector by:
    1. Scraping the changelog HTML from docs.airbyte.com
    2. Parsing version information including PR URLs and titles
    3. Overriding release dates for the most recent N versions with accurate
       registry data

    Args:
        connector_name: Name of the connector (e.g., 'source-faker', 'destination-postgres')
        num_versions_to_validate: Number of most recent versions to override with
            registry release dates for accuracy. Defaults to 5. Values below zero are
            treated as zero.
        timeout: Timeout in seconds for the changelog fetch. Defaults to 30.

    Returns:
        List of ConnectorVersionInfo objects, sorted by most recent first. The list is
        empty if the changelog cannot be fetched or contains no versions.

    Raises:
        AirbyteConnectorNotRegisteredError: If the connector is not found in the registry.

    Example:
        >>> versions = get_connector_version_history("source-faker", num_versions_to_validate=3)
        >>> for v in versions[:5]:
        ...     print(f"{v.version}: {v.release_date}")
    """
    if connector_name not in get_available_connectors(InstallType.ANY):
        raise exc.AirbyteConnectorNotRegisteredError(
            connector_name=connector_name,
            context={
                "registry_url": _get_registry_url(),
                "available_connectors": get_available_connectors(InstallType.ANY),
            },
        )

    connector_type = "sources" if connector_name.startswith("source-") else "destinations"

    # Strip only the leading type prefix; a plain `.replace()` would also delete the
    # same text if it appeared later in the connector name.
    connector_short_name = connector_name
    for prefix in ("source-", "destination-"):
        if connector_name.startswith(prefix):
            connector_short_name = connector_name[len(prefix) :]
            break

    changelog_url = f"https://docs.airbyte.com/integrations/{connector_type}/{connector_short_name}"

    try:
        response = requests.get(
            changelog_url,
            headers={"User-Agent": f"PyAirbyte/{get_version()}"},
            timeout=timeout,
        )
        response.raise_for_status()
        html_content = response.text
    except requests.exceptions.RequestException as e:
        # Best effort: a missing changelog page is reported, not raised.
        logger.warning(f"Failed to fetch changelog for {connector_name}: {e}")
        return []

    version_dicts = parse_changelog_html(html_content, connector_name)

    if not version_dicts:
        logger.warning(f"No versions found in changelog for {connector_name}")
        return []

    versions = [ConnectorVersionInfo(**version_dict) for version_dict in version_dicts]

    # Clamp at zero: a negative slice bound would select "all but the last N" instead.
    for version_info in versions[: max(num_versions_to_validate, 0)]:
        registry_date = fetch_registry_version_date(connector_name, version_info.version)
        if registry_date:
            version_info.release_date = registry_date
            logger.debug(
                f"Updated release date for {connector_name} v{version_info.version} "
                f"from registry: {registry_date}"
            )

    return versions
Get version history for a connector.
This function retrieves the version history for a connector by:
- Scraping the changelog HTML from docs.airbyte.com
- Parsing version information including PR URLs and titles
- Overriding release dates for the most recent N versions with accurate registry data
Arguments:
- connector_name: Name of the connector (e.g., 'source-faker', 'destination-postgres')
- num_versions_to_validate: Number of most recent versions to override with registry release dates for accuracy. Defaults to 5.
- timeout: Timeout in seconds for the changelog fetch. Defaults to 30.
Returns:
List of ConnectorVersionInfo objects, sorted by most recent first.
Raises:
- AirbyteConnectorNotRegisteredError: If the connector is not found in the registry.
Example:
>>> versions = get_connector_version_history("source-faker", num_versions_to_validate=3)
>>> for v in versions[:5]:
...     print(f"{v.version}: {v.release_date}")