airbyte.mcp.registry

Airbyte connector registry MCP operations.

  1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  2"""Airbyte connector registry MCP operations."""
  3
  4# Note: Deferred type evaluation must be avoided due to FastMCP/Pydantic needing
  5# types to be available at import time for tool registration.
  6import contextlib
  7import logging
  8from typing import Annotated, Any, Literal
  9
 10import requests
 11from fastmcp import FastMCP
 12from fastmcp_extensions import mcp_tool, register_mcp_tools
 13from pydantic import BaseModel, Field
 14
 15from airbyte import exceptions as exc
 16from airbyte._util.meta import is_docker_installed
 17from airbyte.mcp._arg_resolvers import resolve_list_of_strings
 18from airbyte.registry import (
 19    _DEFAULT_MANIFEST_URL,
 20    ApiDocsUrl,
 21    ConnectorMetadata,
 22    ConnectorVersionInfo,
 23    InstallType,
 24    get_available_connectors,
 25    get_connector_api_docs_urls,
 26    get_connector_metadata,
 27)
 28from airbyte.registry import get_connector_version_history as _get_connector_version_history
 29from airbyte.sources.util import get_source
 30
 31
# Module-level logger. It uses the fixed name "airbyte.mcp" rather than `__name__`.
logger = logging.getLogger("airbyte.mcp")
 33
 34
 35@mcp_tool(
 36    read_only=True,
 37    idempotent=True,
 38)
 39def list_connectors(
 40    keyword_filter: Annotated[
 41        str | None,
 42        Field(
 43            description="Filter connectors by keyword.",
 44            default=None,
 45        ),
 46    ],
 47    connector_type_filter: Annotated[
 48        Literal["source", "destination"] | None,
 49        Field(
 50            description="Filter connectors by type ('source' or 'destination').",
 51            default=None,
 52        ),
 53    ],
 54    install_types: Annotated[
 55        Literal["java", "python", "yaml", "docker"]
 56        | list[Literal["java", "python", "yaml", "docker"]]
 57        | None,
 58        Field(
 59            description=(
 60                """
 61                Filter connectors by install type.
 62                These are not mutually exclusive:
 63                - "python": Connectors that can be installed as Python packages.
 64                - "yaml": Connectors that can be installed simply via YAML download.
 65                    These connectors are the fastest to install and run, as they do not require any
 66                    additional dependencies.
 67                - "java": Connectors that can only be installed via Java. Since PyAirbyte does not
 68                    currently ship with a JVM, these connectors will be run via Docker instead.
 69                    In environments where Docker is not available, these connectors may not be
 70                    runnable.
 71                - "docker": Connectors that can be installed via Docker. Note that all connectors
 72                    can be run in Docker, so this filter should generally return the same results as
 73                    not specifying a filter.
 74                If no install types are specified, all connectors will be returned.
 75                """
 76            ),
 77            default=None,
 78        ),
 79    ],
 80) -> list[str]:
 81    """List available Airbyte connectors with optional filtering.
 82
 83    Returns:
 84        List of connector names.
 85    """
 86    # Start with the full list of known connectors (all support Docker):
 87    connectors: list[str] = get_available_connectors(install_type=InstallType.ANY)
 88
 89    install_types_list: list[str] | None = resolve_list_of_strings(
 90        install_types,  # type: ignore[arg-type]  # Type check doesn't understand literal is str
 91    )
 92
 93    if install_types_list:
 94        # If install_types is provided, filter connectors based on the specified install types.
 95        connectors = [
 96            connector
 97            for connector in connectors
 98            if any(
 99                connector in get_available_connectors(install_type=install_type)
100                for install_type in install_types_list
101            )
102        ]
103
104    if keyword_filter:
105        # Filter connectors by keyword, case-insensitive.
106        connectors = [
107            connector for connector in connectors if keyword_filter.lower() in connector.lower()
108        ]
109
110    if connector_type_filter:
111        # Filter connectors by type ('source' or 'destination').
112        # This assumes connector names are prefixed with 'source-' or 'destination-'.
113        connectors = [
114            connector
115            for connector in connectors
116            if connector.startswith(f"{connector_type_filter}-")
117        ]
118
119    return sorted(connectors)
120
121
class ConnectorInfo(BaseModel):
    """@private Class to hold connector information."""

    # Name of the connector; the only required field.
    connector_name: str
    # Metadata for the connector, or None when it is not available.
    connector_metadata: ConnectorMetadata | None = None
    # URL of the connector's documentation page, if known.
    documentation_url: str | None = None
    # JSON schema of the connector's config spec, or None when it is not available.
    config_spec_jsonschema: dict | None = None
    # URL of the connector's manifest, if known.
    manifest_url: str | None = None
