airbyte.mcp.connector_registry
Airbyte Cloud MCP operations.
1# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 2"""Airbyte Cloud MCP operations.""" 3 4# Note: Deferred type evaluation must be avoided due to FastMCP/Pydantic needing 5# types to be available at import time for tool registration. 6import contextlib 7from typing import Annotated, Any, Literal 8 9from fastmcp import FastMCP 10from pydantic import BaseModel, Field 11 12from airbyte._executors.util import DEFAULT_MANIFEST_URL 13from airbyte._util.meta import is_docker_installed 14from airbyte.mcp._util import resolve_list_of_strings 15from airbyte.sources import get_available_connectors 16from airbyte.sources.registry import ConnectorMetadata, get_connector_metadata 17from airbyte.sources.util import get_source 18 19 20# @app.tool() # << deferred 21def list_connectors( 22 keyword_filter: Annotated[ 23 str | None, 24 Field( 25 description="Filter connectors by keyword.", 26 default=None, 27 ), 28 ], 29 connector_type_filter: Annotated[ 30 Literal["source", "destination"] | None, 31 Field( 32 description="Filter connectors by type ('source' or 'destination').", 33 default=None, 34 ), 35 ], 36 install_types: Annotated[ 37 Literal["java", "python", "yaml", "docker"] 38 | list[Literal["java", "python", "yaml", "docker"]] 39 | None, 40 Field( 41 description=( 42 """ 43 Filter connectors by install type. 44 These are not mutually exclusive: 45 - "python": Connectors that can be installed as Python packages. 46 - "yaml": Connectors that can be installed simply via YAML download. 47 These connectors are the fastest to install and run, as they do not require any 48 additional dependencies. 49 - "java": Connectors that can only be installed via Java. Since PyAirbyte does not 50 currently ship with a JVM, these connectors will be run via Docker instead. 51 In environments where Docker is not available, these connectors may not be 52 runnable. 53 - "docker": Connectors that can be installed via Docker. Note that all connectors 54 can be run in Docker, so this filter should generally return the same results as 55 not specifying a filter. 56 If no install types are specified, all connectors will be returned. 57 """ 58 ), 59 default=None, 60 ), 61 ], 62) -> list[str]: 63 """List available Airbyte connectors with optional filtering. 64 65 Returns: 66 List of connector names. 67 """ 68 connectors: list[str] = get_available_connectors() 69 70 install_types_list: list[str] | None = resolve_list_of_strings( 71 install_types, # type: ignore[arg-type] # Type check doesn't understand literal is str 72 ) 73 74 if install_types_list: 75 # If install_types is provided, filter connectors based on the specified install types. 76 connectors = [ 77 connector 78 for connector in connectors 79 if any( 80 connector in get_available_connectors(install_type=install_type) 81 for install_type in install_types_list 82 ) 83 ] 84 85 if keyword_filter: 86 # Filter connectors by keyword, case-insensitive. 87 connectors = [ 88 connector for connector in connectors if keyword_filter.lower() in connector.lower() 89 ] 90 91 if connector_type_filter: 92 # Filter connectors by type ('source' or 'destination'). 93 # This assumes connector names are prefixed with 'source-' or 'destination-'. 94 connectors = [ 95 connector 96 for connector in connectors 97 if connector.startswith(f"{connector_type_filter}-") 98 ] 99 100 return sorted(connectors) 101 102 103class ConnectorInfo(BaseModel): 104 """@private Class to hold connector information.""" 105 106 connector_name: str 107 connector_metadata: ConnectorMetadata | None = None 108 documentation_url: str | None = None 109 config_spec_jsonschema: dict | None = None 110 manifest_url: str | None = None 111 112 113# @app.tool() # << deferred 114def get_connector_info( 115 connector_name: Annotated[ 116 str, 117 Field(description="The name of the connector to get information for."), 118 ], 119) -> ConnectorInfo | Literal["Connector not found."]: 120 """Get the documentation URL for a connector.""" 121 if connector_name not in get_available_connectors(): 122 return "Connector not found." 123 124 connector = get_source( 125 connector_name, 126 docker_image=is_docker_installed() or False, 127 install_if_missing=False, # Defer to avoid failing entirely if it can't be installed. 128 ) 129 130 connector_metadata: ConnectorMetadata | None = None 131 with contextlib.suppress(Exception): 132 connector_metadata = get_connector_metadata(connector_name) 133 134 config_spec_jsonschema: dict[str, Any] | None = None 135 with contextlib.suppress(Exception): 136 # This requires running the connector. Install it if it isn't already installed. 137 connector.install() 138 config_spec_jsonschema = connector.config_spec 139 140 manifest_url = DEFAULT_MANIFEST_URL.format( 141 source_name=connector_name, 142 version="latest", 143 ) 144 145 return ConnectorInfo( 146 connector_name=connector.name, 147 connector_metadata=connector_metadata, 148 documentation_url=connector.docs_url, 149 config_spec_jsonschema=config_spec_jsonschema, 150 manifest_url=manifest_url, 151 ) 152 153 154def register_connector_registry_tools(app: FastMCP) -> None: 155 """@private Register tools with the FastMCP app. 156 157 This is an internal function and should not be called directly. 158 """ 159 app.tool(list_connectors) 160 app.tool(get_connector_info)
def
list_connectors( keyword_filter: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Filter connectors by keyword.')], connector_type_filter: Annotated[Optional[Literal['source', 'destination']], FieldInfo(annotation=NoneType, required=False, default=None, description="Filter connectors by type ('source' or 'destination').")], install_types: Annotated[Union[Literal['java', 'python', 'yaml', 'docker'], list[Literal['java', 'python', 'yaml', 'docker']], NoneType], FieldInfo(annotation=NoneType, required=False, default=None, description='\n Filter connectors by install type.\n These are not mutually exclusive:\n - "python": Connectors that can be installed as Python packages.\n - "yaml": Connectors that can be installed simply via YAML download.\n These connectors are the fastest to install and run, as they do not require any\n additional dependencies.\n - "java": Connectors that can only be installed via Java. Since PyAirbyte does not\n currently ship with a JVM, these connectors will be run via Docker instead.\n In environments where Docker is not available, these connectors may not be\n runnable.\n - "docker": Connectors that can be installed via Docker. Note that all connectors\n can be run in Docker, so this filter should generally return the same results as\n not specifying a filter.\n If no install types are specified, all connectors will be returned.\n ')]) -> list[str]:
22def list_connectors( 23 keyword_filter: Annotated[ 24 str | None, 25 Field( 26 description="Filter connectors by keyword.", 27 default=None, 28 ), 29 ], 30 connector_type_filter: Annotated[ 31 Literal["source", "destination"] | None, 32 Field( 33 description="Filter connectors by type ('source' or 'destination').", 34 default=None, 35 ), 36 ], 37 install_types: Annotated[ 38 Literal["java", "python", "yaml", "docker"] 39 | list[Literal["java", "python", "yaml", "docker"]] 40 | None, 41 Field( 42 description=( 43 """ 44 Filter connectors by install type. 45 These are not mutually exclusive: 46 - "python": Connectors that can be installed as Python packages. 47 - "yaml": Connectors that can be installed simply via YAML download. 48 These connectors are the fastest to install and run, as they do not require any 49 additional dependencies. 50 - "java": Connectors that can only be installed via Java. Since PyAirbyte does not 51 currently ship with a JVM, these connectors will be run via Docker instead. 52 In environments where Docker is not available, these connectors may not be 53 runnable. 54 - "docker": Connectors that can be installed via Docker. Note that all connectors 55 can be run in Docker, so this filter should generally return the same results as 56 not specifying a filter. 57 If no install types are specified, all connectors will be returned. 58 """ 59 ), 60 default=None, 61 ), 62 ], 63) -> list[str]: 64 """List available Airbyte connectors with optional filtering. 65 66 Returns: 67 List of connector names. 68 """ 69 connectors: list[str] = get_available_connectors() 70 71 install_types_list: list[str] | None = resolve_list_of_strings( 72 install_types, # type: ignore[arg-type] # Type check doesn't understand literal is str 73 ) 74 75 if install_types_list: 76 # If install_types is provided, filter connectors based on the specified install types. 77 connectors = [ 78 connector 79 for connector in connectors 80 if any( 81 connector in get_available_connectors(install_type=install_type) 82 for install_type in install_types_list 83 ) 84 ] 85 86 if keyword_filter: 87 # Filter connectors by keyword, case-insensitive. 88 connectors = [ 89 connector for connector in connectors if keyword_filter.lower() in connector.lower() 90 ] 91 92 if connector_type_filter: 93 # Filter connectors by type ('source' or 'destination'). 94 # This assumes connector names are prefixed with 'source-' or 'destination-'. 95 connectors = [ 96 connector 97 for connector in connectors 98 if connector.startswith(f"{connector_type_filter}-") 99 ] 100 101 return sorted(connectors)
List available Airbyte connectors with optional filtering.
Returns:
List of connector names.
def
get_connector_info( connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the connector to get information for.')]) -> Union[airbyte.mcp.connector_registry.ConnectorInfo, Literal['Connector not found.']]:
115def get_connector_info( 116 connector_name: Annotated[ 117 str, 118 Field(description="The name of the connector to get information for."), 119 ], 120) -> ConnectorInfo | Literal["Connector not found."]: 121 """Get the documentation URL for a connector.""" 122 if connector_name not in get_available_connectors(): 123 return "Connector not found." 124 125 connector = get_source( 126 connector_name, 127 docker_image=is_docker_installed() or False, 128 install_if_missing=False, # Defer to avoid failing entirely if it can't be installed. 129 ) 130 131 connector_metadata: ConnectorMetadata | None = None 132 with contextlib.suppress(Exception): 133 connector_metadata = get_connector_metadata(connector_name) 134 135 config_spec_jsonschema: dict[str, Any] | None = None 136 with contextlib.suppress(Exception): 137 # This requires running the connector. Install it if it isn't already installed. 138 connector.install() 139 config_spec_jsonschema = connector.config_spec 140 141 manifest_url = DEFAULT_MANIFEST_URL.format( 142 source_name=connector_name, 143 version="latest", 144 ) 145 146 return ConnectorInfo( 147 connector_name=connector.name, 148 connector_metadata=connector_metadata, 149 documentation_url=connector.docs_url, 150 config_spec_jsonschema=config_spec_jsonschema, 151 manifest_url=manifest_url, 152 )
Get the documentation URL for a connector.