airbyte.mcp.registry
Airbyte connector registry MCP operations.
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Airbyte connector registry MCP operations."""

# Note: Deferred type evaluation must be avoided due to FastMCP/Pydantic needing
# types to be available at import time for tool registration.
import contextlib
import logging
from typing import Annotated, Any, Literal

import requests
from fastmcp import FastMCP
from fastmcp_extensions import mcp_tool, register_mcp_tools
from pydantic import BaseModel, Field

from airbyte import exceptions as exc
from airbyte._util.meta import is_docker_installed
from airbyte.mcp._arg_resolvers import resolve_list_of_strings
from airbyte.registry import (
    _DEFAULT_MANIFEST_URL,
    ApiDocsUrl,
    ConnectorMetadata,
    ConnectorVersionInfo,
    InstallType,
    get_available_connectors,
    get_connector_api_docs_urls,
    get_connector_metadata,
)
from airbyte.registry import get_connector_version_history as _get_connector_version_history
from airbyte.sources.util import get_source


logger = logging.getLogger("airbyte.mcp")


@mcp_tool(
    read_only=True,
    idempotent=True,
)
def list_connectors(
    keyword_filter: Annotated[
        str | None,
        Field(
            description="Filter connectors by keyword.",
            default=None,
        ),
    ],
    connector_type_filter: Annotated[
        Literal["source", "destination"] | None,
        Field(
            description="Filter connectors by type ('source' or 'destination').",
            default=None,
        ),
    ],
    install_types: Annotated[
        Literal["java", "python", "yaml", "docker"]
        | list[Literal["java", "python", "yaml", "docker"]]
        | None,
        Field(
            description=(
                """
                Filter connectors by install type.
                These are not mutually exclusive:
                - "python": Connectors that can be installed as Python packages.
                - "yaml": Connectors that can be installed simply via YAML download.
                    These connectors are the fastest to install and run, as they do not require any
                    additional dependencies.
                - "java": Connectors that can only be installed via Java. Since PyAirbyte does not
                    currently ship with a JVM, these connectors will be run via Docker instead.
                    In environments where Docker is not available, these connectors may not be
                    runnable.
                - "docker": Connectors that can be installed via Docker. Note that all connectors
                    can be run in Docker, so this filter should generally return the same results as
                    not specifying a filter.
                If no install types are specified, all connectors will be returned.
                """
            ),
            default=None,
        ),
    ],
) -> list[str]:
    """List available Airbyte connectors with optional filtering.

    All provided filters are applied together; a connector must pass each one.

    Returns:
        List of connector names, sorted alphabetically.
    """
    # Start with the full list of known connectors (all support Docker):
    connectors: list[str] = get_available_connectors(install_type=InstallType.ANY)

    install_types_list: list[str] | None = resolve_list_of_strings(
        install_types,  # type: ignore[arg-type]  # Type check doesn't understand literal is str
    )

    if install_types_list:
        # Look up each requested install type exactly once and merge the results into a
        # set, rather than re-querying the registry for every (connector, install type) pair.
        allowed_connectors: set[str] = set()
        for install_type in install_types_list:
            allowed_connectors.update(get_available_connectors(install_type=install_type))

        connectors = [connector for connector in connectors if connector in allowed_connectors]

    if keyword_filter:
        # Filter connectors by keyword, case-insensitive.
        keyword = keyword_filter.lower()
        connectors = [connector for connector in connectors if keyword in connector.lower()]

    if connector_type_filter:
        # Filter connectors by type ('source' or 'destination').
        # This assumes connector names are prefixed with 'source-' or 'destination-'.
        type_prefix = f"{connector_type_filter}-"
        connectors = [connector for connector in connectors if connector.startswith(type_prefix)]

    return sorted(connectors)


class ConnectorInfo(BaseModel):
    """@private Class to hold connector information."""

    # Name of the connector, as reported by the connector object.
    connector_name: str
    # Registry metadata; `None` when it could not be retrieved.
    connector_metadata: ConnectorMetadata | None = None
    # Documentation URL reported by the connector object.
    documentation_url: str | None = None
    # Config spec as JSON schema; `None` when the connector could not be installed or run.
    config_spec_jsonschema: dict | None = None
    # Manifest URL built from the default manifest URL template with version "latest".
    manifest_url: str | None = None


@mcp_tool(
    read_only=True,
    idempotent=True,
)
def get_connector_info(
    connector_name: Annotated[
        str,
        Field(description="The name of the connector to get information for."),
    ],
) -> ConnectorInfo | Literal["Connector not found."]:
    """Get information about a connector.

    Collects the connector's registry metadata, documentation URL, config spec
    (as JSON schema), and manifest URL. Metadata and config spec are best-effort:
    if either cannot be retrieved, the failure is logged and the field is left empty.

    Returns:
        The connector information, or "Connector not found." if the name is not in the
        list of available connectors.
    """
    if connector_name not in get_available_connectors():
        return "Connector not found."

    connector = get_source(
        connector_name,
        docker_image=is_docker_installed() or False,
        install_if_missing=False,  # Defer to avoid failing entirely if it can't be installed.
    )

    connector_metadata: ConnectorMetadata | None = None
    try:
        connector_metadata = get_connector_metadata(connector_name)
    except Exception:  # noqa: BLE001  # Best-effort: report what we can instead of failing.
        logger.warning(
            "Could not fetch registry metadata for connector '%s'.",
            connector_name,
            exc_info=True,
        )

    config_spec_jsonschema: dict[str, Any] | None = None
    try:
        # This requires running the connector. Install it if it isn't already installed.
        connector.install()
        config_spec_jsonschema = connector.config_spec
    except Exception:  # noqa: BLE001  # Best-effort: report what we can instead of failing.
        logger.warning(
            "Could not retrieve config spec for connector '%s'.",
            connector_name,
            exc_info=True,
        )

    manifest_url = _DEFAULT_MANIFEST_URL.format(
        source_name=connector_name,
        version="latest",
    )

    return ConnectorInfo(
        connector_name=connector.name,
        connector_metadata=connector_metadata,
        documentation_url=connector.docs_url,
        config_spec_jsonschema=config_spec_jsonschema,
        manifest_url=manifest_url,
    )


@mcp_tool(
    read_only=True,
    idempotent=True,
)
def get_api_docs_urls(
    connector_name: Annotated[
        str,
        Field(
            description=(
                "The canonical connector name "
                "(e.g., 'source-facebook-marketing', 'destination-snowflake')"
            )
        ),
    ],
) -> list[ApiDocsUrl] | Literal["Connector not found."]:
    """Get API documentation URLs for a connector.

    This tool retrieves documentation URLs for a connector's upstream API from multiple sources:
    - Registry metadata (documentationUrl, externalDocumentationUrls)
    - Connector manifest.yaml file (data.externalDocumentationUrls)
    """
    try:
        return get_connector_api_docs_urls(connector_name)
    except exc.AirbyteConnectorNotRegisteredError:
        return "Connector not found."


@mcp_tool(
    read_only=True,
    idempotent=True,
)
def get_connector_version_history(
    connector_name: Annotated[
        str,
        Field(
            description="The name of the connector (e.g., 'source-faker', 'destination-postgres')"
        ),
    ],
    num_versions_to_validate: Annotated[
        int,
        Field(
            description=(
                "Number of most recent versions to validate with registry data for accurate "
                "release dates. Defaults to 5."
            ),
            default=5,
        ),
    ] = 5,
    limit: Annotated[
        int | None,
        Field(
            description=(
                "DEPRECATED: Use num_versions_to_validate instead. "
                "Maximum number of versions to return (most recent first). "
                "If specified, only the first N versions will be returned."
            ),
            default=None,
        ),
    ] = None,
) -> list[ConnectorVersionInfo] | Literal["Connector not found.", "Failed to fetch changelog."]:
    """Get version history for a connector.

    This tool retrieves the version history for a connector, including:
    - Version number
    - Release date (from changelog, with registry override for recent versions)
    - DockerHub URL for the version
    - Changelog URL
    - PR URL and title (scraped from changelog)

    For the most recent N versions (default 5), release dates are fetched from the
    registry for accuracy. For older versions, changelog dates are used.

    Returns:
        List of version information, sorted by most recent first.
    """
    try:
        versions = _get_connector_version_history(
            connector_name=connector_name,
            num_versions_to_validate=num_versions_to_validate,
        )
    except exc.AirbyteConnectorNotRegisteredError:
        return "Connector not found."
    except requests.exceptions.RequestException:
        # Lazy %-style arguments: the message is only formatted if the record is emitted.
        logger.exception("Failed to fetch changelog for %s", connector_name)
        return "Failed to fetch changelog."
    else:
        # A missing or non-positive `limit` means "no truncation".
        if limit is not None and limit > 0:
            return versions[:limit]
        return versions


def register_registry_tools(app: FastMCP) -> None:
    """Register registry tools with the FastMCP app.

    Args:
        app: FastMCP application instance
    """
    register_mcp_tools(app, mcp_module=__name__)
logger =
<Logger airbyte.mcp (INFO)>
@mcp_tool(read_only=True, idempotent=True)
def
list_connectors( keyword_filter: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Filter connectors by keyword.')], connector_type_filter: Annotated[Optional[Literal['source', 'destination']], FieldInfo(annotation=NoneType, required=False, default=None, description="Filter connectors by type ('source' or 'destination').")], install_types: Annotated[Union[Literal['java', 'python', 'yaml', 'docker'], list[Literal['java', 'python', 'yaml', 'docker']], NoneType], FieldInfo(annotation=NoneType, required=False, default=None, description='\n Filter connectors by install type.\n These are not mutually exclusive:\n - "python": Connectors that can be installed as Python packages.\n - "yaml": Connectors that can be installed simply via YAML download.\n These connectors are the fastest to install and run, as they do not require any\n additional dependencies.\n - "java": Connectors that can only be installed via Java. Since PyAirbyte does not\n currently ship with a JVM, these connectors will be run via Docker instead.\n In environments where Docker is not available, these connectors may not be\n runnable.\n - "docker": Connectors that can be installed via Docker. Note that all connectors\n can be run in Docker, so this filter should generally return the same results as\n not specifying a filter.\n If no install types are specified, all connectors will be returned.\n ')]) -> list[str]:
@mcp_tool(
    read_only=True,
    idempotent=True,
)
def list_connectors(
    keyword_filter: Annotated[
        str | None,
        Field(
            description="Filter connectors by keyword.",
            default=None,
        ),
    ],
    connector_type_filter: Annotated[
        Literal["source", "destination"] | None,
        Field(
            description="Filter connectors by type ('source' or 'destination').",
            default=None,
        ),
    ],
    install_types: Annotated[
        Literal["java", "python", "yaml", "docker"]
        | list[Literal["java", "python", "yaml", "docker"]]
        | None,
        Field(
            description=(
                """
                Filter connectors by install type.
                These are not mutually exclusive:
                - "python": Connectors that can be installed as Python packages.
                - "yaml": Connectors that can be installed simply via YAML download.
                    These connectors are the fastest to install and run, as they do not require any
                    additional dependencies.
                - "java": Connectors that can only be installed via Java. Since PyAirbyte does not
                    currently ship with a JVM, these connectors will be run via Docker instead.
                    In environments where Docker is not available, these connectors may not be
                    runnable.
                - "docker": Connectors that can be installed via Docker. Note that all connectors
                    can be run in Docker, so this filter should generally return the same results as
                    not specifying a filter.
                If no install types are specified, all connectors will be returned.
                """
            ),
            default=None,
        ),
    ],
) -> list[str]:
    """List available Airbyte connectors with optional filtering.

    All provided filters are applied together; a connector must pass each one.

    Returns:
        List of connector names, sorted alphabetically.
    """
    # Start with the full list of known connectors (all support Docker):
    connectors: list[str] = get_available_connectors(install_type=InstallType.ANY)

    install_types_list: list[str] | None = resolve_list_of_strings(
        install_types,  # type: ignore[arg-type]  # Type check doesn't understand literal is str
    )

    if install_types_list:
        # Look up each requested install type exactly once and merge the results into a
        # set, rather than re-querying the registry for every (connector, install type) pair.
        allowed_connectors: set[str] = set()
        for install_type in install_types_list:
            allowed_connectors.update(get_available_connectors(install_type=install_type))

        connectors = [connector for connector in connectors if connector in allowed_connectors]

    if keyword_filter:
        # Filter connectors by keyword, case-insensitive.
        keyword = keyword_filter.lower()
        connectors = [connector for connector in connectors if keyword in connector.lower()]

    if connector_type_filter:
        # Filter connectors by type ('source' or 'destination').
        # This assumes connector names are prefixed with 'source-' or 'destination-'.
        type_prefix = f"{connector_type_filter}-"
        connectors = [connector for connector in connectors if connector.startswith(type_prefix)]

    return sorted(connectors)
List available Airbyte connectors with optional filtering.
Returns:
List of connector names.
@mcp_tool(read_only=True, idempotent=True)
def
get_connector_info( connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the connector to get information for.')]) -> Union[airbyte.mcp.registry.ConnectorInfo, Literal['Connector not found.']]:
@mcp_tool(
    read_only=True,
    idempotent=True,
)
def get_connector_info(
    connector_name: Annotated[
        str,
        Field(description="The name of the connector to get information for."),
    ],
) -> ConnectorInfo | Literal["Connector not found."]:
    """Get information about a connector.

    Collects the connector's registry metadata, documentation URL, config spec
    (as JSON schema), and manifest URL. Metadata and config spec are best-effort:
    if either cannot be retrieved, the failure is logged and the field is left empty.

    Returns:
        The connector information, or "Connector not found." if the name is not in the
        list of available connectors.
    """
    if connector_name not in get_available_connectors():
        return "Connector not found."

    connector = get_source(
        connector_name,
        docker_image=is_docker_installed() or False,
        install_if_missing=False,  # Defer to avoid failing entirely if it can't be installed.
    )

    connector_metadata: ConnectorMetadata | None = None
    try:
        connector_metadata = get_connector_metadata(connector_name)
    except Exception:  # noqa: BLE001  # Best-effort: report what we can instead of failing.
        logger.warning(
            "Could not fetch registry metadata for connector '%s'.",
            connector_name,
            exc_info=True,
        )

    config_spec_jsonschema: dict[str, Any] | None = None
    try:
        # This requires running the connector. Install it if it isn't already installed.
        connector.install()
        config_spec_jsonschema = connector.config_spec
    except Exception:  # noqa: BLE001  # Best-effort: report what we can instead of failing.
        logger.warning(
            "Could not retrieve config spec for connector '%s'.",
            connector_name,
            exc_info=True,
        )

    manifest_url = _DEFAULT_MANIFEST_URL.format(
        source_name=connector_name,
        version="latest",
    )

    return ConnectorInfo(
        connector_name=connector.name,
        connector_metadata=connector_metadata,
        documentation_url=connector.docs_url,
        config_spec_jsonschema=config_spec_jsonschema,
        manifest_url=manifest_url,
    )
Get information about a connector: its registry metadata, documentation URL, config spec (as JSON schema), and manifest URL.
@mcp_tool(read_only=True, idempotent=True)
def
get_api_docs_urls( connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description="The canonical connector name (e.g., 'source-facebook-marketing', 'destination-snowflake')")]) -> Union[list[airbyte.registry.ApiDocsUrl], Literal['Connector not found.']]:
@mcp_tool(
    read_only=True,
    idempotent=True,
)
def get_api_docs_urls(
    connector_name: Annotated[
        str,
        Field(
            description=(
                "The canonical connector name "
                "(e.g., 'source-facebook-marketing', 'destination-snowflake')"
            )
        ),
    ],
) -> list[ApiDocsUrl] | Literal["Connector not found."]:
    """Look up the upstream API documentation URLs for a connector.

    The URLs are gathered from more than one place:
    - Registry metadata (documentationUrl, externalDocumentationUrls)
    - Connector manifest.yaml file (data.externalDocumentationUrls)

    If the connector is not registered, the string "Connector not found." is
    returned in place of the list.
    """
    try:
        docs_urls = get_connector_api_docs_urls(connector_name)
    except exc.AirbyteConnectorNotRegisteredError:
        # Unknown connector names are reported as a message rather than an error.
        return "Connector not found."

    return docs_urls
Get API documentation URLs for a connector.
This tool retrieves documentation URLs for a connector's upstream API from multiple sources:
- Registry metadata (documentationUrl, externalDocumentationUrls)
- Connector manifest.yaml file (data.externalDocumentationUrls)
@mcp_tool(read_only=True, idempotent=True)
def
get_connector_version_history( connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description="The name of the connector (e.g., 'source-faker', 'destination-postgres')")], num_versions_to_validate: typing.Annotated[int, FieldInfo(annotation=NoneType, required=False, default=5, description='Number of most recent versions to validate with registry data for accurate release dates. Defaults to 5.')] = 5, limit: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=False, default=None, description='DEPRECATED: Use num_versions_to_validate instead. Maximum number of versions to return (most recent first). If specified, only the first N versions will be returned.')] = None) -> Union[list[airbyte.registry.ConnectorVersionInfo], Literal['Connector not found.', 'Failed to fetch changelog.']]:
@mcp_tool(
    read_only=True,
    idempotent=True,
)
def get_connector_version_history(
    connector_name: Annotated[
        str,
        Field(
            description="The name of the connector (e.g., 'source-faker', 'destination-postgres')"
        ),
    ],
    num_versions_to_validate: Annotated[
        int,
        Field(
            description=(
                "Number of most recent versions to validate with registry data for accurate "
                "release dates. Defaults to 5."
            ),
            default=5,
        ),
    ] = 5,
    limit: Annotated[
        int | None,
        Field(
            description=(
                "DEPRECATED: Use num_versions_to_validate instead. "
                "Maximum number of versions to return (most recent first). "
                "If specified, only the first N versions will be returned."
            ),
            default=None,
        ),
    ] = None,
) -> list[ConnectorVersionInfo] | Literal["Connector not found.", "Failed to fetch changelog."]:
    """Get version history for a connector.

    This tool retrieves the version history for a connector, including:
    - Version number
    - Release date (from changelog, with registry override for recent versions)
    - DockerHub URL for the version
    - Changelog URL
    - PR URL and title (scraped from changelog)

    For the most recent N versions (default 5), release dates are fetched from the
    registry for accuracy. For older versions, changelog dates are used.

    Returns:
        List of version information, sorted by most recent first. If the connector is
        not registered, "Connector not found." is returned instead; if the changelog
        request fails, "Failed to fetch changelog." is returned instead.
    """
    try:
        versions = _get_connector_version_history(
            connector_name=connector_name,
            num_versions_to_validate=num_versions_to_validate,
        )
    except exc.AirbyteConnectorNotRegisteredError:
        return "Connector not found."
    except requests.exceptions.RequestException:
        # Lazy %-style arguments: the message is only formatted if the record is emitted.
        logger.exception("Failed to fetch changelog for %s", connector_name)
        return "Failed to fetch changelog."
    else:
        # A missing or non-positive `limit` means "no truncation".
        if limit is not None and limit > 0:
            return versions[:limit]
        return versions
Get version history for a connector.
This tool retrieves the version history for a connector, including:
- Version number
- Release date (from changelog, with registry override for recent versions)
- DockerHub URL for the version
- Changelog URL
- PR URL and title (scraped from changelog)
For the most recent N versions (default 5), release dates are fetched from the registry for accuracy. For older versions, changelog dates are used.
Returns:
List of version information, sorted by most recent first.
def
register_registry_tools(app: fastmcp.server.server.FastMCP) -> None:
def register_registry_tools(app: FastMCP) -> None:
    """Hook this module's registry tools into a FastMCP app.

    Delegates to `register_mcp_tools`, passing this module's name as the
    module to register from.

    Args:
        app: FastMCP application instance
    """
    this_module = __name__
    register_mcp_tools(app, mcp_module=this_module)
Register registry tools with the FastMCP app.
Arguments:
- app: FastMCP application instance