130
131
@mcp_tool(
    read_only=True,
    idempotent=True,
)
def get_connector_info(
    connector_name: Annotated[
        str,
        Field(description="The name of the connector to get information for."),
    ],
) -> ConnectorInfo | Literal["Connector not found."]:
    """Get metadata, documentation URL, config spec, and manifest URL for a connector.

    The registry metadata and config spec lookups are best-effort: if either one
    fails, the failure is logged and the corresponding field is returned as `None`.

    Returns:
        A `ConnectorInfo` object, or the string "Connector not found." when the name
        is not in the list of available connectors.
    """
    if connector_name not in get_available_connectors():
        return "Connector not found."

    # NOTE(review): `get_source()` is used for every connector name, which presumably
    # includes 'destination-*' names - confirm this is intended.
    connector = get_source(
        connector_name,
        docker_image=is_docker_installed() or False,
        install_if_missing=False,  # Defer to avoid failing entirely if it can't be installed.
    )

    connector_metadata: ConnectorMetadata | None = None
    try:
        connector_metadata = get_connector_metadata(connector_name)
    except Exception:  # noqa: BLE001  # Best-effort: report what we can.
        logger.warning(
            "Could not fetch registry metadata for connector '%s'.",
            connector_name,
            exc_info=True,
        )

    config_spec_jsonschema: dict[str, Any] | None = None
    try:
        # This requires running the connector. Install it if it isn't already installed.
        connector.install()
        config_spec_jsonschema = connector.config_spec
    except Exception:  # noqa: BLE001  # Best-effort: report what we can.
        logger.warning(
            "Could not install connector '%s' or read its config spec.",
            connector_name,
            exc_info=True,
        )

    manifest_url = _DEFAULT_MANIFEST_URL.format(
        source_name=connector_name,
        version="latest",
    )

    return ConnectorInfo(
        connector_name=connector.name,
        connector_metadata=connector_metadata,
        documentation_url=connector.docs_url,
        config_spec_jsonschema=config_spec_jsonschema,
        manifest_url=manifest_url,
    )
174
175
@mcp_tool(
    read_only=True,
    idempotent=True,
)
def get_api_docs_urls(
    connector_name: Annotated[
        str,
        Field(
            description=(
                "The canonical connector name "
                "(e.g., 'source-facebook-marketing', 'destination-snowflake')"
            )
        ),
    ],
) -> list[ApiDocsUrl] | Literal["Connector not found."]:
    """Get API documentation URLs for a connector.

    The URLs point at documentation for the connector's upstream API and are gathered
    from multiple sources:
    - Registry metadata (documentationUrl, externalDocumentationUrls)
    - Connector manifest.yaml file (data.externalDocumentationUrls)

    Returns:
        The list of documentation URLs, or the string "Connector not found." when the
        connector is not registered.
    """
    try:
        docs_urls = get_connector_api_docs_urls(connector_name)
    except exc.AirbyteConnectorNotRegisteredError:
        # Unknown connector names are reported as a plain message instead of an error.
        return "Connector not found."

    return docs_urls
