airbyte.mcp.connector_registry
Airbyte Cloud MCP operations.
1# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 2"""Airbyte Cloud MCP operations.""" 3 4# Note: Deferred type evaluation must be avoided due to FastMCP/Pydantic needing 5# types to be available at import time for tool registration. 6import contextlib 7import logging 8from typing import Annotated, Any, Literal 9 10import requests 11from fastmcp import FastMCP 12from pydantic import BaseModel, Field 13 14from airbyte import exceptions as exc 15from airbyte._util.meta import is_docker_installed 16from airbyte.mcp._tool_utils import mcp_tool, register_tools 17from airbyte.mcp._util import resolve_list_of_strings 18from airbyte.registry import ( 19 _DEFAULT_MANIFEST_URL, 20 ApiDocsUrl, 21 ConnectorMetadata, 22 ConnectorVersionInfo, 23 InstallType, 24 get_available_connectors, 25 get_connector_api_docs_urls, 26 get_connector_metadata, 27) 28from airbyte.registry import get_connector_version_history as _get_connector_version_history 29from airbyte.sources.util import get_source 30 31 32logger = logging.getLogger("airbyte.mcp") 33 34 35@mcp_tool( 36 domain="registry", 37 read_only=True, 38 idempotent=True, 39) 40def list_connectors( 41 keyword_filter: Annotated[ 42 str | None, 43 Field( 44 description="Filter connectors by keyword.", 45 default=None, 46 ), 47 ], 48 connector_type_filter: Annotated[ 49 Literal["source", "destination"] | None, 50 Field( 51 description="Filter connectors by type ('source' or 'destination').", 52 default=None, 53 ), 54 ], 55 install_types: Annotated[ 56 Literal["java", "python", "yaml", "docker"] 57 | list[Literal["java", "python", "yaml", "docker"]] 58 | None, 59 Field( 60 description=( 61 """ 62 Filter connectors by install type. 63 These are not mutually exclusive: 64 - "python": Connectors that can be installed as Python packages. 65 - "yaml": Connectors that can be installed simply via YAML download. 66 These connectors are the fastest to install and run, as they do not require any 67 additional dependencies. 68 - "java": Connectors that can only be installed via Java. Since PyAirbyte does not 69 currently ship with a JVM, these connectors will be run via Docker instead. 70 In environments where Docker is not available, these connectors may not be 71 runnable. 72 - "docker": Connectors that can be installed via Docker. Note that all connectors 73 can be run in Docker, so this filter should generally return the same results as 74 not specifying a filter. 75 If no install types are specified, all connectors will be returned. 76 """ 77 ), 78 default=None, 79 ), 80 ], 81) -> list[str]: 82 """List available Airbyte connectors with optional filtering. 83 84 Returns: 85 List of connector names. 86 """ 87 # Start with the full list of known connectors (all support Docker): 88 connectors: list[str] = get_available_connectors(install_type=InstallType.DOCKER) 89 90 install_types_list: list[str] | None = resolve_list_of_strings( 91 install_types, # type: ignore[arg-type] # Type check doesn't understand literal is str 92 ) 93 94 if install_types_list: 95 # If install_types is provided, filter connectors based on the specified install types. 96 connectors = [ 97 connector 98 for connector in connectors 99 if any( 100 connector in get_available_connectors(install_type=install_type) 101 for install_type in install_types_list 102 ) 103 ] 104 105 if keyword_filter: 106 # Filter connectors by keyword, case-insensitive. 107 connectors = [ 108 connector for connector in connectors if keyword_filter.lower() in connector.lower() 109 ] 110 111 if connector_type_filter: 112 # Filter connectors by type ('source' or 'destination'). 113 # This assumes connector names are prefixed with 'source-' or 'destination-'. 114 connectors = [ 115 connector 116 for connector in connectors 117 if connector.startswith(f"{connector_type_filter}-") 118 ] 119 120 return sorted(connectors) 121 122 123class ConnectorInfo(BaseModel): 124 """@private Class to hold connector information.""" 125 126 connector_name: str 127 connector_metadata: ConnectorMetadata | None = None 128 documentation_url: str | None = None 129 config_spec_jsonschema: dict | None = None 130 manifest_url: str | None = None 131 132 133@mcp_tool( 134 domain="registry", 135 read_only=True, 136 idempotent=True, 137) 138def get_connector_info( 139 connector_name: Annotated[ 140 str, 141 Field(description="The name of the connector to get information for."), 142 ], 143) -> ConnectorInfo | Literal["Connector not found."]: 144 """Get the documentation URL for a connector.""" 145 if connector_name not in get_available_connectors(): 146 return "Connector not found." 147 148 connector = get_source( 149 connector_name, 150 docker_image=is_docker_installed() or False, 151 install_if_missing=False, # Defer to avoid failing entirely if it can't be installed. 152 ) 153 154 connector_metadata: ConnectorMetadata | None = None 155 with contextlib.suppress(Exception): 156 connector_metadata = get_connector_metadata(connector_name) 157 158 config_spec_jsonschema: dict[str, Any] | None = None 159 with contextlib.suppress(Exception): 160 # This requires running the connector. Install it if it isn't already installed. 161 connector.install() 162 config_spec_jsonschema = connector.config_spec 163 164 manifest_url = _DEFAULT_MANIFEST_URL.format( 165 source_name=connector_name, 166 version="latest", 167 ) 168 169 return ConnectorInfo( 170 connector_name=connector.name, 171 connector_metadata=connector_metadata, 172 documentation_url=connector.docs_url, 173 config_spec_jsonschema=config_spec_jsonschema, 174 manifest_url=manifest_url, 175 ) 176 177 178@mcp_tool( 179 domain="registry", 180 read_only=True, 181 idempotent=True, 182) 183def get_api_docs_urls( 184 connector_name: Annotated[ 185 str, 186 Field( 187 description=( 188 "The canonical connector name " 189 "(e.g., 'source-facebook-marketing', 'destination-snowflake')" 190 ) 191 ), 192 ], 193) -> list[ApiDocsUrl] | Literal["Connector not found."]: 194 """Get API documentation URLs for a connector. 195 196 This tool retrieves documentation URLs for a connector's upstream API from multiple sources: 197 - Registry metadata (documentationUrl, externalDocumentationUrls) 198 - Connector manifest.yaml file (data.externalDocumentationUrls) 199 """ 200 try: 201 return get_connector_api_docs_urls(connector_name) 202 except exc.AirbyteConnectorNotRegisteredError: 203 return "Connector not found." 204 205 206@mcp_tool( 207 domain="registry", 208 read_only=True, 209 idempotent=True, 210) 211def get_connector_version_history( 212 connector_name: Annotated[ 213 str, 214 Field( 215 description="The name of the connector (e.g., 'source-faker', 'destination-postgres')" 216 ), 217 ], 218 num_versions_to_validate: Annotated[ 219 int, 220 Field( 221 description=( 222 "Number of most recent versions to validate with registry data for accurate " 223 "release dates. Defaults to 5." 224 ), 225 default=5, 226 ), 227 ] = 5, 228 limit: Annotated[ 229 int | None, 230 Field( 231 description=( 232 "DEPRECATED: Use num_versions_to_validate instead. " 233 "Maximum number of versions to return (most recent first). " 234 "If specified, only the first N versions will be returned." 235 ), 236 default=None, 237 ), 238 ] = None, 239) -> list[ConnectorVersionInfo] | Literal["Connector not found.", "Failed to fetch changelog."]: 240 """Get version history for a connector. 241 242 This tool retrieves the version history for a connector, including: 243 - Version number 244 - Release date (from changelog, with registry override for recent versions) 245 - DockerHub URL for the version 246 - Changelog URL 247 - PR URL and title (scraped from changelog) 248 249 For the most recent N versions (default 5), release dates are fetched from the 250 registry for accuracy. For older versions, changelog dates are used. 251 252 Returns: 253 List of version information, sorted by most recent first. 254 """ 255 try: 256 versions = _get_connector_version_history( 257 connector_name=connector_name, 258 num_versions_to_validate=num_versions_to_validate, 259 ) 260 except exc.AirbyteConnectorNotRegisteredError: 261 return "Connector not found." 262 except requests.exceptions.RequestException: 263 logger.exception(f"Failed to fetch changelog for {connector_name}") 264 return "Failed to fetch changelog." 265 else: 266 if limit is not None and limit > 0: 267 return versions[:limit] 268 return versions 269 270 271def register_connector_registry_tools(app: FastMCP) -> None: 272 """@private Register tools with the FastMCP app. 273 274 This is an internal function and should not be called directly. 275 """ 276 register_tools(app, domain="registry")
logger =
<Logger airbyte.mcp (INFO)>
@mcp_tool(domain='registry', read_only=True, idempotent=True)
def
list_connectors( keyword_filter: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Filter connectors by keyword.')], connector_type_filter: Annotated[Optional[Literal['source', 'destination']], FieldInfo(annotation=NoneType, required=False, default=None, description="Filter connectors by type ('source' or 'destination').")], install_types: Annotated[Union[Literal['java', 'python', 'yaml', 'docker'], list[Literal['java', 'python', 'yaml', 'docker']], NoneType], FieldInfo(annotation=NoneType, required=False, default=None, description='\n Filter connectors by install type.\n These are not mutually exclusive:\n - "python": Connectors that can be installed as Python packages.\n - "yaml": Connectors that can be installed simply via YAML download.\n These connectors are the fastest to install and run, as they do not require any\n additional dependencies.\n - "java": Connectors that can only be installed via Java. Since PyAirbyte does not\n currently ship with a JVM, these connectors will be run via Docker instead.\n In environments where Docker is not available, these connectors may not be\n runnable.\n - "docker": Connectors that can be installed via Docker. Note that all connectors\n can be run in Docker, so this filter should generally return the same results as\n not specifying a filter.\n If no install types are specified, all connectors will be returned.\n ')]) -> list[str]:
36@mcp_tool( 37 domain="registry", 38 read_only=True, 39 idempotent=True, 40) 41def list_connectors( 42 keyword_filter: Annotated[ 43 str | None, 44 Field( 45 description="Filter connectors by keyword.", 46 default=None, 47 ), 48 ], 49 connector_type_filter: Annotated[ 50 Literal["source", "destination"] | None, 51 Field( 52 description="Filter connectors by type ('source' or 'destination').", 53 default=None, 54 ), 55 ], 56 install_types: Annotated[ 57 Literal["java", "python", "yaml", "docker"] 58 | list[Literal["java", "python", "yaml", "docker"]] 59 | None, 60 Field( 61 description=( 62 """ 63 Filter connectors by install type. 64 These are not mutually exclusive: 65 - "python": Connectors that can be installed as Python packages. 66 - "yaml": Connectors that can be installed simply via YAML download. 67 These connectors are the fastest to install and run, as they do not require any 68 additional dependencies. 69 - "java": Connectors that can only be installed via Java. Since PyAirbyte does not 70 currently ship with a JVM, these connectors will be run via Docker instead. 71 In environments where Docker is not available, these connectors may not be 72 runnable. 73 - "docker": Connectors that can be installed via Docker. Note that all connectors 74 can be run in Docker, so this filter should generally return the same results as 75 not specifying a filter. 76 If no install types are specified, all connectors will be returned. 77 """ 78 ), 79 default=None, 80 ), 81 ], 82) -> list[str]: 83 """List available Airbyte connectors with optional filtering. 84 85 Returns: 86 List of connector names. 87 """ 88 # Start with the full list of known connectors (all support Docker): 89 connectors: list[str] = get_available_connectors(install_type=InstallType.DOCKER) 90 91 install_types_list: list[str] | None = resolve_list_of_strings( 92 install_types, # type: ignore[arg-type] # Type check doesn't understand literal is str 93 ) 94 95 if install_types_list: 96 # If install_types is provided, filter connectors based on the specified install types. 97 connectors = [ 98 connector 99 for connector in connectors 100 if any( 101 connector in get_available_connectors(install_type=install_type) 102 for install_type in install_types_list 103 ) 104 ] 105 106 if keyword_filter: 107 # Filter connectors by keyword, case-insensitive. 108 connectors = [ 109 connector for connector in connectors if keyword_filter.lower() in connector.lower() 110 ] 111 112 if connector_type_filter: 113 # Filter connectors by type ('source' or 'destination'). 114 # This assumes connector names are prefixed with 'source-' or 'destination-'. 115 connectors = [ 116 connector 117 for connector in connectors 118 if connector.startswith(f"{connector_type_filter}-") 119 ] 120 121 return sorted(connectors)
List available Airbyte connectors with optional filtering.
Returns:
List of connector names.
@mcp_tool(domain='registry', read_only=True, idempotent=True)
def
get_connector_info( connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the connector to get information for.')]) -> Union[airbyte.mcp.connector_registry.ConnectorInfo, Literal['Connector not found.']]:
134@mcp_tool( 135 domain="registry", 136 read_only=True, 137 idempotent=True, 138) 139def get_connector_info( 140 connector_name: Annotated[ 141 str, 142 Field(description="The name of the connector to get information for."), 143 ], 144) -> ConnectorInfo | Literal["Connector not found."]: 145 """Get the documentation URL for a connector.""" 146 if connector_name not in get_available_connectors(): 147 return "Connector not found." 148 149 connector = get_source( 150 connector_name, 151 docker_image=is_docker_installed() or False, 152 install_if_missing=False, # Defer to avoid failing entirely if it can't be installed. 153 ) 154 155 connector_metadata: ConnectorMetadata | None = None 156 with contextlib.suppress(Exception): 157 connector_metadata = get_connector_metadata(connector_name) 158 159 config_spec_jsonschema: dict[str, Any] | None = None 160 with contextlib.suppress(Exception): 161 # This requires running the connector. Install it if it isn't already installed. 162 connector.install() 163 config_spec_jsonschema = connector.config_spec 164 165 manifest_url = _DEFAULT_MANIFEST_URL.format( 166 source_name=connector_name, 167 version="latest", 168 ) 169 170 return ConnectorInfo( 171 connector_name=connector.name, 172 connector_metadata=connector_metadata, 173 documentation_url=connector.docs_url, 174 config_spec_jsonschema=config_spec_jsonschema, 175 manifest_url=manifest_url, 176 )
Get the documentation URL for a connector.
@mcp_tool(domain='registry', read_only=True, idempotent=True)
def
get_api_docs_urls( connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description="The canonical connector name (e.g., 'source-facebook-marketing', 'destination-snowflake')")]) -> Union[list[airbyte.registry.ApiDocsUrl], Literal['Connector not found.']]:
179@mcp_tool( 180 domain="registry", 181 read_only=True, 182 idempotent=True, 183) 184def get_api_docs_urls( 185 connector_name: Annotated[ 186 str, 187 Field( 188 description=( 189 "The canonical connector name " 190 "(e.g., 'source-facebook-marketing', 'destination-snowflake')" 191 ) 192 ), 193 ], 194) -> list[ApiDocsUrl] | Literal["Connector not found."]: 195 """Get API documentation URLs for a connector. 196 197 This tool retrieves documentation URLs for a connector's upstream API from multiple sources: 198 - Registry metadata (documentationUrl, externalDocumentationUrls) 199 - Connector manifest.yaml file (data.externalDocumentationUrls) 200 """ 201 try: 202 return get_connector_api_docs_urls(connector_name) 203 except exc.AirbyteConnectorNotRegisteredError: 204 return "Connector not found."
Get API documentation URLs for a connector.
This tool retrieves documentation URLs for a connector's upstream API from multiple sources:
- Registry metadata (documentationUrl, externalDocumentationUrls)
- Connector manifest.yaml file (data.externalDocumentationUrls)
@mcp_tool(domain='registry', read_only=True, idempotent=True)
def
get_connector_version_history( connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description="The name of the connector (e.g., 'source-faker', 'destination-postgres')")], num_versions_to_validate: typing.Annotated[int, FieldInfo(annotation=NoneType, required=False, default=5, description='Number of most recent versions to validate with registry data for accurate release dates. Defaults to 5.')] = 5, limit: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=False, default=None, description='DEPRECATED: Use num_versions_to_validate instead. Maximum number of versions to return (most recent first). If specified, only the first N versions will be returned.')] = None) -> Union[list[airbyte.registry.ConnectorVersionInfo], Literal['Connector not found.', 'Failed to fetch changelog.']]:
207@mcp_tool( 208 domain="registry", 209 read_only=True, 210 idempotent=True, 211) 212def get_connector_version_history( 213 connector_name: Annotated[ 214 str, 215 Field( 216 description="The name of the connector (e.g., 'source-faker', 'destination-postgres')" 217 ), 218 ], 219 num_versions_to_validate: Annotated[ 220 int, 221 Field( 222 description=( 223 "Number of most recent versions to validate with registry data for accurate " 224 "release dates. Defaults to 5." 225 ), 226 default=5, 227 ), 228 ] = 5, 229 limit: Annotated[ 230 int | None, 231 Field( 232 description=( 233 "DEPRECATED: Use num_versions_to_validate instead. " 234 "Maximum number of versions to return (most recent first). " 235 "If specified, only the first N versions will be returned." 236 ), 237 default=None, 238 ), 239 ] = None, 240) -> list[ConnectorVersionInfo] | Literal["Connector not found.", "Failed to fetch changelog."]: 241 """Get version history for a connector. 242 243 This tool retrieves the version history for a connector, including: 244 - Version number 245 - Release date (from changelog, with registry override for recent versions) 246 - DockerHub URL for the version 247 - Changelog URL 248 - PR URL and title (scraped from changelog) 249 250 For the most recent N versions (default 5), release dates are fetched from the 251 registry for accuracy. For older versions, changelog dates are used. 252 253 Returns: 254 List of version information, sorted by most recent first. 255 """ 256 try: 257 versions = _get_connector_version_history( 258 connector_name=connector_name, 259 num_versions_to_validate=num_versions_to_validate, 260 ) 261 except exc.AirbyteConnectorNotRegisteredError: 262 return "Connector not found." 263 except requests.exceptions.RequestException: 264 logger.exception(f"Failed to fetch changelog for {connector_name}") 265 return "Failed to fetch changelog." 266 else: 267 if limit is not None and limit > 0: 268 return versions[:limit] 269 return versions
Get version history for a connector.
This tool retrieves the version history for a connector, including:
- Version number
- Release date (from changelog, with registry override for recent versions)
- DockerHub URL for the version
- Changelog URL
- PR URL and title (scraped from changelog)
For the most recent N versions (default 5), release dates are fetched from the registry for accuracy. For older versions, changelog dates are used.
Returns:
List of version information, sorted by most recent first.