airbyte.mcp.connector_registry
Airbyte connector registry MCP operations.
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Airbyte connector registry MCP operations."""

# Note: Deferred type evaluation must be avoided due to FastMCP/Pydantic needing
# types to be available at import time for tool registration.
import contextlib
from typing import Annotated, Any, Literal

from fastmcp import FastMCP
from pydantic import BaseModel, Field

from airbyte._executors.util import DEFAULT_MANIFEST_URL
from airbyte._util.meta import is_docker_installed
from airbyte.mcp._tool_utils import mcp_tool, register_tools
from airbyte.mcp._util import resolve_list_of_strings
from airbyte.sources import get_available_connectors
from airbyte.sources.registry import ConnectorMetadata, get_connector_metadata
from airbyte.sources.util import get_source


@mcp_tool(
    domain="registry",
    read_only=True,
    idempotent=True,
)
def list_connectors(
    keyword_filter: Annotated[
        str | None,
        Field(
            description="Filter connectors by keyword.",
            default=None,
        ),
    ],
    connector_type_filter: Annotated[
        Literal["source", "destination"] | None,
        Field(
            description="Filter connectors by type ('source' or 'destination').",
            default=None,
        ),
    ],
    install_types: Annotated[
        Literal["java", "python", "yaml", "docker"]
        | list[Literal["java", "python", "yaml", "docker"]]
        | None,
        Field(
            description=(
                """
                Filter connectors by install type.
                These are not mutually exclusive:
                - "python": Connectors that can be installed as Python packages.
                - "yaml": Connectors that can be installed simply via YAML download.
                    These connectors are the fastest to install and run, as they do not require any
                    additional dependencies.
                - "java": Connectors that can only be installed via Java. Since PyAirbyte does not
                    currently ship with a JVM, these connectors will be run via Docker instead.
                    In environments where Docker is not available, these connectors may not be
                    runnable.
                - "docker": Connectors that can be installed via Docker. Note that all connectors
                    can be run in Docker, so this filter should generally return the same results as
                    not specifying a filter.
                If no install types are specified, all connectors will be returned.
                """
            ),
            default=None,
        ),
    ],
) -> list[str]:
    """List available Airbyte connectors with optional filtering.

    When several filters are provided, a connector must satisfy all of them.

    Returns:
        List of connector names.
    """
    connectors: list[str] = get_available_connectors()

    install_types_list: list[str] | None = resolve_list_of_strings(
        install_types,  # type: ignore[arg-type]  # Type check doesn't understand literal is str
    )

    if install_types_list:
        # Look up each requested install type exactly once and merge the results,
        # instead of re-querying for every (connector, install type) pair.
        # A connector is kept if it is available under ANY requested install type.
        installable: set[str] = set()
        for install_type in install_types_list:
            installable.update(get_available_connectors(install_type=install_type))

        connectors = [connector for connector in connectors if connector in installable]

    if keyword_filter:
        # Filter connectors by keyword, case-insensitive.
        keyword = keyword_filter.lower()
        connectors = [connector for connector in connectors if keyword in connector.lower()]

    if connector_type_filter:
        # Filter connectors by type ('source' or 'destination').
        # This assumes connector names are prefixed with 'source-' or 'destination-'.
        name_prefix = f"{connector_type_filter}-"
        connectors = [connector for connector in connectors if connector.startswith(name_prefix)]

    return sorted(connectors)


class ConnectorInfo(BaseModel):
    """@private Class to hold connector information."""

    # Name of the connector, as reported by the connector object.
    connector_name: str
    # Registry metadata; None when the metadata lookup fails.
    connector_metadata: ConnectorMetadata | None = None
    documentation_url: str | None = None
    # Config spec as JSON schema; None when the spec could not be obtained.
    config_spec_jsonschema: dict | None = None
    manifest_url: str | None = None


@mcp_tool(
    domain="registry",
    read_only=True,
    idempotent=True,
)
def get_connector_info(
    connector_name: Annotated[
        str,
        Field(description="The name of the connector to get information for."),
    ],
) -> ConnectorInfo | Literal["Connector not found."]:
    """Get information about a connector.

    The result includes the connector's registry metadata, documentation URL,
    config spec (as JSON schema), and manifest URL. The metadata and config spec
    are retrieved on a best-effort basis and are null if they cannot be obtained.

    Returns:
        The connector information, or the string "Connector not found." if the
        name is not in the list of available connectors.
    """
    if connector_name not in get_available_connectors():
        return "Connector not found."

    connector = get_source(
        connector_name,
        # NOTE(review): `or False` presumably guards against a None result - confirm.
        docker_image=is_docker_installed() or False,
        install_if_missing=False,  # Defer to avoid failing entirely if it can't be installed.
    )

    # Best-effort: a failed metadata lookup must not fail the whole request.
    connector_metadata: ConnectorMetadata | None = None
    with contextlib.suppress(Exception):
        connector_metadata = get_connector_metadata(connector_name)

    # Best-effort: if installation or the spec lookup fails, the spec stays None.
    config_spec_jsonschema: dict[str, Any] | None = None
    with contextlib.suppress(Exception):
        # This requires running the connector. Install it if it isn't already installed.
        connector.install()
        config_spec_jsonschema = connector.config_spec

    # The URL is built from the default template; it is not checked for existence here.
    manifest_url = DEFAULT_MANIFEST_URL.format(
        source_name=connector_name,
        version="latest",
    )

    return ConnectorInfo(
        connector_name=connector.name,
        connector_metadata=connector_metadata,
        documentation_url=connector.docs_url,
        config_spec_jsonschema=config_spec_jsonschema,
        manifest_url=manifest_url,
    )


def register_connector_registry_tools(app: FastMCP) -> None:
    """@private Register tools with the FastMCP app.

    This is an internal function and should not be called directly.
    """
    register_tools(app, domain="registry")
@mcp_tool(domain='registry', read_only=True, idempotent=True)
def
list_connectors( keyword_filter: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Filter connectors by keyword.')], connector_type_filter: Annotated[Optional[Literal['source', 'destination']], FieldInfo(annotation=NoneType, required=False, default=None, description="Filter connectors by type ('source' or 'destination').")], install_types: Annotated[Union[Literal['java', 'python', 'yaml', 'docker'], list[Literal['java', 'python', 'yaml', 'docker']], NoneType], FieldInfo(annotation=NoneType, required=False, default=None, description='\n Filter connectors by install type.\n These are not mutually exclusive:\n - "python": Connectors that can be installed as Python packages.\n - "yaml": Connectors that can be installed simply via YAML download.\n These connectors are the fastest to install and run, as they do not require any\n additional dependencies.\n - "java": Connectors that can only be installed via Java. Since PyAirbyte does not\n currently ship with a JVM, these connectors will be run via Docker instead.\n In environments where Docker is not available, these connectors may not be\n runnable.\n - "docker": Connectors that can be installed via Docker. Note that all connectors\n can be run in Docker, so this filter should generally return the same results as\n not specifying a filter.\n If no install types are specified, all connectors will be returned.\n ')]) -> list[str]:
@mcp_tool(
    domain="registry",
    read_only=True,
    idempotent=True,
)
def list_connectors(
    keyword_filter: Annotated[
        str | None,
        Field(
            description="Filter connectors by keyword.",
            default=None,
        ),
    ],
    connector_type_filter: Annotated[
        Literal["source", "destination"] | None,
        Field(
            description="Filter connectors by type ('source' or 'destination').",
            default=None,
        ),
    ],
    install_types: Annotated[
        Literal["java", "python", "yaml", "docker"]
        | list[Literal["java", "python", "yaml", "docker"]]
        | None,
        Field(
            description=(
                """
                Filter connectors by install type.
                These are not mutually exclusive:
                - "python": Connectors that can be installed as Python packages.
                - "yaml": Connectors that can be installed simply via YAML download.
                    These connectors are the fastest to install and run, as they do not require any
                    additional dependencies.
                - "java": Connectors that can only be installed via Java. Since PyAirbyte does not
                    currently ship with a JVM, these connectors will be run via Docker instead.
                    In environments where Docker is not available, these connectors may not be
                    runnable.
                - "docker": Connectors that can be installed via Docker. Note that all connectors
                    can be run in Docker, so this filter should generally return the same results as
                    not specifying a filter.
                If no install types are specified, all connectors will be returned.
                """
            ),
            default=None,
        ),
    ],
) -> list[str]:
    """List available Airbyte connectors with optional filtering.

    When several filters are provided, a connector must satisfy all of them.

    Returns:
        List of connector names.
    """
    connectors: list[str] = get_available_connectors()

    install_types_list: list[str] | None = resolve_list_of_strings(
        install_types,  # type: ignore[arg-type]  # Type check doesn't understand literal is str
    )

    if install_types_list:
        # Look up each requested install type exactly once and merge the results,
        # instead of re-querying for every (connector, install type) pair.
        # A connector is kept if it is available under ANY requested install type.
        installable: set[str] = set()
        for install_type in install_types_list:
            installable.update(get_available_connectors(install_type=install_type))

        connectors = [connector for connector in connectors if connector in installable]

    if keyword_filter:
        # Filter connectors by keyword, case-insensitive.
        keyword = keyword_filter.lower()
        connectors = [connector for connector in connectors if keyword in connector.lower()]

    if connector_type_filter:
        # Filter connectors by type ('source' or 'destination').
        # This assumes connector names are prefixed with 'source-' or 'destination-'.
        name_prefix = f"{connector_type_filter}-"
        connectors = [connector for connector in connectors if connector.startswith(name_prefix)]

    return sorted(connectors)
List available Airbyte connectors with optional filtering.
Returns:
List of connector names.
@mcp_tool(domain='registry', read_only=True, idempotent=True)
def
get_connector_info( connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the connector to get information for.')]) -> Union[airbyte.mcp.connector_registry.ConnectorInfo, Literal['Connector not found.']]:
@mcp_tool(
    domain="registry",
    read_only=True,
    idempotent=True,
)
def get_connector_info(
    connector_name: Annotated[
        str,
        Field(description="The name of the connector to get information for."),
    ],
) -> ConnectorInfo | Literal["Connector not found."]:
    """Get information about a connector.

    The result includes the connector's registry metadata, documentation URL,
    config spec (as JSON schema), and manifest URL. The metadata and config spec
    are retrieved on a best-effort basis and are null if they cannot be obtained.

    Returns:
        The connector information, or the string "Connector not found." if the
        name is not in the list of available connectors.
    """
    if connector_name not in get_available_connectors():
        return "Connector not found."

    connector = get_source(
        connector_name,
        # NOTE(review): `or False` presumably guards against a None result - confirm.
        docker_image=is_docker_installed() or False,
        install_if_missing=False,  # Defer to avoid failing entirely if it can't be installed.
    )

    # Best-effort: a failed metadata lookup must not fail the whole request.
    connector_metadata: ConnectorMetadata | None = None
    with contextlib.suppress(Exception):
        connector_metadata = get_connector_metadata(connector_name)

    # Best-effort: if installation or the spec lookup fails, the spec stays None.
    config_spec_jsonschema: dict[str, Any] | None = None
    with contextlib.suppress(Exception):
        # This requires running the connector. Install it if it isn't already installed.
        connector.install()
        config_spec_jsonschema = connector.config_spec

    # The URL is built from the default template; it is not checked for existence here.
    manifest_url = DEFAULT_MANIFEST_URL.format(
        source_name=connector_name,
        version="latest",
    )

    return ConnectorInfo(
        connector_name=connector.name,
        connector_metadata=connector_metadata,
        documentation_url=connector.docs_url,
        config_spec_jsonschema=config_spec_jsonschema,
        manifest_url=manifest_url,
    )
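Get information about a connector: its registry metadata, documentation URL, config spec (as JSON schema), and manifest URL. Returns "Connector not found." if the name is not among the available connectors.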