201
202
@mcp_tool(
    read_only=True,
    idempotent=True,
)
def get_connector_version_history(
    connector_name: Annotated[
        str,
        Field(
            description="The name of the connector (e.g., 'source-faker', 'destination-postgres')"
        ),
    ],
    num_versions_to_validate: Annotated[
        int,
        Field(
            description=(
                "Number of most recent versions to validate with registry data for accurate "
                "release dates. Defaults to 5."
            ),
            default=5,
        ),
    ] = 5,
    limit: Annotated[
        int | None,
        Field(
            description=(
                "DEPRECATED: Use num_versions_to_validate instead. "
                "Maximum number of versions to return (most recent first). "
                "If specified, only the first N versions will be returned."
            ),
            default=None,
        ),
    ] = None,
) -> list[ConnectorVersionInfo] | Literal["Connector not found.", "Failed to fetch changelog."]:
    """Get version history for a connector.

    Each entry in the version history includes:
    - Version number
    - Release date (from changelog, with registry override for recent versions)
    - DockerHub URL for the version
    - Changelog URL
    - PR URL and title (scraped from changelog)

    For the most recent N versions (default 5), release dates are fetched from the
    registry for accuracy. For older versions, changelog dates are used.

    Returns:
        List of version information, sorted by most recent first. A plain message
        string is returned instead when the connector is not registered or the
        changelog request fails.
    """
    try:
        history = _get_connector_version_history(
            connector_name=connector_name,
            num_versions_to_validate=num_versions_to_validate,
        )
    except exc.AirbyteConnectorNotRegisteredError:
        return "Connector not found."
    except requests.exceptions.RequestException:
        logger.exception(f"Failed to fetch changelog for {connector_name}")
        return "Failed to fetch changelog."

    # A missing or non-positive limit means "return everything".
    if limit is None or limit <= 0:
        return history
    return history[:limit]
265
266
def register_registry_tools(app: FastMCP) -> None:
    """Register the registry tools with the given FastMCP app.

    Delegates to `register_mcp_tools`, passing this module's name as `mcp_module`.

    Args:
        app: FastMCP application instance
    """
    this_module = __name__
    register_mcp_tools(app, mcp_module=this_module)
