airbyte.mcp.connector_registry

Airbyte Cloud MCP operations.

  1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  2"""Airbyte Cloud MCP operations."""
  3
  4# Note: Deferred type evaluation must be avoided due to FastMCP/Pydantic needing
  5# types to be available at import time for tool registration.
  6import contextlib
  7from typing import Annotated, Any, Literal
  8
  9from fastmcp import FastMCP
 10from pydantic import BaseModel, Field
 11
 12from airbyte._executors.util import DEFAULT_MANIFEST_URL
 13from airbyte._util.meta import is_docker_installed
 14from airbyte.mcp._util import resolve_list_of_strings
 15from airbyte.sources import get_available_connectors
 16from airbyte.sources.registry import ConnectorMetadata, get_connector_metadata
 17from airbyte.sources.util import get_source
 18
 19
 20# @app.tool()  # << deferred
 21def list_connectors(
 22    keyword_filter: Annotated[
 23        str | None,
 24        Field(
 25            description="Filter connectors by keyword.",
 26            default=None,
 27        ),
 28    ],
 29    connector_type_filter: Annotated[
 30        Literal["source", "destination"] | None,
 31        Field(
 32            description="Filter connectors by type ('source' or 'destination').",
 33            default=None,
 34        ),
 35    ],
 36    install_types: Annotated[
 37        Literal["java", "python", "yaml", "docker"]
 38        | list[Literal["java", "python", "yaml", "docker"]]
 39        | None,
 40        Field(
 41            description=(
 42                """
 43                Filter connectors by install type.
 44                These are not mutually exclusive:
 45                - "python": Connectors that can be installed as Python packages.
 46                - "yaml": Connectors that can be installed simply via YAML download.
 47                    These connectors are the fastest to install and run, as they do not require any
 48                    additional dependencies.
 49                - "java": Connectors that can only be installed via Java. Since PyAirbyte does not
 50                    currently ship with a JVM, these connectors will be run via Docker instead.
 51                    In environments where Docker is not available, these connectors may not be
 52                    runnable.
 53                - "docker": Connectors that can be installed via Docker. Note that all connectors
 54                    can be run in Docker, so this filter should generally return the same results as
 55                    not specifying a filter.
 56                If no install types are specified, all connectors will be returned.
 57                """
 58            ),
 59            default=None,
 60        ),
 61    ],
 62) -> list[str]:
 63    """List available Airbyte connectors with optional filtering.
 64
 65    Returns:
 66        List of connector names.
 67    """
 68    connectors: list[str] = get_available_connectors()
 69
 70    install_types_list: list[str] | None = resolve_list_of_strings(
 71        install_types,  # type: ignore[arg-type]  # Type check doesn't understand literal is str
 72    )
 73
 74    if install_types_list:
 75        # If install_types is provided, filter connectors based on the specified install types.
 76        connectors = [
 77            connector
 78            for connector in connectors
 79            if any(
 80                connector in get_available_connectors(install_type=install_type)
 81                for install_type in install_types_list
 82            )
 83        ]
 84
 85    if keyword_filter:
 86        # Filter connectors by keyword, case-insensitive.
 87        connectors = [
 88            connector for connector in connectors if keyword_filter.lower() in connector.lower()
 89        ]
 90
 91    if connector_type_filter:
 92        # Filter connectors by type ('source' or 'destination').
 93        # This assumes connector names are prefixed with 'source-' or 'destination-'.
 94        connectors = [
 95            connector
 96            for connector in connectors
 97            if connector.startswith(f"{connector_type_filter}-")
 98        ]
 99
