airbyte.registry
Connectivity to the connector catalog registry.
1# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 2"""Connectivity to the connector catalog registry.""" 3 4from __future__ import annotations 5 6import json 7import logging 8import os 9import warnings 10from copy import copy 11from enum import Enum 12from pathlib import Path 13from typing import Any, cast 14 15import requests 16import yaml 17from pydantic import BaseModel, Field 18from typing_extensions import Self 19 20from airbyte import exceptions as exc 21from airbyte._registry_utils import fetch_registry_version_date, parse_changelog_html 22from airbyte._util.meta import is_docker_installed 23from airbyte.constants import AIRBYTE_OFFLINE_MODE 24from airbyte.logs import warn_once 25from airbyte.version import get_version 26 27 28logger = logging.getLogger("airbyte") 29 30 31__cache: dict[str, ConnectorMetadata] | None = None 32 33 34_REGISTRY_ENV_VAR = "AIRBYTE_LOCAL_REGISTRY" 35_REGISTRY_URL = "https://connectors.airbyte.com/files/registries/v0/oss_registry.json" 36 37_PYTHON_LANGUAGE = "python" 38_MANIFEST_ONLY_LANGUAGE = "manifest-only" 39 40_PYTHON_LANGUAGE_TAG = f"language:{_PYTHON_LANGUAGE}" 41_MANIFEST_ONLY_TAG = f"language:{_MANIFEST_ONLY_LANGUAGE}" 42 43_DEFAULT_MANIFEST_URL = ( 44 "https://connectors.airbyte.com/files/metadata/airbyte/{source_name}/{version}/manifest.yaml" 45) 46 47 48class InstallType(str, Enum): 49 """The type of installation for a connector.""" 50 51 YAML = "yaml" 52 """Manifest-only connectors that can be run without Docker.""" 53 PYTHON = "python" 54 """Python-based connectors available via PyPI.""" 55 DOCKER = "docker" 56 """Docker-based connectors (returns all connectors for backward compatibility).""" 57 JAVA = "java" 58 """Java-based connectors.""" 59 60 INSTALLABLE = "installable" 61 """Connectors installable in the current environment (environment-sensitive). 62 63 Returns all connectors if Docker is installed, otherwise only Python and YAML. 
64 """ 65 ANY = "any" 66 """All connectors in the registry (environment-independent).""" 67 68 69class Language(str, Enum): 70 """The language of a connector.""" 71 72 PYTHON = InstallType.PYTHON.value 73 JAVA = InstallType.JAVA.value 74 MANIFEST_ONLY = _MANIFEST_ONLY_LANGUAGE 75 76 77class ConnectorMetadata(BaseModel): 78 """Metadata for a connector.""" 79 80 name: str 81 """Connector name. For example, "source-google-sheets".""" 82 83 latest_available_version: str | None 84 """The latest available version of the connector.""" 85 86 pypi_package_name: str | None 87 """The name of the PyPI package for the connector, if it exists.""" 88 89 language: Language | None 90 """The language of the connector.""" 91 92 install_types: set[InstallType] 93 """The supported install types for the connector.""" 94 95 suggested_streams: list[str] | None = None 96 """A list of suggested streams for the connector, if available.""" 97 98 @property 99 def default_install_type(self) -> InstallType: 100 """Return the default install type for the connector.""" 101 if self.language == Language.MANIFEST_ONLY and InstallType.YAML in self.install_types: 102 return InstallType.YAML 103 104 if InstallType.PYTHON in self.install_types: 105 return InstallType.PYTHON 106 107 # Else: Java or Docker 108 return InstallType.DOCKER 109 110 111def _get_registry_url() -> str: 112 if _REGISTRY_ENV_VAR in os.environ: 113 return str(os.environ.get(_REGISTRY_ENV_VAR)) 114 115 return _REGISTRY_URL 116 117 118def _is_registry_disabled(url: str) -> bool: 119 return url.upper() in {"0", "F", "FALSE"} or AIRBYTE_OFFLINE_MODE 120 121 122def _registry_entry_to_connector_metadata(entry: dict) -> ConnectorMetadata: 123 name = entry["dockerRepository"].replace("airbyte/", "") 124 latest_version: str | None = entry.get("dockerImageTag") 125 tags = entry.get("tags", []) 126 language: Language | None = None 127 128 if "language" in entry and entry["language"] is not None: 129 try: 130 language = 
Language(entry["language"]) 131 except Exception: 132 warnings.warn( 133 message=f"Invalid language for connector {name}: {entry['language']}", 134 stacklevel=2, 135 ) 136 if not language and _PYTHON_LANGUAGE_TAG in tags: 137 language = Language.PYTHON 138 if not language and _MANIFEST_ONLY_TAG in tags: 139 language = Language.MANIFEST_ONLY 140 141 remote_registries: dict = entry.get("remoteRegistries", {}) 142 pypi_registry: dict = remote_registries.get("pypi", {}) 143 pypi_package_name = cast( 144 "str | None", 145 pypi_registry.get("packageName", None), 146 ) 147 pypi_enabled: bool = pypi_registry.get("enabled", False) 148 install_types: set[InstallType] = { 149 x 150 for x in [ 151 InstallType.DOCKER, # Always True 152 InstallType.PYTHON if language == Language.PYTHON and pypi_enabled else None, 153 InstallType.JAVA if language == Language.JAVA else None, 154 InstallType.YAML if language == Language.MANIFEST_ONLY else None, 155 ] 156 if x 157 } 158 159 return ConnectorMetadata( 160 name=name, 161 latest_available_version=latest_version, 162 pypi_package_name=pypi_package_name if pypi_enabled else None, 163 language=language, 164 install_types=install_types, 165 suggested_streams=entry.get("suggestedStreams", {}).get("streams", None), 166 ) 167 168 169def _get_registry_cache( 170 *, 171 force_refresh: bool = False, 172) -> dict[str, ConnectorMetadata]: 173 """Return the registry cache. 174 175 Result is a mapping of connector name to ConnectorMetadata. 
176 """ 177 global __cache 178 if __cache and not force_refresh: 179 return __cache 180 181 registry_url = _get_registry_url() 182 183 if _is_registry_disabled(registry_url): 184 return {} 185 186 if registry_url.startswith("http"): 187 response = requests.get( 188 registry_url, 189 headers={"User-Agent": f"PyAirbyte/{get_version()}"}, 190 ) 191 response.raise_for_status() 192 data = response.json() 193 else: 194 # Assume local file 195 with Path(registry_url).open(encoding="utf-8") as f: 196 data = json.load(f) 197 198 new_cache: dict[str, ConnectorMetadata] = {} 199 200 for connector in data["sources"]: 201 connector_metadata = _registry_entry_to_connector_metadata(connector) 202 new_cache[connector_metadata.name] = connector_metadata 203 204 for connector in data["destinations"]: 205 connector_metadata = _registry_entry_to_connector_metadata(connector) 206 new_cache[connector_metadata.name] = connector_metadata 207 208 if len(new_cache) == 0: 209 # This isn't necessarily fatal, since users can bring their own 210 # connector definitions. 211 warn_once( 212 message=f"Connector registry is empty: {registry_url}", 213 with_stack=False, 214 ) 215 216 __cache = new_cache 217 return __cache 218 219 220def get_connector_metadata(name: str) -> ConnectorMetadata | None: 221 """Check the cache for the connector. 222 223 If the cache is empty, populate by calling update_cache. 
224 """ 225 registry_url = _get_registry_url() 226 227 if _is_registry_disabled(registry_url): 228 return None 229 230 cache = copy(_get_registry_cache()) 231 232 if not cache: 233 raise exc.PyAirbyteInternalError( 234 message="Connector registry could not be loaded.", 235 context={ 236 "registry_url": _get_registry_url(), 237 }, 238 ) 239 if name not in cache: 240 raise exc.AirbyteConnectorNotRegisteredError( 241 connector_name=name, 242 context={ 243 "registry_url": _get_registry_url(), 244 "available_connectors": get_available_connectors(), 245 }, 246 ) 247 return cache[name] 248 249 250def get_available_connectors( 251 install_type: InstallType | str | None = InstallType.INSTALLABLE, 252) -> list[str]: 253 """Return a list of all available connectors. 254 255 Connectors will be returned in alphabetical order, with the standard prefix "source-". 256 257 Args: 258 install_type: The type of installation for the connector. 259 Defaults to `InstallType.INSTALLABLE`. 260 """ 261 if install_type is None or install_type == InstallType.INSTALLABLE: 262 # Filter for installable connectors (default behavior). 263 if is_docker_installed(): 264 logger.info("Docker is detected. Returning all connectors.") 265 return sorted(_get_registry_cache().keys()) 266 267 logger.info("Docker was not detected. 
Returning only Python and Manifest-only connectors.") 268 return sorted( 269 [ 270 connector_name 271 for connector_name, conn_info in _get_registry_cache().items() 272 if conn_info.language in {Language.PYTHON, Language.MANIFEST_ONLY} 273 ] 274 ) 275 276 if not isinstance(install_type, InstallType): 277 install_type = InstallType(install_type) 278 279 if install_type == InstallType.PYTHON: 280 return sorted( 281 connector_name 282 for connector_name, conn_info in _get_registry_cache().items() 283 if conn_info.pypi_package_name is not None 284 ) 285 286 if install_type == InstallType.JAVA: 287 warnings.warn( 288 message="Java connectors are not yet supported.", 289 stacklevel=2, 290 ) 291 return sorted( 292 connector_name 293 for connector_name, conn_info in _get_registry_cache().items() 294 if conn_info.language == Language.JAVA 295 ) 296 297 if install_type in {InstallType.DOCKER, InstallType.ANY}: 298 return sorted(_get_registry_cache().keys()) 299 300 if install_type == InstallType.YAML: 301 return sorted( 302 conn.name 303 for conn in _get_registry_cache().values() 304 if InstallType.YAML in conn.install_types 305 ) 306 307 # pragma: no cover # Should never be reached. 
308 raise exc.PyAirbyteInputError( 309 message="Invalid install type.", 310 context={ 311 "install_type": install_type, 312 }, 313 ) 314 315 316class ConnectorVersionInfo(BaseModel): 317 """Information about a specific connector version.""" 318 319 version: str 320 release_date: str | None = None 321 docker_image_url: str 322 changelog_url: str 323 pr_url: str | None = None 324 pr_title: str | None = None 325 parsing_errors: list[str] = Field(default_factory=list) 326 327 328class ApiDocsUrl(BaseModel): 329 """API documentation URL information.""" 330 331 title: str 332 url: str 333 source: str 334 doc_type: str = Field(default="other", alias="type") 335 requires_login: bool = Field(default=False, alias="requiresLogin") 336 337 model_config = {"populate_by_name": True} 338 339 @classmethod 340 def from_manifest_dict(cls, manifest_data: dict[str, Any]) -> list[Self]: 341 """Extract documentation URLs from parsed manifest data. 342 343 Args: 344 manifest_data: The parsed manifest.yaml data as a dictionary 345 346 Returns: 347 List of ApiDocsUrl objects extracted from the manifest 348 """ 349 results: list[Self] = [] 350 351 data_section = manifest_data.get("data") 352 if isinstance(data_section, dict): 353 external_docs = data_section.get("externalDocumentationUrls") 354 if isinstance(external_docs, list): 355 results = [ 356 cls( 357 title=doc["title"], 358 url=doc["url"], 359 source="data_external_docs", 360 doc_type=doc.get("type", "other"), 361 requires_login=doc.get("requiresLogin", False), 362 ) 363 for doc in external_docs 364 ] 365 366 return results 367 368 369def _manifest_url_for(connector_name: str) -> str: 370 """Get the expected URL of the manifest.yaml file for a connector. 
371 372 Args: 373 connector_name: The canonical connector name (e.g., "source-facebook-marketing") 374 375 Returns: 376 The URL to the connector's manifest.yaml file 377 """ 378 return _DEFAULT_MANIFEST_URL.format( 379 source_name=connector_name, 380 version="latest", 381 ) 382 383 384def _fetch_manifest_dict(url: str) -> dict[str, Any]: 385 """Fetch and parse a manifest.yaml file from a URL. 386 387 Args: 388 url: The URL to fetch the manifest from 389 390 Returns: 391 The parsed manifest data as a dictionary, or empty dict if manifest not found (404) 392 393 Raises: 394 HTTPError: If the request fails with a non-404 status code 395 """ 396 http_not_found = 404 397 398 response = requests.get(url, timeout=10) 399 if response.status_code == http_not_found: 400 return {} 401 402 response.raise_for_status() 403 return yaml.safe_load(response.text) or {} 404 405 406def _extract_docs_from_registry(connector_name: str) -> list[ApiDocsUrl]: 407 """Extract documentation URLs from connector registry metadata. 
408 409 Args: 410 connector_name: The canonical connector name (e.g., "source-facebook-marketing") 411 412 Returns: 413 List of ApiDocsUrl objects extracted from the registry 414 """ 415 registry_url = _get_registry_url() 416 response = requests.get(registry_url, timeout=10) 417 response.raise_for_status() 418 registry_data = response.json() 419 420 connector_list = registry_data.get("sources", []) + registry_data.get("destinations", []) 421 connector_entry = None 422 for entry in connector_list: 423 if entry.get("dockerRepository", "").endswith(f"/{connector_name}"): 424 connector_entry = entry 425 break 426 427 docs_urls = [] 428 429 if connector_entry and "documentationUrl" in connector_entry: 430 docs_urls.append( 431 ApiDocsUrl( 432 title="Airbyte Documentation", 433 url=connector_entry["documentationUrl"], 434 source="registry", 435 ) 436 ) 437 438 if connector_entry and "externalDocumentationUrls" in connector_entry: 439 external_docs = connector_entry["externalDocumentationUrls"] 440 if isinstance(external_docs, list): 441 docs_urls.extend( 442 [ 443 ApiDocsUrl( 444 title=doc["title"], 445 url=doc["url"], 446 source="registry_external_docs", 447 doc_type=doc.get("type", "other"), 448 requires_login=doc.get("requiresLogin", False), 449 ) 450 for doc in external_docs 451 ] 452 ) 453 454 return docs_urls 455 456 457def get_connector_api_docs_urls(connector_name: str) -> list[ApiDocsUrl]: 458 """Get API documentation URLs for a connector. 459 460 This function retrieves documentation URLs for a connector's upstream API from multiple sources: 461 - Registry metadata (documentationUrl, externalDocumentationUrls) 462 - Connector manifest.yaml file (data.externalDocumentationUrls) 463 464 Args: 465 connector_name: The canonical connector name (e.g., "source-facebook-marketing") 466 467 Returns: 468 List of ApiDocsUrl objects with documentation URLs, deduplicated by URL. 
469 470 Raises: 471 AirbyteConnectorNotRegisteredError: If the connector is not found in the registry. 472 """ 473 if connector_name not in get_available_connectors(InstallType.ANY): 474 raise exc.AirbyteConnectorNotRegisteredError( 475 connector_name=connector_name, 476 context={ 477 "registry_url": _get_registry_url(), 478 "available_connectors": get_available_connectors(InstallType.ANY), 479 }, 480 ) 481 482 docs_urls: list[ApiDocsUrl] = [] 483 484 registry_urls = _extract_docs_from_registry(connector_name) 485 docs_urls.extend(registry_urls) 486 487 manifest_url = _manifest_url_for(connector_name) 488 manifest_data = _fetch_manifest_dict(manifest_url) 489 manifest_urls = ApiDocsUrl.from_manifest_dict(manifest_data) 490 docs_urls.extend(manifest_urls) 491 492 seen_urls = set() 493 unique_docs_urls = [] 494 for doc_url in docs_urls: 495 if doc_url.url not in seen_urls: 496 seen_urls.add(doc_url.url) 497 unique_docs_urls.append(doc_url) 498 499 return unique_docs_urls 500 501 502def get_connector_version_history( 503 connector_name: str, 504 *, 505 num_versions_to_validate: int = 5, 506 timeout: int = 30, 507) -> list[ConnectorVersionInfo]: 508 """Get version history for a connector. 509 510 This function retrieves the version history for a connector by: 511 1. Scraping the changelog HTML from docs.airbyte.com 512 2. Parsing version information including PR URLs and titles 513 3. Overriding release dates for the most recent N versions with accurate 514 registry data 515 516 Args: 517 connector_name: Name of the connector (e.g., 'source-faker', 'destination-postgres') 518 num_versions_to_validate: Number of most recent versions to override with 519 registry release dates for accuracy. Defaults to 5. 520 timeout: Timeout in seconds for the changelog fetch. Defaults to 30. 521 522 Returns: 523 List of ConnectorVersionInfo objects, sorted by most recent first. 524 525 Raises: 526 AirbyteConnectorNotRegisteredError: If the connector is not found in the registry. 
527 528 Example: 529 >>> versions = get_connector_version_history("source-faker", num_versions_to_validate=3) 530 >>> for v in versions[:5]: 531 ... print(f"{v.version}: {v.release_date}") 532 """ 533 if connector_name not in get_available_connectors(InstallType.ANY): 534 raise exc.AirbyteConnectorNotRegisteredError( 535 connector_name=connector_name, 536 context={ 537 "registry_url": _get_registry_url(), 538 "available_connectors": get_available_connectors(InstallType.ANY), 539 }, 540 ) 541 542 connector_type = "sources" if connector_name.startswith("source-") else "destinations" 543 connector_short_name = connector_name.replace("source-", "").replace("destination-", "") 544 545 changelog_url = f"https://docs.airbyte.com/integrations/{connector_type}/{connector_short_name}" 546 547 try: 548 response = requests.get( 549 changelog_url, 550 headers={"User-Agent": f"PyAirbyte/{get_version()}"}, 551 timeout=timeout, 552 ) 553 response.raise_for_status() 554 html_content = response.text 555 except requests.exceptions.RequestException as e: 556 logger.warning(f"Failed to fetch changelog for {connector_name}: {e}") 557 return [] 558 559 version_dicts = parse_changelog_html(html_content, connector_name) 560 561 if not version_dicts: 562 logger.warning(f"No versions found in changelog for {connector_name}") 563 return [] 564 565 versions = [ConnectorVersionInfo(**version_dict) for version_dict in version_dicts] 566 567 for version_info in versions[:num_versions_to_validate]: 568 registry_date = fetch_registry_version_date(connector_name, version_info.version) 569 if registry_date: 570 version_info.release_date = registry_date 571 logger.debug( 572 f"Updated release date for {connector_name} v{version_info.version} " 573 f"from registry: {registry_date}" 574 ) 575 576 return versions
class InstallType(str, Enum):
    """Ways in which a connector can be installed or selected.

    Members are also plain strings, so each one compares equal to its value
    (for example, ``InstallType.YAML == "yaml"``).
    """

    YAML = "yaml"
    """Manifest-only connectors, which run without Docker."""

    PYTHON = "python"
    """Python connectors that are published on PyPI."""

    DOCKER = "docker"
    """Docker-based connectors.

    For backward compatibility, filtering by this type returns every connector.
    """

    JAVA = "java"
    """Java-based connectors."""

    INSTALLABLE = "installable"
    """Connectors that can be installed in the current environment.

    This is environment-sensitive: all connectors when Docker is installed,
    otherwise only Python and YAML connectors.
    """

    ANY = "any"
    """Every connector in the registry, regardless of environment."""
The type of installation for a connector.
Docker-based connectors (returns all connectors for backward compatibility).
Connectors installable in the current environment (environment-sensitive).
Returns all connectors if Docker is installed, otherwise only Python and YAML.
class Language(str, Enum):
    """Implementation language of a connector.

    The Python and Java values are taken from the matching `InstallType`
    members so that the two enums stay in sync.
    """

    PYTHON = InstallType.PYTHON.value
    JAVA = InstallType.JAVA.value
    MANIFEST_ONLY = _MANIFEST_ONLY_LANGUAGE
The language of a connector.
class ConnectorMetadata(BaseModel):
    """Registry metadata describing a single connector."""

    name: str
    """Connector name. For example, "source-google-sheets"."""

    latest_available_version: str | None
    """Most recent version of the connector that is available, if known."""

    pypi_package_name: str | None
    """PyPI package name for the connector, or `None` if there is none."""

    language: Language | None
    """Implementation language of the connector, if known."""

    install_types: set[InstallType]
    """Install types that this connector supports."""

    suggested_streams: list[str] | None = None
    """Suggested streams for the connector, when the registry provides them."""

    @property
    def default_install_type(self) -> InstallType:
        """Pick the preferred install type: YAML, then Python, then Docker."""
        supported = self.install_types
        if InstallType.YAML in supported and self.language == Language.MANIFEST_ONLY:
            return InstallType.YAML

        # Anything that is neither YAML- nor Python-installable (e.g. Java) runs via Docker.
        return InstallType.PYTHON if InstallType.PYTHON in supported else InstallType.DOCKER
Metadata for a connector.
The latest available version of the connector.
The name of the PyPI package for the connector, if it exists.
A list of suggested streams for the connector, if available.
@property
def default_install_type(self) -> InstallType:
    """Pick the preferred install type: YAML, then Python, then Docker."""
    supported = self.install_types
    if InstallType.YAML in supported and self.language == Language.MANIFEST_ONLY:
        return InstallType.YAML

    # Anything that is neither YAML- nor Python-installable (e.g. Java) runs via Docker.
    return InstallType.PYTHON if InstallType.PYTHON in supported else InstallType.DOCKER
Return the default install type for the connector.
def get_connector_metadata(name: str) -> ConnectorMetadata | None:
    """Look up the registry metadata for a connector.

    The registry cache is loaded on first use.

    Args:
        name: The connector name, for example "source-google-sheets".

    Returns:
        The connector's metadata, or `None` if the registry is disabled.

    Raises:
        PyAirbyteInternalError: If the registry is enabled but could not be loaded.
        AirbyteConnectorNotRegisteredError: If the connector is not in the registry.
    """
    if _is_registry_disabled(_get_registry_url()):
        return None

    registry = copy(_get_registry_cache())

    # Happy path first: the registry loaded and knows this connector.
    if registry and name in registry:
        return registry[name]

    if not registry:
        raise exc.PyAirbyteInternalError(
            message="Connector registry could not be loaded.",
            context={
                "registry_url": _get_registry_url(),
            },
        )

    raise exc.AirbyteConnectorNotRegisteredError(
        connector_name=name,
        context={
            "registry_url": _get_registry_url(),
            "available_connectors": get_available_connectors(),
        },
    )
Check the cache for the connector.
If the registry cache has not been populated yet, it is loaded on first use.
def get_available_connectors(
    install_type: InstallType | str | None = InstallType.INSTALLABLE,
) -> list[str]:
    """Return the names of all available connectors, sorted alphabetically.

    Names are returned as they appear in the registry, including their standard
    prefix (for example "source-" or "destination-").

    Args:
        install_type: The type of installation for the connector, given either as an
            `InstallType` or as its string value. `None` is treated the same as
            the default, `InstallType.INSTALLABLE`.

    Raises:
        ValueError: If `install_type` is a string that is not a valid `InstallType` value.
    """
    if install_type is None or install_type == InstallType.INSTALLABLE:
        # Environment-sensitive default: everything when Docker is present,
        # otherwise only connectors that can run without it.
        if is_docker_installed():
            logger.info("Docker is detected. Returning all connectors.")
            return sorted(_get_registry_cache())

        logger.info("Docker was not detected. Returning only Python and Manifest-only connectors.")
        docker_free_languages = {Language.PYTHON, Language.MANIFEST_ONLY}
        return sorted(
            connector_name
            for connector_name, conn_info in _get_registry_cache().items()
            if conn_info.language in docker_free_languages
        )

    requested = install_type if isinstance(install_type, InstallType) else InstallType(install_type)

    if requested in {InstallType.DOCKER, InstallType.ANY}:
        return sorted(_get_registry_cache())

    if requested == InstallType.PYTHON:
        return sorted(
            connector_name
            for connector_name, conn_info in _get_registry_cache().items()
            if conn_info.pypi_package_name is not None
        )

    if requested == InstallType.YAML:
        return sorted(
            conn_info.name
            for conn_info in _get_registry_cache().values()
            if InstallType.YAML in conn_info.install_types
        )

    if requested == InstallType.JAVA:
        warnings.warn(
            message="Java connectors are not yet supported.",
            stacklevel=2,
        )
        return sorted(
            connector_name
            for connector_name, conn_info in _get_registry_cache().items()
            if conn_info.language == Language.JAVA
        )

    # Should never be reached: every InstallType member is handled above.
    raise exc.PyAirbyteInputError(  # pragma: no cover
        message="Invalid install type.",
        context={
            "install_type": requested,
        },
    )
Return a list of all available connectors.
Connectors will be returned in alphabetical order, with their standard prefix (for example, "source-" or "destination-").
Arguments:
- install_type: The type of installation for the connector. Defaults to `InstallType.INSTALLABLE`.
class ConnectorVersionInfo(BaseModel):
    """Details about one released version of a connector.

    Instances are built in `get_connector_version_history` from the dictionaries
    returned by `parse_changelog_html`.
    """

    # Required: the version identifier.
    version: str
    # Optional; `get_connector_version_history` may overwrite this with the
    # date reported by the registry for the most recent versions.
    release_date: str | None = None
    # Required URL fields.
    docker_image_url: str
    changelog_url: str
    # Optional pull-request details.
    pr_url: str | None = None
    pr_title: str | None = None
    # Defaults to a fresh empty list for every instance.
    parsing_errors: list[str] = Field(default_factory=list)
Information about a specific connector version.
class ApiDocsUrl(BaseModel):
    """A single link to API documentation for a connector.

    `doc_type` and `requires_login` can be populated either by field name or by
    their aliases (`type`, `requiresLogin`), since `populate_by_name` is enabled.
    """

    title: str
    url: str
    source: str
    doc_type: str = Field(default="other", alias="type")
    requires_login: bool = Field(default=False, alias="requiresLogin")

    model_config = {"populate_by_name": True}

    @classmethod
    def from_manifest_dict(cls, manifest_data: dict[str, Any]) -> list[Self]:
        """Build `ApiDocsUrl` objects from a parsed manifest.

        Args:
            manifest_data: The parsed manifest.yaml content, as a dictionary.

        Returns:
            One object per entry in `data.externalDocumentationUrls`, or an empty
            list when that section is missing or is not a list.
        """
        section = manifest_data.get("data")
        if not isinstance(section, dict):
            return []

        entries = section.get("externalDocumentationUrls")
        if not isinstance(entries, list):
            return []

        return [
            cls(
                title=entry["title"],
                url=entry["url"],
                source="data_external_docs",
                doc_type=entry.get("type", "other"),
                requires_login=entry.get("requiresLogin", False),
            )
            for entry in entries
        ]
API documentation URL information.
@classmethod
def from_manifest_dict(cls, manifest_data: dict[str, Any]) -> list[Self]:
    """Build `ApiDocsUrl` objects from a parsed manifest.

    Args:
        manifest_data: The parsed manifest.yaml content, as a dictionary.

    Returns:
        One object per entry in `data.externalDocumentationUrls`, or an empty
        list when that section is missing or is not a list.
    """
    section = manifest_data.get("data")
    if not isinstance(section, dict):
        return []

    entries = section.get("externalDocumentationUrls")
    if not isinstance(entries, list):
        return []

    return [
        cls(
            title=entry["title"],
            url=entry["url"],
            source="data_external_docs",
            doc_type=entry.get("type", "other"),
            requires_login=entry.get("requiresLogin", False),
        )
        for entry in entries
    ]
Extract documentation URLs from parsed manifest data.
Arguments:
- manifest_data: The parsed manifest.yaml data as a dictionary
Returns:
List of ApiDocsUrl objects extracted from the manifest
def get_connector_api_docs_urls(connector_name: str) -> list[ApiDocsUrl]:
    """Collect API documentation links for a connector.

    Links for the connector's upstream API are gathered from two places, in this order:
    - Registry metadata (documentationUrl, externalDocumentationUrls)
    - Connector manifest.yaml file (data.externalDocumentationUrls)

    Args:
        connector_name: The canonical connector name (e.g., "source-facebook-marketing")

    Returns:
        List of ApiDocsUrl objects, deduplicated by URL. When the same URL appears
        more than once, the first occurrence is kept.

    Raises:
        AirbyteConnectorNotRegisteredError: If the connector is not found in the registry.
    """
    if connector_name not in get_available_connectors(InstallType.ANY):
        raise exc.AirbyteConnectorNotRegisteredError(
            connector_name=connector_name,
            context={
                "registry_url": _get_registry_url(),
                "available_connectors": get_available_connectors(InstallType.ANY),
            },
        )

    candidates: list[ApiDocsUrl] = [
        *_extract_docs_from_registry(connector_name),
        *ApiDocsUrl.from_manifest_dict(_fetch_manifest_dict(_manifest_url_for(connector_name))),
    ]

    # Dicts preserve insertion order, so the first entry seen for each URL wins.
    first_by_url: dict[str, ApiDocsUrl] = {}
    for candidate in candidates:
        first_by_url.setdefault(candidate.url, candidate)

    return list(first_by_url.values())
Get API documentation URLs for a connector.
This function retrieves documentation URLs for a connector's upstream API from multiple sources:
- Registry metadata (documentationUrl, externalDocumentationUrls)
- Connector manifest.yaml file (data.externalDocumentationUrls)
Arguments:
- connector_name: The canonical connector name (e.g., "source-facebook-marketing")
Returns:
List of ApiDocsUrl objects with documentation URLs, deduplicated by URL.
Raises:
- AirbyteConnectorNotRegisteredError: If the connector is not found in the registry.
def get_connector_version_history(
    connector_name: str,
    *,
    num_versions_to_validate: int = 5,
    timeout: int = 30,
) -> list[ConnectorVersionInfo]:
    """Get version history for a connector.

    This function retrieves the version history for a connector by:
    1. Scraping the changelog HTML from docs.airbyte.com
    2. Parsing version information including PR URLs and titles
    3. Overriding release dates for the most recent N versions with accurate
       registry data

    Args:
        connector_name: Name of the connector (e.g., 'source-faker', 'destination-postgres')
        num_versions_to_validate: Number of most recent versions to override with
            registry release dates for accuracy. Defaults to 5. Values below zero
            are treated as zero.
        timeout: Timeout in seconds for the changelog fetch. Defaults to 30.

    Returns:
        List of ConnectorVersionInfo objects, sorted by most recent first. An empty
        list is returned if the changelog cannot be fetched or contains no versions.

    Raises:
        AirbyteConnectorNotRegisteredError: If the connector is not found in the registry.

    Example:
        >>> versions = get_connector_version_history("source-faker", num_versions_to_validate=3)
        >>> for v in versions[:5]:
        ...     print(f"{v.version}: {v.release_date}")
    """
    if connector_name not in get_available_connectors(InstallType.ANY):
        raise exc.AirbyteConnectorNotRegisteredError(
            connector_name=connector_name,
            context={
                "registry_url": _get_registry_url(),
                "available_connectors": get_available_connectors(InstallType.ANY),
            },
        )

    # Strip only the leading type prefix; `str.replace` would also remove any
    # later occurrence of "source-" / "destination-" inside the name.
    if connector_name.startswith("source-"):
        connector_type = "sources"
        connector_short_name = connector_name.removeprefix("source-")
    else:
        connector_type = "destinations"
        connector_short_name = connector_name.removeprefix("destination-")

    changelog_url = f"https://docs.airbyte.com/integrations/{connector_type}/{connector_short_name}"

    try:
        response = requests.get(
            changelog_url,
            headers={"User-Agent": f"PyAirbyte/{get_version()}"},
            timeout=timeout,
        )
        response.raise_for_status()
        html_content = response.text
    except requests.exceptions.RequestException as e:
        # Best effort: a missing or unreachable changelog yields an empty history.
        logger.warning(f"Failed to fetch changelog for {connector_name}: {e}")
        return []

    version_dicts = parse_changelog_html(html_content, connector_name)

    if not version_dicts:
        logger.warning(f"No versions found in changelog for {connector_name}")
        return []

    versions = [ConnectorVersionInfo(**version_dict) for version_dict in version_dicts]

    # Clamp at zero: a negative slice bound would select from the wrong end of the list.
    for version_info in versions[: max(num_versions_to_validate, 0)]:
        registry_date = fetch_registry_version_date(connector_name, version_info.version)
        if registry_date:
            version_info.release_date = registry_date
            logger.debug(
                f"Updated release date for {connector_name} v{version_info.version} "
                f"from registry: {registry_date}"
            )

    return versions
Get version history for a connector.
This function retrieves the version history for a connector by:
- Scraping the changelog HTML from docs.airbyte.com
- Parsing version information including PR URLs and titles
- Overriding release dates for the most recent N versions with accurate registry data
Arguments:
- connector_name: Name of the connector (e.g., 'source-faker', 'destination-postgres')
- num_versions_to_validate: Number of most recent versions to override with registry release dates for accuracy. Defaults to 5.
- timeout: Timeout in seconds for the changelog fetch. Defaults to 30.
Returns:
List of ConnectorVersionInfo objects, sorted by most recent first.
Raises:
- AirbyteConnectorNotRegisteredError: If the connector is not found in the registry.
Example:
>>> versions = get_connector_version_history("source-faker", num_versions_to_validate=3)
>>> for v in versions[:5]:
...     print(f"{v.version}: {v.release_date}")