logger = <Logger airbyte.mcp (INFO)>
@mcp_tool(read_only=True, idempotent=True)
def list_connectors( keyword_filter: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Filter connectors by keyword.')], connector_type_filter: Annotated[Optional[Literal['source', 'destination']], FieldInfo(annotation=NoneType, required=False, default=None, description="Filter connectors by type ('source' or 'destination').")], install_types: Annotated[Union[Literal['java', 'python', 'yaml', 'docker'], list[Literal['java', 'python', 'yaml', 'docker']], NoneType], FieldInfo(annotation=NoneType, required=False, default=None, description='\n Filter connectors by install type.\n These are not mutually exclusive:\n - "python": Connectors that can be installed as Python packages.\n - "yaml": Connectors that can be installed simply via YAML download.\n These connectors are the fastest to install and run, as they do not require any\n additional dependencies.\n - "java": Connectors that can only be installed via Java. Since PyAirbyte does not\n currently ship with a JVM, these connectors will be run via Docker instead.\n In environments where Docker is not available, these connectors may not be\n runnable.\n - "docker": Connectors that can be installed via Docker. Note that all connectors\n can be run in Docker, so this filter should generally return the same results as\n not specifying a filter.\n If no install types are specified, all connectors will be returned.\n ')]) -> list[str]:
 36@mcp_tool(
 37    read_only=True,
 38    idempotent=True,
 39)
 40def list_connectors(
 41    keyword_filter: Annotated[
 42        str | None,
 43        Field(
 44            description="Filter connectors by keyword.",
 45            default=None,
 46        ),
 47    ],
 48    connector_type_filter: Annotated[
 49        Literal["source", "destination"] | None,
 50        Field(
 51            description="Filter connectors by type ('source' or 'destination').",
 52            default=None,
 53        ),
 54    ],
 55    install_types: Annotated[
 56        Literal["java", "python", "yaml", "docker"]
 57        | list[Literal["java", "python", "yaml", "docker"]]
 58        | None,
 59        Field(
 60            description=(
 61                """
 62                Filter connectors by install type.
 63                These are not mutually exclusive:
 64                - "python": Connectors that can be installed as Python packages.
 65                - "yaml": Connectors that can be installed simply via YAML download.
 66                    These connectors are the fastest to install and run, as they do not require any
 67                    additional dependencies.
 68                - "java": Connectors that can only be installed via Java. Since PyAirbyte does not
 69                    currently ship with a JVM, these connectors will be run via Docker instead.
 70                    In environments where Docker is not available, these connectors may not be
 71                    runnable.
 72                - "docker": Connectors that can be installed via Docker. Note that all connectors
 73                    can be run in Docker, so this filter should generally return the same results as
 74                    not specifying a filter.
 75                If no install types are specified, all connectors will be returned.
 76                """
 77            ),
 78            default=None,
 79        ),
 80    ],
 81) -> list[str]:
 82    """List available Airbyte connectors with optional filtering.
 83
 84    Returns:
 85        List of connector names.
 86    """
 87    # Start with the full list of known connectors (all support Docker):
 88    connectors: list[str] = get_available_connectors(install_type=InstallType.ANY)
 89
 90    install_types_list: list[str] | None = resolve_list_of_strings(
 91        install_types,  # type: ignore[arg-type]  # Type check doesn't understand literal is str
 92    )
 93
 94    if install_types_list:
 95        # If install_types is provided, filter connectors based on the specified install types.
 96        connectors = [
 97            connector
 98            for connector in connectors
 99            if any(
100                connector in get_available_connectors(install_type=install_type)
101                for install_type in install_types_list
102            )
103        ]
104
105    if keyword_filter:
106        # Filter connectors by keyword, case-insensitive.
107        connectors = [
108            connector for connector in connectors if keyword_filter.lower() in connector.lower()
109        ]
110
111    if connector_type_filter:
112        # Filter connectors by type ('source' or 'destination').
113        # This assumes connector names are prefixed with 'source-' or 'destination-'.
114        connectors = [
115            connector
116            for connector in connectors
117            if connector.startswith(f"{connector_type_filter}-")
118        ]
119
120    return sorted(connectors)

List available Airbyte connectors with optional filtering.

Returns:

List of connector names.

@mcp_tool(read_only=True, idempotent=True)
def get_connector_info( connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the connector to get information for.')]) -> Union[airbyte.mcp.registry.ConnectorInfo, Literal['Connector not found.']]:
@mcp_tool(
    read_only=True,
    idempotent=True,
)
def get_connector_info(
    connector_name: Annotated[
        str,
        Field(description="The name of the connector to get information for."),
    ],
) -> ConnectorInfo | Literal["Connector not found."]:
    """Get metadata, documentation URL, config spec, and manifest URL for a connector.

    The registry metadata and config spec lookups are best-effort: if either one
    fails, the failure is logged and the corresponding field is returned as `None`.

    Returns:
        A `ConnectorInfo` object, or the string "Connector not found." when the name
        is not in the list of available connectors.
    """
    if connector_name not in get_available_connectors():
        return "Connector not found."

    # NOTE(review): `get_source()` is used for every connector name, which presumably
    # includes 'destination-*' names - confirm this is intended.
    connector = get_source(
        connector_name,
        docker_image=is_docker_installed() or False,
        install_if_missing=False,  # Defer to avoid failing entirely if it can't be installed.
    )

    connector_metadata: ConnectorMetadata | None = None
    try:
        connector_metadata = get_connector_metadata(connector_name)
    except Exception:  # noqa: BLE001  # Best-effort: report what we can.
        logger.warning(
            "Could not fetch registry metadata for connector '%s'.",
            connector_name,
            exc_info=True,
        )

    config_spec_jsonschema: dict[str, Any] | None = None
    try:
        # This requires running the connector. Install it if it isn't already installed.
        connector.install()
        config_spec_jsonschema = connector.config_spec
    except Exception:  # noqa: BLE001  # Best-effort: report what we can.
        logger.warning(
            "Could not install connector '%s' or read its config spec.",
            connector_name,
            exc_info=True,
        )

    manifest_url = _DEFAULT_MANIFEST_URL.format(
        source_name=connector_name,
        version="latest",
    )

    return ConnectorInfo(
        connector_name=connector.name,
        connector_metadata=connector_metadata,
        documentation_url=connector.docs_url,
        config_spec_jsonschema=config_spec_jsonschema,
        manifest_url=manifest_url,
    )

Get metadata, documentation URL, config spec, and manifest URL for a connector.

@mcp_tool(read_only=True, idempotent=True)
def get_api_docs_urls( connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description="The canonical connector name (e.g., 'source-facebook-marketing', 'destination-snowflake')")]) -> Union[list[airbyte.registry.ApiDocsUrl], Literal['Connector not found.']]:
@mcp_tool(
    read_only=True,
    idempotent=True,
)
def get_api_docs_urls(
    connector_name: Annotated[
        str,
        Field(
            description=(
                "The canonical connector name "
                "(e.g., 'source-facebook-marketing', 'destination-snowflake')"
            )
        ),
    ],
) -> list[ApiDocsUrl] | Literal["Connector not found."]:
    """Get API documentation URLs for a connector.

    The URLs point at documentation for the connector's upstream API and are gathered
    from multiple sources:
    - Registry metadata (documentationUrl, externalDocumentationUrls)
    - Connector manifest.yaml file (data.externalDocumentationUrls)

    Returns:
        The list of documentation URLs, or the string "Connector not found." when the
        connector is not registered.
    """
    try:
        docs_urls = get_connector_api_docs_urls(connector_name)
    except exc.AirbyteConnectorNotRegisteredError:
        # Unknown connector names are reported as a plain message instead of an error.
        return "Connector not found."

    return docs_urls