100    return sorted(connectors)
101
102
103class ConnectorInfo(BaseModel):
104    """@private Class to hold connector information."""
105
106    connector_name: str
107    connector_metadata: ConnectorMetadata | None = None
108    documentation_url: str | None = None
109    config_spec_jsonschema: dict | None = None
110    manifest_url: str | None = None
111
112
113# @app.tool()  # << deferred
114def get_connector_info(
115    connector_name: Annotated[
116        str,
117        Field(description="The name of the connector to get information for."),
118    ],
119) -> ConnectorInfo | Literal["Connector not found."]:
120    """Get the documentation URL for a connector."""
121    if connector_name not in get_available_connectors():
122        return "Connector not found."
123
124    connector = get_source(
125        connector_name,
126        docker_image=is_docker_installed() or False,
127        install_if_missing=False,  # Defer to avoid failing entirely if it can't be installed.
128    )
129
130    connector_metadata: ConnectorMetadata | None = None
131    with contextlib.suppress(Exception):
132        connector_metadata = get_connector_metadata(connector_name)
133
134    config_spec_jsonschema: dict[str, Any] | None = None
135    with contextlib.suppress(Exception):
136        # This requires running the connector. Install it if it isn't already installed.
137        connector.install()
138        config_spec_jsonschema = connector.config_spec
139
140    manifest_url = DEFAULT_MANIFEST_URL.format(
141        source_name=connector_name,
142        version="latest",
143    )
144
145    return ConnectorInfo(
146        connector_name=connector.name,
147        connector_metadata=connector_metadata,
148        documentation_url=connector.docs_url,
149        config_spec_jsonschema=config_spec_jsonschema,
150        manifest_url=manifest_url,
151    )
152
153
154def register_connector_registry_tools(app: FastMCP) -> None:
155    """@private Register tools with the FastMCP app.
156
157    This is an internal function and should not be called directly.
158    """
159    app.tool(list_connectors)
160    app.tool(get_connector_info)
def list_connectors( keyword_filter: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Filter connectors by keyword.')], connector_type_filter: Annotated[Optional[Literal['source', 'destination']], FieldInfo(annotation=NoneType, required=False, default=None, description="Filter connectors by type ('source' or 'destination').")], install_types: Annotated[Union[Literal['java', 'python', 'yaml', 'docker'], list[Literal['java', 'python', 'yaml', 'docker']], NoneType], FieldInfo(annotation=NoneType, required=False, default=None, description='\n Filter connectors by install type.\n These are not mutually exclusive:\n - "python": Connectors that can be installed as Python packages.\n - "yaml": Connectors that can be installed simply via YAML download.\n These connectors are the fastest to install and run, as they do not require any\n additional dependencies.\n - "java": Connectors that can only be installed via Java. Since PyAirbyte does not\n currently ship with a JVM, these connectors will be run via Docker instead.\n In environments where Docker is not available, these connectors may not be\n runnable.\n - "docker": Connectors that can be installed via Docker. Note that all connectors\n can be run in Docker, so this filter should generally return the same results as\n not specifying a filter.\n If no install types are specified, all connectors will be returned.\n ')]) -> list[str]:
 22def list_connectors(
 23    keyword_filter: Annotated[
 24        str | None,
 25        Field(
 26            description="Filter connectors by keyword.",
 27            default=None,
 28        ),
 29    ],
 30    connector_type_filter: Annotated[
 31        Literal["source", "destination"] | None,
 32        Field(
 33            description="Filter connectors by type ('source' or 'destination').",
 34            default=None,
 35        ),
 36    ],
 37    install_types: Annotated[
 38        Literal["java", "python", "yaml", "docker"]
 39        | list[Literal["java", "python", "yaml", "docker"]]
 40        | None,
 41        Field(
 42            description=(
 43                """
 44                Filter connectors by install type.
 45                These are not mutually exclusive:
 46                - "python": Connectors that can be installed as Python packages.
 47                - "yaml": Connectors that can be installed simply via YAML download.
 48                    These connectors are the fastest to install and run, as they do not require any
 49                    additional dependencies.
 50                - "java": Connectors that can only be installed via Java. Since PyAirbyte does not
 51                    currently ship with a JVM, these connectors will be run via Docker instead.
 52                    In environments where Docker is not available, these connectors may not be
 53                    runnable.
 54                - "docker": Connectors that can be installed via Docker. Note that all connectors
 55                    can be run in Docker, so this filter should generally return the same results as
 56                    not specifying a filter.
 57                If no install types are specified, all connectors will be returned.
 58                """
 59            ),
 60            default=None,
 61        ),
 62    ],
 63) -> list[str]:
 64    """List available Airbyte connectors with optional filtering.
 65
 66    Returns:
 67        List of connector names.
 68    """
 69    connectors: list[str] = get_available_connectors()
 70
 71    install_types_list: list[str] | None = resolve_list_of_strings(
 72        install_types,  # type: ignore[arg-type]  # Type check doesn't understand literal is str
 73    )
 74
 75    if install_types_list:
 76        # If install_types is provided, filter connectors based on the specified install types.
 77        connectors = [
 78            connector
 79            for connector in connectors
 80            if any(
 81                connector in get_available_connectors(install_type=install_type)
 82                for install_type in install_types_list
 83            )
 84        ]
 85
 86    if keyword_filter:
 87        # Filter connectors by keyword, case-insensitive.
 88        connectors = [
 89            connector for connector in connectors if keyword_filter.lower() in connector.lower()
 90        ]
 91
 92    if connector_type_filter:
 93        # Filter connectors by type ('source' or 'destination').
 94        # This assumes connector names are prefixed with 'source-' or 'destination-'.
 95        connectors = [
 96            connector
 97            for connector in connectors
 98            if connector.startswith(f"{connector_type_filter}-")
 99        ]
100
101    return sorted(connectors)

List available Airbyte connectors with optional filtering.

Returns:

List of connector names.

def get_connector_info( connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the connector to get information for.')]) -> Union[airbyte.mcp.connector_registry.ConnectorInfo, Literal['Connector not found.']]:
115def get_connector_info(
116    connector_name: Annotated[
117        str,
118        Field(description="The name of the connector to get information for."),
119    ],
120) -> ConnectorInfo | Literal["Connector not found."]:
121    """Get the documentation URL for a connector."""
122    if connector_name not in get_available_connectors():
123        return "Connector not found."
124
125    connector = get_source(
126        connector_name,
127        docker_image=is_docker_installed() or False,
128        install_if_missing=False,  # Defer to avoid failing entirely if it can't be installed.
129    )
130
131    connector_metadata: ConnectorMetadata | None = None
132    with contextlib.suppress(Exception):
133        connector_metadata = get_connector_metadata(connector_name)
134
135    config_spec_jsonschema: dict[str, Any] | None = None
136    with contextlib.suppress(Exception):
137        # This requires running the connector. Install it if it isn't already installed.
138        connector.install()
139        config_spec_jsonschema = connector.config_spec
140
141    manifest_url = DEFAULT_MANIFEST_URL.format(
142        source_name=connector_name,
143        version="latest",
144    )
145
146    return ConnectorInfo(
147        connector_name=connector.name,
148        connector_metadata=connector_metadata,
149        documentation_url=connector.docs_url,
150        config_spec_jsonschema=config_spec_jsonschema,
151        manifest_url=manifest_url,
152    )

Get the documentation URL for a connector.