airbyte.registry
Connectivity to the connector catalog registry.
1# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 2"""Connectivity to the connector catalog registry.""" 3 4from __future__ import annotations 5 6import json 7import logging 8import os 9import warnings 10from copy import copy 11from enum import Enum 12from pathlib import Path 13from typing import Any, cast 14 15import requests 16import yaml 17from pydantic import BaseModel, Field 18from typing_extensions import Self 19 20from airbyte import exceptions as exc 21from airbyte._registry_utils import fetch_registry_version_date, parse_changelog_html 22from airbyte._util.meta import is_docker_installed 23from airbyte.constants import AIRBYTE_OFFLINE_MODE 24from airbyte.logs import warn_once 25from airbyte.version import get_version 26 27 28logger = logging.getLogger("airbyte") 29 30 31__cache: dict[str, ConnectorMetadata] | None = None 32 33 34_REGISTRY_ENV_VAR = "AIRBYTE_LOCAL_REGISTRY" 35_REGISTRY_URL = "https://connectors.airbyte.com/files/registries/v0/oss_registry.json" 36 37_PYTHON_LANGUAGE = "python" 38_MANIFEST_ONLY_LANGUAGE = "manifest-only" 39 40_PYTHON_LANGUAGE_TAG = f"language:{_PYTHON_LANGUAGE}" 41_MANIFEST_ONLY_TAG = f"language:{_MANIFEST_ONLY_LANGUAGE}" 42 43_DEFAULT_MANIFEST_URL = ( 44 "https://connectors.airbyte.com/files/metadata/airbyte/{source_name}/{version}/manifest.yaml" 45) 46 47 48class InstallType(str, Enum): 49 """The type of installation for a connector.""" 50 51 YAML = "yaml" 52 PYTHON = "python" 53 DOCKER = "docker" 54 JAVA = "java" 55 56 57class Language(str, Enum): 58 """The language of a connector.""" 59 60 PYTHON = InstallType.PYTHON.value 61 JAVA = InstallType.JAVA.value 62 MANIFEST_ONLY = _MANIFEST_ONLY_LANGUAGE 63 64 65class ConnectorMetadata(BaseModel): 66 """Metadata for a connector.""" 67 68 name: str 69 """Connector name. 
For example, "source-google-sheets".""" 70 71 latest_available_version: str | None 72 """The latest available version of the connector.""" 73 74 pypi_package_name: str | None 75 """The name of the PyPI package for the connector, if it exists.""" 76 77 language: Language | None 78 """The language of the connector.""" 79 80 install_types: set[InstallType] 81 """The supported install types for the connector.""" 82 83 suggested_streams: list[str] | None = None 84 """A list of suggested streams for the connector, if available.""" 85 86 @property 87 def default_install_type(self) -> InstallType: 88 """Return the default install type for the connector.""" 89 if self.language == Language.MANIFEST_ONLY and InstallType.YAML in self.install_types: 90 return InstallType.YAML 91 92 if InstallType.PYTHON in self.install_types: 93 return InstallType.PYTHON 94 95 # Else: Java or Docker 96 return InstallType.DOCKER 97 98 99def _get_registry_url() -> str: 100 if _REGISTRY_ENV_VAR in os.environ: 101 return str(os.environ.get(_REGISTRY_ENV_VAR)) 102 103 return _REGISTRY_URL 104 105 106def _is_registry_disabled(url: str) -> bool: 107 return url.upper() in {"0", "F", "FALSE"} or AIRBYTE_OFFLINE_MODE 108 109 110def _registry_entry_to_connector_metadata(entry: dict) -> ConnectorMetadata: 111 name = entry["dockerRepository"].replace("airbyte/", "") 112 latest_version: str | None = entry.get("dockerImageTag") 113 tags = entry.get("tags", []) 114 language: Language | None = None 115 116 if "language" in entry and entry["language"] is not None: 117 try: 118 language = Language(entry["language"]) 119 except Exception: 120 warnings.warn( 121 message=f"Invalid language for connector {name}: {entry['language']}", 122 stacklevel=2, 123 ) 124 if not language and _PYTHON_LANGUAGE_TAG in tags: 125 language = Language.PYTHON 126 if not language and _MANIFEST_ONLY_TAG in tags: 127 language = Language.MANIFEST_ONLY 128 129 remote_registries: dict = entry.get("remoteRegistries", {}) 130 pypi_registry: 
dict = remote_registries.get("pypi", {}) 131 pypi_package_name = cast( 132 "str | None", 133 pypi_registry.get("packageName", None), 134 ) 135 pypi_enabled: bool = pypi_registry.get("enabled", False) 136 install_types: set[InstallType] = { 137 x 138 for x in [ 139 InstallType.DOCKER, # Always True 140 InstallType.PYTHON if language == Language.PYTHON and pypi_enabled else None, 141 InstallType.JAVA if language == Language.JAVA else None, 142 InstallType.YAML if language == Language.MANIFEST_ONLY else None, 143 ] 144 if x 145 } 146 147 return ConnectorMetadata( 148 name=name, 149 latest_available_version=latest_version, 150 pypi_package_name=pypi_package_name if pypi_enabled else None, 151 language=language, 152 install_types=install_types, 153 suggested_streams=entry.get("suggestedStreams", {}).get("streams", None), 154 ) 155 156 157def _get_registry_cache(*, force_refresh: bool = False) -> dict[str, ConnectorMetadata]: 158 """Return the registry cache.""" 159 global __cache 160 if __cache and not force_refresh: 161 return __cache 162 163 registry_url = _get_registry_url() 164 165 if _is_registry_disabled(registry_url): 166 return {} 167 168 if registry_url.startswith("http"): 169 response = requests.get( 170 registry_url, 171 headers={"User-Agent": f"PyAirbyte/{get_version()}"}, 172 ) 173 response.raise_for_status() 174 data = response.json() 175 else: 176 # Assume local file 177 with Path(registry_url).open(encoding="utf-8") as f: 178 data = json.load(f) 179 180 new_cache: dict[str, ConnectorMetadata] = {} 181 182 for connector in data["sources"]: 183 connector_metadata = _registry_entry_to_connector_metadata(connector) 184 new_cache[connector_metadata.name] = connector_metadata 185 186 for connector in data["destinations"]: 187 connector_metadata = _registry_entry_to_connector_metadata(connector) 188 new_cache[connector_metadata.name] = connector_metadata 189 190 if len(new_cache) == 0: 191 # This isn't necessarily fatal, since users can bring their own 192 # 
connector definitions. 193 warn_once( 194 message=f"Connector registry is empty: {registry_url}", 195 with_stack=False, 196 ) 197 198 __cache = new_cache 199 return __cache 200 201 202def get_connector_metadata(name: str) -> ConnectorMetadata | None: 203 """Check the cache for the connector. 204 205 If the cache is empty, populate by calling update_cache. 206 """ 207 registry_url = _get_registry_url() 208 209 if _is_registry_disabled(registry_url): 210 return None 211 212 cache = copy(_get_registry_cache()) 213 214 if not cache: 215 raise exc.PyAirbyteInternalError( 216 message="Connector registry could not be loaded.", 217 context={ 218 "registry_url": _get_registry_url(), 219 }, 220 ) 221 if name not in cache: 222 raise exc.AirbyteConnectorNotRegisteredError( 223 connector_name=name, 224 context={ 225 "registry_url": _get_registry_url(), 226 "available_connectors": get_available_connectors(), 227 }, 228 ) 229 return cache[name] 230 231 232def get_available_connectors(install_type: InstallType | str | None = None) -> list[str]: 233 """Return a list of all available connectors. 234 235 Connectors will be returned in alphabetical order, with the standard prefix "source-". 236 """ 237 if install_type is None: 238 # No install type specified. Filter for whatever is runnable. 239 if is_docker_installed(): 240 logger.info("Docker is detected. Returning all connectors.") 241 # If Docker is available, return all connectors. 242 return sorted(conn.name for conn in _get_registry_cache().values()) 243 244 logger.info("Docker was not detected. Returning only Python and Manifest-only connectors.") 245 246 # If Docker is not available, return only Python and Manifest-based connectors. 
247 return sorted( 248 conn.name 249 for conn in _get_registry_cache().values() 250 if conn.language in {Language.PYTHON, Language.MANIFEST_ONLY} 251 ) 252 253 if not isinstance(install_type, InstallType): 254 install_type = InstallType(install_type) 255 256 if install_type == InstallType.PYTHON: 257 return sorted( 258 conn.name 259 for conn in _get_registry_cache().values() 260 if conn.pypi_package_name is not None 261 ) 262 263 if install_type == InstallType.JAVA: 264 warnings.warn( 265 message="Java connectors are not yet supported.", 266 stacklevel=2, 267 ) 268 return sorted( 269 conn.name for conn in _get_registry_cache().values() if conn.language == Language.JAVA 270 ) 271 272 if install_type == InstallType.DOCKER: 273 return sorted(conn.name for conn in _get_registry_cache().values()) 274 275 if install_type == InstallType.YAML: 276 return sorted( 277 conn.name 278 for conn in _get_registry_cache().values() 279 if InstallType.YAML in conn.install_types 280 ) 281 282 # pragma: no cover # Should never be reached. 283 raise exc.PyAirbyteInputError( 284 message="Invalid install type.", 285 context={ 286 "install_type": install_type, 287 }, 288 ) 289 290 291class ConnectorVersionInfo(BaseModel): 292 """Information about a specific connector version.""" 293 294 version: str 295 release_date: str | None = None 296 docker_image_url: str 297 changelog_url: str 298 pr_url: str | None = None 299 pr_title: str | None = None 300 parsing_errors: list[str] = Field(default_factory=list) 301 302 303class ApiDocsUrl(BaseModel): 304 """API documentation URL information.""" 305 306 title: str 307 url: str 308 source: str 309 doc_type: str = Field(default="other", alias="type") 310 requires_login: bool = Field(default=False, alias="requiresLogin") 311 312 model_config = {"populate_by_name": True} 313 314 @classmethod 315 def from_manifest_dict(cls, manifest_data: dict[str, Any]) -> list[Self]: 316 """Extract documentation URLs from parsed manifest data. 
317 318 Args: 319 manifest_data: The parsed manifest.yaml data as a dictionary 320 321 Returns: 322 List of ApiDocsUrl objects extracted from the manifest 323 """ 324 results: list[Self] = [] 325 326 data_section = manifest_data.get("data") 327 if isinstance(data_section, dict): 328 external_docs = data_section.get("externalDocumentationUrls") 329 if isinstance(external_docs, list): 330 results = [ 331 cls( 332 title=doc["title"], 333 url=doc["url"], 334 source="data_external_docs", 335 doc_type=doc.get("type", "other"), 336 requires_login=doc.get("requiresLogin", False), 337 ) 338 for doc in external_docs 339 ] 340 341 return results 342 343 344def _manifest_url_for(connector_name: str) -> str: 345 """Get the expected URL of the manifest.yaml file for a connector. 346 347 Args: 348 connector_name: The canonical connector name (e.g., "source-facebook-marketing") 349 350 Returns: 351 The URL to the connector's manifest.yaml file 352 """ 353 return _DEFAULT_MANIFEST_URL.format( 354 source_name=connector_name, 355 version="latest", 356 ) 357 358 359def _fetch_manifest_dict(url: str) -> dict[str, Any]: 360 """Fetch and parse a manifest.yaml file from a URL. 361 362 Args: 363 url: The URL to fetch the manifest from 364 365 Returns: 366 The parsed manifest data as a dictionary, or empty dict if manifest not found (404) 367 368 Raises: 369 HTTPError: If the request fails with a non-404 status code 370 """ 371 http_not_found = 404 372 373 response = requests.get(url, timeout=10) 374 if response.status_code == http_not_found: 375 return {} 376 377 response.raise_for_status() 378 return yaml.safe_load(response.text) or {} 379 380 381def _extract_docs_from_registry(connector_name: str) -> list[ApiDocsUrl]: 382 """Extract documentation URLs from connector registry metadata. 
383 384 Args: 385 connector_name: The canonical connector name (e.g., "source-facebook-marketing") 386 387 Returns: 388 List of ApiDocsUrl objects extracted from the registry 389 """ 390 registry_url = _get_registry_url() 391 response = requests.get(registry_url, timeout=10) 392 response.raise_for_status() 393 registry_data = response.json() 394 395 connector_list = registry_data.get("sources", []) + registry_data.get("destinations", []) 396 connector_entry = None 397 for entry in connector_list: 398 if entry.get("dockerRepository", "").endswith(f"/{connector_name}"): 399 connector_entry = entry 400 break 401 402 docs_urls = [] 403 404 if connector_entry and "documentationUrl" in connector_entry: 405 docs_urls.append( 406 ApiDocsUrl( 407 title="Airbyte Documentation", 408 url=connector_entry["documentationUrl"], 409 source="registry", 410 ) 411 ) 412 413 if connector_entry and "externalDocumentationUrls" in connector_entry: 414 external_docs = connector_entry["externalDocumentationUrls"] 415 if isinstance(external_docs, list): 416 docs_urls.extend( 417 [ 418 ApiDocsUrl( 419 title=doc["title"], 420 url=doc["url"], 421 source="registry_external_docs", 422 doc_type=doc.get("type", "other"), 423 requires_login=doc.get("requiresLogin", False), 424 ) 425 for doc in external_docs 426 ] 427 ) 428 429 return docs_urls 430 431 432def get_connector_api_docs_urls(connector_name: str) -> list[ApiDocsUrl]: 433 """Get API documentation URLs for a connector. 434 435 This function retrieves documentation URLs for a connector's upstream API from multiple sources: 436 - Registry metadata (documentationUrl, externalDocumentationUrls) 437 - Connector manifest.yaml file (data.externalDocumentationUrls) 438 439 Args: 440 connector_name: The canonical connector name (e.g., "source-facebook-marketing") 441 442 Returns: 443 List of ApiDocsUrl objects with documentation URLs, deduplicated by URL. 
444 445 Raises: 446 AirbyteConnectorNotRegisteredError: If the connector is not found in the registry. 447 """ 448 if connector_name not in get_available_connectors(InstallType.DOCKER): 449 raise exc.AirbyteConnectorNotRegisteredError( 450 connector_name=connector_name, 451 context={ 452 "registry_url": _get_registry_url(), 453 "available_connectors": get_available_connectors(InstallType.DOCKER), 454 }, 455 ) 456 457 docs_urls: list[ApiDocsUrl] = [] 458 459 registry_urls = _extract_docs_from_registry(connector_name) 460 docs_urls.extend(registry_urls) 461 462 manifest_url = _manifest_url_for(connector_name) 463 manifest_data = _fetch_manifest_dict(manifest_url) 464 manifest_urls = ApiDocsUrl.from_manifest_dict(manifest_data) 465 docs_urls.extend(manifest_urls) 466 467 seen_urls = set() 468 unique_docs_urls = [] 469 for doc_url in docs_urls: 470 if doc_url.url not in seen_urls: 471 seen_urls.add(doc_url.url) 472 unique_docs_urls.append(doc_url) 473 474 return unique_docs_urls 475 476 477def get_connector_version_history( 478 connector_name: str, 479 *, 480 num_versions_to_validate: int = 5, 481 timeout: int = 30, 482) -> list[ConnectorVersionInfo]: 483 """Get version history for a connector. 484 485 This function retrieves the version history for a connector by: 486 1. Scraping the changelog HTML from docs.airbyte.com 487 2. Parsing version information including PR URLs and titles 488 3. Overriding release dates for the most recent N versions with accurate 489 registry data 490 491 Args: 492 connector_name: Name of the connector (e.g., 'source-faker', 'destination-postgres') 493 num_versions_to_validate: Number of most recent versions to override with 494 registry release dates for accuracy. Defaults to 5. 495 timeout: Timeout in seconds for the changelog fetch. Defaults to 30. 496 497 Returns: 498 List of ConnectorVersionInfo objects, sorted by most recent first. 499 500 Raises: 501 AirbyteConnectorNotRegisteredError: If the connector is not found in the registry. 
502 503 Example: 504 >>> versions = get_connector_version_history("source-faker", num_versions_to_validate=3) 505 >>> for v in versions[:5]: 506 ... print(f"{v.version}: {v.release_date}") 507 """ 508 if connector_name not in get_available_connectors(InstallType.DOCKER): 509 raise exc.AirbyteConnectorNotRegisteredError( 510 connector_name=connector_name, 511 context={ 512 "registry_url": _get_registry_url(), 513 "available_connectors": get_available_connectors(InstallType.DOCKER), 514 }, 515 ) 516 517 connector_type = "sources" if connector_name.startswith("source-") else "destinations" 518 connector_short_name = connector_name.replace("source-", "").replace("destination-", "") 519 520 changelog_url = f"https://docs.airbyte.com/integrations/{connector_type}/{connector_short_name}" 521 522 try: 523 response = requests.get( 524 changelog_url, 525 headers={"User-Agent": f"PyAirbyte/{get_version()}"}, 526 timeout=timeout, 527 ) 528 response.raise_for_status() 529 html_content = response.text 530 except requests.exceptions.RequestException as e: 531 logger.warning(f"Failed to fetch changelog for {connector_name}: {e}") 532 return [] 533 534 version_dicts = parse_changelog_html(html_content, connector_name) 535 536 if not version_dicts: 537 logger.warning(f"No versions found in changelog for {connector_name}") 538 return [] 539 540 versions = [ConnectorVersionInfo(**version_dict) for version_dict in version_dicts] 541 542 for version_info in versions[:num_versions_to_validate]: 543 registry_date = fetch_registry_version_date(connector_name, version_info.version) 544 if registry_date: 545 version_info.release_date = registry_date 546 logger.debug( 547 f"Updated release date for {connector_name} v{version_info.version} " 548 f"from registry: {registry_date}" 549 ) 550 551 return versions
class InstallType(str, Enum):
    """The type of installation for a connector.

    Members are also `str` instances, so each one compares equal to its raw
    string value and can be built from that string, e.g. `InstallType("docker")`.
    """

    YAML = "yaml"
    PYTHON = "python"
    DOCKER = "docker"
    JAVA = "java"
The type of installation for a connector.
Inherited Members
- enum.Enum
- name
- value
- builtins.str
- encode
- replace
- split
- rsplit
- join
- capitalize
- casefold
- title
- center
- count
- expandtabs
- find
- partition
- index
- ljust
- lower
- lstrip
- rfind
- rindex
- rjust
- rstrip
- rpartition
- splitlines
- strip
- swapcase
- translate
- upper
- startswith
- endswith
- removeprefix
- removesuffix
- isascii
- islower
- isupper
- istitle
- isspace
- isdecimal
- isdigit
- isnumeric
- isalpha
- isalnum
- isidentifier
- isprintable
- zfill
- format
- format_map
- maketrans
class Language(str, Enum):
    """The language of a connector.

    The Python and Java members reuse the raw values of the matching
    `InstallType` members, so the two enums stay aligned on those strings.
    """

    PYTHON = InstallType.PYTHON.value
    JAVA = InstallType.JAVA.value
    MANIFEST_ONLY = _MANIFEST_ONLY_LANGUAGE
The language of a connector.
Inherited Members
- enum.Enum
- name
- value
- builtins.str
- encode
- replace
- split
- rsplit
- join
- capitalize
- casefold
- title
- center
- count
- expandtabs
- find
- partition
- index
- ljust
- lower
- lstrip
- rfind
- rindex
- rjust
- rstrip
- rpartition
- splitlines
- strip
- swapcase
- translate
- upper
- startswith
- endswith
- removeprefix
- removesuffix
- isascii
- islower
- isupper
- istitle
- isspace
- isdecimal
- isdigit
- isnumeric
- isalpha
- isalnum
- isidentifier
- isprintable
- zfill
- format
- format_map
- maketrans
class ConnectorMetadata(BaseModel):
    """Metadata for a connector."""

    name: str
    """Connector name. For example, "source-google-sheets"."""

    latest_available_version: str | None
    """The latest available version of the connector."""

    pypi_package_name: str | None
    """The name of the PyPI package for the connector, if it exists."""

    language: Language | None
    """The language of the connector."""

    install_types: set[InstallType]
    """The supported install types for the connector."""

    suggested_streams: list[str] | None = None
    """A list of suggested streams for the connector, if available."""

    @property
    def default_install_type(self) -> InstallType:
        """Pick the install type to use when the caller does not specify one.

        YAML wins for manifest-only connectors that support it, then Python when
        supported; everything else (Java or Docker) falls back to Docker.
        """
        supported = self.install_types
        is_manifest_only = self.language == Language.MANIFEST_ONLY

        if is_manifest_only and InstallType.YAML in supported:
            return InstallType.YAML

        return InstallType.PYTHON if InstallType.PYTHON in supported else InstallType.DOCKER
Metadata for a connector.
@property
def default_install_type(self) -> InstallType:
    """Pick the install type to use when the caller does not specify one.

    YAML wins for manifest-only connectors that support it, then Python when
    supported; everything else (Java or Docker) falls back to Docker.
    """
    supported = self.install_types
    is_manifest_only = self.language == Language.MANIFEST_ONLY

    if is_manifest_only and InstallType.YAML in supported:
        return InstallType.YAML

    return InstallType.PYTHON if InstallType.PYTHON in supported else InstallType.DOCKER
Return the default install type for the connector.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
def get_connector_metadata(name: str) -> ConnectorMetadata | None:
    """Look up a connector's metadata in the registry cache.

    Args:
        name: The connector name to look up.

    Returns:
        The connector's metadata, or None when the registry is disabled.

    Raises:
        PyAirbyteInternalError: If the registry cache is empty.
        AirbyteConnectorNotRegisteredError: If the connector is not in the registry.
    """
    if _is_registry_disabled(_get_registry_url()):
        return None

    # Work on a shallow copy of the cached mapping.
    registry_snapshot = copy(_get_registry_cache())

    if name in registry_snapshot:
        return registry_snapshot[name]

    if registry_snapshot:
        # The registry loaded fine; this connector just is not listed in it.
        raise exc.AirbyteConnectorNotRegisteredError(
            connector_name=name,
            context={
                "registry_url": _get_registry_url(),
                "available_connectors": get_available_connectors(),
            },
        )

    raise exc.PyAirbyteInternalError(
        message="Connector registry could not be loaded.",
        context={
            "registry_url": _get_registry_url(),
        },
    )
Check the cache for the connector.
If the cache is empty, it is first populated from the registry.
def get_available_connectors(install_type: InstallType | str | None = None) -> list[str]:
    """List the names of the connectors in the registry, sorted alphabetically.

    Args:
        install_type: Optional install type (or its string value) to filter by.
            When omitted, every connector is returned if Docker is detected;
            otherwise only Python and manifest-only connectors are returned.

    Returns:
        The sorted connector names that match the filter.
    """
    if install_type is None:
        # No install type specified. Filter for whatever is runnable.
        if is_docker_installed():
            logger.info("Docker is detected. Returning all connectors.")
            return sorted(entry.name for entry in _get_registry_cache().values())

        logger.info("Docker was not detected. Returning only Python and Manifest-only connectors.")
        runnable_languages = {Language.PYTHON, Language.MANIFEST_ONLY}
        return sorted(
            entry.name
            for entry in _get_registry_cache().values()
            if entry.language in runnable_languages
        )

    requested = install_type if isinstance(install_type, InstallType) else InstallType(install_type)

    if requested == InstallType.JAVA:
        warnings.warn(
            message="Java connectors are not yet supported.",
            stacklevel=2,
        )

    # One predicate per install type; each decides whether a registry entry qualifies.
    selectors = {
        InstallType.PYTHON: lambda entry: entry.pypi_package_name is not None,
        InstallType.JAVA: lambda entry: entry.language == Language.JAVA,
        InstallType.DOCKER: lambda entry: True,
        InstallType.YAML: lambda entry: InstallType.YAML in entry.install_types,
    }
    selector = selectors.get(requested)

    if selector is None:
        # pragma: no cover  # Should never be reached.
        raise exc.PyAirbyteInputError(
            message="Invalid install type.",
            context={
                "install_type": requested,
            },
        )

    return sorted(entry.name for entry in _get_registry_cache().values() if selector(entry))
Return a list of all available connectors.
Connectors will be returned in alphabetical order, with their standard prefix ("source-" or "destination-").
class ConnectorVersionInfo(BaseModel):
    """Information about a specific connector version.

    Only `version`, `docker_image_url` and `changelog_url` are required; the
    remaining fields default to None, or to an empty list for `parsing_errors`.
    """

    # Required identification fields.
    version: str
    docker_image_url: str
    changelog_url: str

    # Optional details.
    release_date: str | None = None
    pr_url: str | None = None
    pr_title: str | None = None

    # A fresh list per instance (default_factory), never a shared default.
    parsing_errors: list[str] = Field(default_factory=list)
Information about a specific connector version.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
class ApiDocsUrl(BaseModel):
    """API documentation URL information."""

    title: str
    url: str
    source: str
    doc_type: str = Field(default="other", alias="type")
    requires_login: bool = Field(default=False, alias="requiresLogin")

    # Allow construction by field name as well as by alias.
    model_config = {"populate_by_name": True}

    @classmethod
    def from_manifest_dict(cls, manifest_data: dict[str, Any]) -> list[Self]:
        """Build ApiDocsUrl objects from the `data.externalDocumentationUrls` manifest section.

        Args:
            manifest_data: The parsed manifest.yaml data as a dictionary

        Returns:
            One ApiDocsUrl per listed entry, or an empty list when the `data`
            section is not a dict or `externalDocumentationUrls` is not a list.
        """
        data_section = manifest_data.get("data")
        if not isinstance(data_section, dict):
            return []

        doc_entries = data_section.get("externalDocumentationUrls")
        if not isinstance(doc_entries, list):
            return []

        return [
            cls(
                title=entry["title"],
                url=entry["url"],
                source="data_external_docs",
                doc_type=entry.get("type", "other"),
                requires_login=entry.get("requiresLogin", False),
            )
            for entry in doc_entries
        ]
API documentation URL information.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
@classmethod
def from_manifest_dict(cls, manifest_data: dict[str, Any]) -> list[Self]:
    """Build ApiDocsUrl objects from the `data.externalDocumentationUrls` manifest section.

    Args:
        manifest_data: The parsed manifest.yaml data as a dictionary

    Returns:
        One object per listed entry, or an empty list when the `data` section
        is not a dict or `externalDocumentationUrls` is not a list.
    """
    data_section = manifest_data.get("data")
    if not isinstance(data_section, dict):
        return []

    doc_entries = data_section.get("externalDocumentationUrls")
    if not isinstance(doc_entries, list):
        return []

    return [
        cls(
            title=entry["title"],
            url=entry["url"],
            source="data_external_docs",
            doc_type=entry.get("type", "other"),
            requires_login=entry.get("requiresLogin", False),
        )
        for entry in doc_entries
    ]
Extract documentation URLs from parsed manifest data.
Arguments:
- manifest_data: The parsed manifest.yaml data as a dictionary
Returns:
List of ApiDocsUrl objects extracted from the manifest
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
def get_connector_api_docs_urls(connector_name: str) -> list[ApiDocsUrl]:
    """Collect API documentation URLs for a connector.

    URLs are gathered from two places, in this order:
    - Registry metadata (documentationUrl, externalDocumentationUrls)
    - Connector manifest.yaml file (data.externalDocumentationUrls)

    Args:
        connector_name: The canonical connector name (e.g., "source-facebook-marketing")

    Returns:
        List of ApiDocsUrl objects, deduplicated by URL. When a URL appears more
        than once, the first occurrence is kept.

    Raises:
        AirbyteConnectorNotRegisteredError: If the connector is not found in the registry.
    """
    is_registered = connector_name in get_available_connectors(InstallType.DOCKER)
    if not is_registered:
        raise exc.AirbyteConnectorNotRegisteredError(
            connector_name=connector_name,
            context={
                "registry_url": _get_registry_url(),
                "available_connectors": get_available_connectors(InstallType.DOCKER),
            },
        )

    from_registry = _extract_docs_from_registry(connector_name)
    from_manifest = ApiDocsUrl.from_manifest_dict(
        _fetch_manifest_dict(_manifest_url_for(connector_name)),
    )

    # dicts preserve insertion order, and setdefault keeps the first entry per URL.
    first_by_url: dict[str, ApiDocsUrl] = {}
    for candidate in [*from_registry, *from_manifest]:
        first_by_url.setdefault(candidate.url, candidate)

    return list(first_by_url.values())
Get API documentation URLs for a connector.
This function retrieves documentation URLs for a connector's upstream API from multiple sources:
- Registry metadata (documentationUrl, externalDocumentationUrls)
- Connector manifest.yaml file (data.externalDocumentationUrls)
Arguments:
- connector_name: The canonical connector name (e.g., "source-facebook-marketing")
Returns:
List of ApiDocsUrl objects with documentation URLs, deduplicated by URL.
Raises:
- AirbyteConnectorNotRegisteredError: If the connector is not found in the registry.
def get_connector_version_history(
    connector_name: str,
    *,
    num_versions_to_validate: int = 5,
    timeout: int = 30,
) -> list[ConnectorVersionInfo]:
    """Get version history for a connector.

    This function retrieves the version history for a connector by:
    1. Scraping the changelog HTML from docs.airbyte.com
    2. Parsing version information including PR URLs and titles
    3. Overriding release dates for the most recent N versions with accurate
       registry data

    Args:
        connector_name: Name of the connector (e.g., 'source-faker', 'destination-postgres')
        num_versions_to_validate: Number of most recent versions to override with
            registry release dates for accuracy. Defaults to 5. Negative values
            are treated as 0.
        timeout: Timeout in seconds for the changelog fetch. Defaults to 30.

    Returns:
        List of ConnectorVersionInfo objects, sorted by most recent first. An empty
        list is returned when the changelog cannot be fetched or has no versions.

    Raises:
        AirbyteConnectorNotRegisteredError: If the connector is not found in the registry.

    Example:
        >>> versions = get_connector_version_history("source-faker", num_versions_to_validate=3)
        >>> for v in versions[:5]:
        ...     print(f"{v.version}: {v.release_date}")
    """
    if connector_name not in get_available_connectors(InstallType.DOCKER):
        raise exc.AirbyteConnectorNotRegisteredError(
            connector_name=connector_name,
            context={
                "registry_url": _get_registry_url(),
                "available_connectors": get_available_connectors(InstallType.DOCKER),
            },
        )

    # Strip only the leading type prefix; a substring replace would also mangle
    # names that merely contain "source-" (for example inside "resource-").
    if connector_name.startswith("source-"):
        connector_type = "sources"
        connector_short_name = connector_name.removeprefix("source-")
    else:
        connector_type = "destinations"
        connector_short_name = connector_name.removeprefix("destination-")

    changelog_url = f"https://docs.airbyte.com/integrations/{connector_type}/{connector_short_name}"

    try:
        response = requests.get(
            changelog_url,
            headers={"User-Agent": f"PyAirbyte/{get_version()}"},
            timeout=timeout,
        )
        response.raise_for_status()
        html_content = response.text
    except requests.exceptions.RequestException as e:
        # Best effort: a missing or unreachable changelog yields an empty history.
        logger.warning("Failed to fetch changelog for %s: %s", connector_name, e)
        return []

    version_dicts = parse_changelog_html(html_content, connector_name)

    if not version_dicts:
        logger.warning("No versions found in changelog for %s", connector_name)
        return []

    versions = [ConnectorVersionInfo(**version_dict) for version_dict in version_dicts]

    # Clamp at zero: a negative count would otherwise slice from the end of the list.
    for version_info in versions[: max(num_versions_to_validate, 0)]:
        registry_date = fetch_registry_version_date(connector_name, version_info.version)
        if registry_date:
            version_info.release_date = registry_date
            logger.debug(
                "Updated release date for %s v%s from registry: %s",
                connector_name,
                version_info.version,
                registry_date,
            )

    return versions
Get version history for a connector.
This function retrieves the version history for a connector by:
- Scraping the changelog HTML from docs.airbyte.com
- Parsing version information including PR URLs and titles
- Overriding release dates for the most recent N versions with accurate registry data
Arguments:
- connector_name: Name of the connector (e.g., 'source-faker', 'destination-postgres')
- num_versions_to_validate: Number of most recent versions to override with registry release dates for accuracy. Defaults to 5.
- timeout: Timeout in seconds for the changelog fetch. Defaults to 30.
Returns:
List of ConnectorVersionInfo objects, sorted by most recent first.
Raises:
- AirbyteConnectorNotRegisteredError: If the connector is not found in the registry.
Example:
>>> versions = get_connector_version_history("source-faker", num_versions_to_validate=3)
>>> for v in versions[:5]:
...     print(f"{v.version}: {v.release_date}")