Get API documentation URLs for a connector.

This tool retrieves documentation URLs for a connector's upstream API from multiple sources:

  • Registry metadata (documentationUrl, externalDocumentationUrls)
  • Connector manifest.yaml file (data.externalDocumentationUrls)
@mcp_tool(read_only=True, idempotent=True)
def get_connector_version_history( connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description="The name of the connector (e.g., 'source-faker', 'destination-postgres')")], num_versions_to_validate: typing.Annotated[int, FieldInfo(annotation=NoneType, required=False, default=5, description='Number of most recent versions to validate with registry data for accurate release dates. Defaults to 5.')] = 5, limit: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=False, default=None, description='DEPRECATED: Use num_versions_to_validate instead. Maximum number of versions to return (most recent first). If specified, only the first N versions will be returned.')] = None) -> Union[list[airbyte.registry.ConnectorVersionInfo], Literal['Connector not found.', 'Failed to fetch changelog.']]:
@mcp_tool(
    read_only=True,
    idempotent=True,
)
def get_connector_version_history(
    connector_name: Annotated[
        str,
        Field(
            description="The name of the connector (e.g., 'source-faker', 'destination-postgres')"
        ),
    ],
    num_versions_to_validate: Annotated[
        int,
        Field(
            description=(
                "Number of most recent versions to validate with registry data for accurate "
                "release dates. Defaults to 5."
            ),
            default=5,
        ),
    ] = 5,
    limit: Annotated[
        int | None,
        Field(
            description=(
                "DEPRECATED: Use num_versions_to_validate instead. "
                "Maximum number of versions to return (most recent first). "
                "If specified, only the first N versions will be returned."
            ),
            default=None,
        ),
    ] = None,
) -> list[ConnectorVersionInfo] | Literal["Connector not found.", "Failed to fetch changelog."]:
    """Get version history for a connector.

    Each entry in the version history includes:
    - Version number
    - Release date (from changelog, with registry override for recent versions)
    - DockerHub URL for the version
    - Changelog URL
    - PR URL and title (scraped from changelog)

    For the most recent N versions (default 5), release dates are fetched from the
    registry for accuracy. For older versions, changelog dates are used.

    Returns:
        List of version information, sorted by most recent first. A plain message
        string is returned instead when the connector is not registered or the
        changelog request fails.
    """
    try:
        history = _get_connector_version_history(
            connector_name=connector_name,
            num_versions_to_validate=num_versions_to_validate,
        )
    except exc.AirbyteConnectorNotRegisteredError:
        return "Connector not found."
    except requests.exceptions.RequestException:
        logger.exception(f"Failed to fetch changelog for {connector_name}")
        return "Failed to fetch changelog."

    # A missing or non-positive limit means "return everything".
    if limit is None or limit <= 0:
        return history
    return history[:limit]

Get version history for a connector.

This tool retrieves the version history for a connector, including:

  • Version number
  • Release date (from changelog, with registry override for recent versions)
  • DockerHub URL for the version
  • Changelog URL
  • PR URL and title (scraped from changelog)

For the most recent N versions (default 5), release dates are fetched from the registry for accuracy. For older versions, changelog dates are used.

Returns:

List of version information, sorted by most recent first.

def register_registry_tools(app: fastmcp.server.server.FastMCP) -> None:
def register_registry_tools(app: FastMCP) -> None:
    """Register the registry tools with the given FastMCP app.

    Delegates to `register_mcp_tools`, passing this module's name as `mcp_module`.

    Args:
        app: FastMCP application instance
    """
    this_module = __name__
    register_mcp_tools(app, mcp_module=this_module)

Register registry tools with the FastMCP app.

Arguments:
  • app: FastMCP application instance