airbyte.mcp.connector_registry

Airbyte Cloud MCP operations.

  1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  2"""Airbyte Cloud MCP operations."""
  3
  4# Note: Deferred type evaluation must be avoided due to FastMCP/Pydantic needing
  5# types to be available at import time for tool registration.
  6import contextlib
  7import logging
  8from typing import Annotated, Any, Literal
  9
 10import requests
 11from fastmcp import FastMCP
 12from pydantic import BaseModel, Field
 13
 14from airbyte import exceptions as exc
 15from airbyte._util.meta import is_docker_installed
 16from airbyte.mcp._tool_utils import mcp_tool, register_tools
 17from airbyte.mcp._util import resolve_list_of_strings
 18from airbyte.registry import (
 19    _DEFAULT_MANIFEST_URL,
 20    ApiDocsUrl,
 21    ConnectorMetadata,
 22    ConnectorVersionInfo,
 23    InstallType,
 24    get_available_connectors,
 25    get_connector_api_docs_urls,
 26    get_connector_metadata,
 27)
 28from airbyte.registry import get_connector_version_history as _get_connector_version_history
 29from airbyte.sources.util import get_source
 30
 31
 32logger = logging.getLogger("airbyte.mcp")
 33
 34
 35@mcp_tool(
 36    domain="registry",
 37    read_only=True,
 38    idempotent=True,
 39)
 40def list_connectors(
 41    keyword_filter: Annotated[
 42        str | None,
 43        Field(
 44            description="Filter connectors by keyword.",
 45            default=None,
 46        ),
 47    ],
 48    connector_type_filter: Annotated[
 49        Literal["source", "destination"] | None,
 50        Field(
 51            description="Filter connectors by type ('source' or 'destination').",
 52            default=None,
 53        ),
 54    ],
 55    install_types: Annotated[
 56        Literal["java", "python", "yaml", "docker"]
 57        | list[Literal["java", "python", "yaml", "docker"]]
 58        | None,
 59        Field(
 60            description=(
 61                """
 62                Filter connectors by install type.
 63                These are not mutually exclusive:
 64                - "python": Connectors that can be installed as Python packages.
 65                - "yaml": Connectors that can be installed simply via YAML download.
 66                    These connectors are the fastest to install and run, as they do not require any
 67                    additional dependencies.
 68                - "java": Connectors that can only be installed via Java. Since PyAirbyte does not
 69                    currently ship with a JVM, these connectors will be run via Docker instead.
 70                    In environments where Docker is not available, these connectors may not be
 71                    runnable.
 72                - "docker": Connectors that can be installed via Docker. Note that all connectors
 73                    can be run in Docker, so this filter should generally return the same results as
 74                    not specifying a filter.
 75                If no install types are specified, all connectors will be returned.
 76                """
 77            ),
 78            default=None,
 79        ),
 80    ],
 81) -> list[str]:
 82    """List available Airbyte connectors with optional filtering.
 83
 84    Returns:
 85        List of connector names.
 86    """
 87    # Start with the full list of known connectors (all support Docker):
 88    connectors: list[str] = get_available_connectors(install_type=InstallType.DOCKER)
 89
 90    install_types_list: list[str] | None = resolve_list_of_strings(
 91        install_types,  # type: ignore[arg-type]  # Type check doesn't understand literal is str
 92    )
 93
 94    if install_types_list:
 95        # If install_types is provided, filter connectors based on the specified install types.
 96        connectors = [
 97            connector
 98            for connector in connectors
 99            if any(
100                connector in get_available_connectors(install_type=install_type)
101                for install_type in install_types_list
102            )
103        ]
104
105    if keyword_filter:
106        # Filter connectors by keyword, case-insensitive.
107        connectors = [
108            connector for connector in connectors if keyword_filter.lower() in connector.lower()
109        ]
110
111    if connector_type_filter:
112        # Filter connectors by type ('source' or 'destination').
113        # This assumes connector names are prefixed with 'source-' or 'destination-'.
114        connectors = [
115            connector
116            for connector in connectors
117            if connector.startswith(f"{connector_type_filter}-")
118        ]
119
120    return sorted(connectors)
121
122
123class ConnectorInfo(BaseModel):
124    """@private Class to hold connector information."""
125
126    connector_name: str
127    connector_metadata: ConnectorMetadata | None = None
128    documentation_url: str | None = None
129    config_spec_jsonschema: dict | None = None
130    manifest_url: str | None = None
131
132
133@mcp_tool(
134    domain="registry",
135    read_only=True,
136    idempotent=True,
137)
138def get_connector_info(
139    connector_name: Annotated[
140        str,
141        Field(description="The name of the connector to get information for."),
142    ],
143) -> ConnectorInfo | Literal["Connector not found."]:
144    """Get the documentation URL for a connector."""
145    if connector_name not in get_available_connectors():
146        return "Connector not found."
147
148    connector = get_source(
149        connector_name,
150        docker_image=is_docker_installed() or False,
151        install_if_missing=False,  # Defer to avoid failing entirely if it can't be installed.
152    )
153
154    connector_metadata: ConnectorMetadata | None = None
155    with contextlib.suppress(Exception):
156        connector_metadata = get_connector_metadata(connector_name)
157
158    config_spec_jsonschema: dict[str, Any] | None = None
159    with contextlib.suppress(Exception):
160        # This requires running the connector. Install it if it isn't already installed.
161        connector.install()
162        config_spec_jsonschema = connector.config_spec
163
164    manifest_url = _DEFAULT_MANIFEST_URL.format(
165        source_name=connector_name,
166        version="latest",
167    )
168
169    return ConnectorInfo(
170        connector_name=connector.name,
171        connector_metadata=connector_metadata,
172        documentation_url=connector.docs_url,
173        config_spec_jsonschema=config_spec_jsonschema,
174        manifest_url=manifest_url,
175    )
176
177
178@mcp_tool(
179    domain="registry",
180    read_only=True,
181    idempotent=True,
182)
183def get_api_docs_urls(
184    connector_name: Annotated[
185        str,
186        Field(
187            description=(
188                "The canonical connector name "
189                "(e.g., 'source-facebook-marketing', 'destination-snowflake')"
190            )
191        ),
192    ],
193) -> list[ApiDocsUrl] | Literal["Connector not found."]:
194    """Get API documentation URLs for a connector.
195
196    This tool retrieves documentation URLs for a connector's upstream API from multiple sources:
197    - Registry metadata (documentationUrl, externalDocumentationUrls)
198    - Connector manifest.yaml file (data.externalDocumentationUrls)
199    """
200    try:
201        return get_connector_api_docs_urls(connector_name)
202    except exc.AirbyteConnectorNotRegisteredError:
203        return "Connector not found."
204
205
206@mcp_tool(
207    domain="registry",
208    read_only=True,
209    idempotent=True,
210)
211def get_connector_version_history(
212    connector_name: Annotated[
213        str,
214        Field(
215            description="The name of the connector (e.g., 'source-faker', 'destination-postgres')"
216        ),
217    ],
218    num_versions_to_validate: Annotated[
219        int,
220        Field(
221            description=(
222                "Number of most recent versions to validate with registry data for accurate "
223                "release dates. Defaults to 5."
224            ),
225            default=5,
226        ),
227    ] = 5,
228    limit: Annotated[
229        int | None,
230        Field(
231            description=(
232                "DEPRECATED: Use num_versions_to_validate instead. "
233                "Maximum number of versions to return (most recent first). "
234                "If specified, only the first N versions will be returned."
235            ),
236            default=None,
237        ),
238    ] = None,
239) -> list[ConnectorVersionInfo] | Literal["Connector not found.", "Failed to fetch changelog."]:
240    """Get version history for a connector.
241
242    This tool retrieves the version history for a connector, including:
243    - Version number
244    - Release date (from changelog, with registry override for recent versions)
245    - DockerHub URL for the version
246    - Changelog URL
247    - PR URL and title (scraped from changelog)
248
249    For the most recent N versions (default 5), release dates are fetched from the
250    registry for accuracy. For older versions, changelog dates are used.
251
252    Returns:
253        List of version information, sorted by most recent first.
254    """
255    try:
256        versions = _get_connector_version_history(
257            connector_name=connector_name,
258            num_versions_to_validate=num_versions_to_validate,
259        )
260    except exc.AirbyteConnectorNotRegisteredError:
261        return "Connector not found."
262    except requests.exceptions.RequestException:
263        logger.exception(f"Failed to fetch changelog for {connector_name}")
264        return "Failed to fetch changelog."
265    else:
266        if limit is not None and limit > 0:
267            return versions[:limit]
268        return versions
269
270
271def register_connector_registry_tools(app: FastMCP) -> None:
272    """@private Register tools with the FastMCP app.
273
274    This is an internal function and should not be called directly.
275    """
276    register_tools(app, domain="registry")
logger = <Logger airbyte.mcp (INFO)>
@mcp_tool(domain='registry', read_only=True, idempotent=True)
def list_connectors( keyword_filter: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Filter connectors by keyword.')], connector_type_filter: Annotated[Optional[Literal['source', 'destination']], FieldInfo(annotation=NoneType, required=False, default=None, description="Filter connectors by type ('source' or 'destination').")], install_types: Annotated[Union[Literal['java', 'python', 'yaml', 'docker'], list[Literal['java', 'python', 'yaml', 'docker']], NoneType], FieldInfo(annotation=NoneType, required=False, default=None, description='\n Filter connectors by install type.\n These are not mutually exclusive:\n - "python": Connectors that can be installed as Python packages.\n - "yaml": Connectors that can be installed simply via YAML download.\n These connectors are the fastest to install and run, as they do not require any\n additional dependencies.\n - "java": Connectors that can only be installed via Java. Since PyAirbyte does not\n currently ship with a JVM, these connectors will be run via Docker instead.\n In environments where Docker is not available, these connectors may not be\n runnable.\n - "docker": Connectors that can be installed via Docker. Note that all connectors\n can be run in Docker, so this filter should generally return the same results as\n not specifying a filter.\n If no install types are specified, all connectors will be returned.\n ')]) -> list[str]:
 36@mcp_tool(
 37    domain="registry",
 38    read_only=True,
 39    idempotent=True,
 40)
 41def list_connectors(
 42    keyword_filter: Annotated[
 43        str | None,
 44        Field(
 45            description="Filter connectors by keyword.",
 46            default=None,
 47        ),
 48    ],
 49    connector_type_filter: Annotated[
 50        Literal["source", "destination"] | None,
 51        Field(
 52            description="Filter connectors by type ('source' or 'destination').",
 53            default=None,
 54        ),
 55    ],
 56    install_types: Annotated[
 57        Literal["java", "python", "yaml", "docker"]
 58        | list[Literal["java", "python", "yaml", "docker"]]
 59        | None,
 60        Field(
 61            description=(
 62                """
 63                Filter connectors by install type.
 64                These are not mutually exclusive:
 65                - "python": Connectors that can be installed as Python packages.
 66                - "yaml": Connectors that can be installed simply via YAML download.
 67                    These connectors are the fastest to install and run, as they do not require any
 68                    additional dependencies.
 69                - "java": Connectors that can only be installed via Java. Since PyAirbyte does not
 70                    currently ship with a JVM, these connectors will be run via Docker instead.
 71                    In environments where Docker is not available, these connectors may not be
 72                    runnable.
 73                - "docker": Connectors that can be installed via Docker. Note that all connectors
 74                    can be run in Docker, so this filter should generally return the same results as
 75                    not specifying a filter.
 76                If no install types are specified, all connectors will be returned.
 77                """
 78            ),
 79            default=None,
 80        ),
 81    ],
 82) -> list[str]:
 83    """List available Airbyte connectors with optional filtering.
 84
 85    Returns:
 86        List of connector names.
 87    """
 88    # Start with the full list of known connectors (all support Docker):
 89    connectors: list[str] = get_available_connectors(install_type=InstallType.DOCKER)
 90
 91    install_types_list: list[str] | None = resolve_list_of_strings(
 92        install_types,  # type: ignore[arg-type]  # Type check doesn't understand literal is str
 93    )
 94
 95    if install_types_list:
 96        # If install_types is provided, filter connectors based on the specified install types.
 97        connectors = [
 98            connector
 99            for connector in connectors
100            if any(
101                connector in get_available_connectors(install_type=install_type)
102                for install_type in install_types_list
103            )
104        ]
105
106    if keyword_filter:
107        # Filter connectors by keyword, case-insensitive.
108        connectors = [
109            connector for connector in connectors if keyword_filter.lower() in connector.lower()
110        ]
111
112    if connector_type_filter:
113        # Filter connectors by type ('source' or 'destination').
114        # This assumes connector names are prefixed with 'source-' or 'destination-'.
115        connectors = [
116            connector
117            for connector in connectors
118            if connector.startswith(f"{connector_type_filter}-")
119        ]
120
121    return sorted(connectors)

List available Airbyte connectors with optional filtering.

Returns:

List of connector names.

@mcp_tool(domain='registry', read_only=True, idempotent=True)
def get_connector_info( connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the connector to get information for.')]) -> Union[airbyte.mcp.connector_registry.ConnectorInfo, Literal['Connector not found.']]:
134@mcp_tool(
135    domain="registry",
136    read_only=True,
137    idempotent=True,
138)
139def get_connector_info(
140    connector_name: Annotated[
141        str,
142        Field(description="The name of the connector to get information for."),
143    ],
144) -> ConnectorInfo | Literal["Connector not found."]:
145    """Get the documentation URL for a connector."""
146    if connector_name not in get_available_connectors():
147        return "Connector not found."
148
149    connector = get_source(
150        connector_name,
151        docker_image=is_docker_installed() or False,
152        install_if_missing=False,  # Defer to avoid failing entirely if it can't be installed.
153    )
154
155    connector_metadata: ConnectorMetadata | None = None
156    with contextlib.suppress(Exception):
157        connector_metadata = get_connector_metadata(connector_name)
158
159    config_spec_jsonschema: dict[str, Any] | None = None
160    with contextlib.suppress(Exception):
161        # This requires running the connector. Install it if it isn't already installed.
162        connector.install()
163        config_spec_jsonschema = connector.config_spec
164
165    manifest_url = _DEFAULT_MANIFEST_URL.format(
166        source_name=connector_name,
167        version="latest",
168    )
169
170    return ConnectorInfo(
171        connector_name=connector.name,
172        connector_metadata=connector_metadata,
173        documentation_url=connector.docs_url,
174        config_spec_jsonschema=config_spec_jsonschema,
175        manifest_url=manifest_url,
176    )

Get the documentation URL for a connector.

@mcp_tool(domain='registry', read_only=True, idempotent=True)
def get_api_docs_urls( connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description="The canonical connector name (e.g., 'source-facebook-marketing', 'destination-snowflake')")]) -> Union[list[airbyte.registry.ApiDocsUrl], Literal['Connector not found.']]:
179@mcp_tool(
180    domain="registry",
181    read_only=True,
182    idempotent=True,
183)
184def get_api_docs_urls(
185    connector_name: Annotated[
186        str,
187        Field(
188            description=(
189                "The canonical connector name "
190                "(e.g., 'source-facebook-marketing', 'destination-snowflake')"
191            )
192        ),
193    ],
194) -> list[ApiDocsUrl] | Literal["Connector not found."]:
195    """Get API documentation URLs for a connector.
196
197    This tool retrieves documentation URLs for a connector's upstream API from multiple sources:
198    - Registry metadata (documentationUrl, externalDocumentationUrls)
199    - Connector manifest.yaml file (data.externalDocumentationUrls)
200    """
201    try:
202        return get_connector_api_docs_urls(connector_name)
203    except exc.AirbyteConnectorNotRegisteredError:
204        return "Connector not found."

Get API documentation URLs for a connector.

This tool retrieves documentation URLs for a connector's upstream API from multiple sources:

  • Registry metadata (documentationUrl, externalDocumentationUrls)
  • Connector manifest.yaml file (data.externalDocumentationUrls)
@mcp_tool(domain='registry', read_only=True, idempotent=True)
def get_connector_version_history( connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description="The name of the connector (e.g., 'source-faker', 'destination-postgres')")], num_versions_to_validate: typing.Annotated[int, FieldInfo(annotation=NoneType, required=False, default=5, description='Number of most recent versions to validate with registry data for accurate release dates. Defaults to 5.')] = 5, limit: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=False, default=None, description='DEPRECATED: Use num_versions_to_validate instead. Maximum number of versions to return (most recent first). If specified, only the first N versions will be returned.')] = None) -> Union[list[airbyte.registry.ConnectorVersionInfo], Literal['Connector not found.', 'Failed to fetch changelog.']]:
207@mcp_tool(
208    domain="registry",
209    read_only=True,
210    idempotent=True,
211)
212def get_connector_version_history(
213    connector_name: Annotated[
214        str,
215        Field(
216            description="The name of the connector (e.g., 'source-faker', 'destination-postgres')"
217        ),
218    ],
219    num_versions_to_validate: Annotated[
220        int,
221        Field(
222            description=(
223                "Number of most recent versions to validate with registry data for accurate "
224                "release dates. Defaults to 5."
225            ),
226            default=5,
227        ),
228    ] = 5,
229    limit: Annotated[
230        int | None,
231        Field(
232            description=(
233                "DEPRECATED: Use num_versions_to_validate instead. "
234                "Maximum number of versions to return (most recent first). "
235                "If specified, only the first N versions will be returned."
236            ),
237            default=None,
238        ),
239    ] = None,
240) -> list[ConnectorVersionInfo] | Literal["Connector not found.", "Failed to fetch changelog."]:
241    """Get version history for a connector.
242
243    This tool retrieves the version history for a connector, including:
244    - Version number
245    - Release date (from changelog, with registry override for recent versions)
246    - DockerHub URL for the version
247    - Changelog URL
248    - PR URL and title (scraped from changelog)
249
250    For the most recent N versions (default 5), release dates are fetched from the
251    registry for accuracy. For older versions, changelog dates are used.
252
253    Returns:
254        List of version information, sorted by most recent first.
255    """
256    try:
257        versions = _get_connector_version_history(
258            connector_name=connector_name,
259            num_versions_to_validate=num_versions_to_validate,
260        )
261    except exc.AirbyteConnectorNotRegisteredError:
262        return "Connector not found."
263    except requests.exceptions.RequestException:
264        logger.exception(f"Failed to fetch changelog for {connector_name}")
265        return "Failed to fetch changelog."
266    else:
267        if limit is not None and limit > 0:
268            return versions[:limit]
269        return versions

Get version history for a connector.

This tool retrieves the version history for a connector, including:

  • Version number
  • Release date (from changelog, with registry override for recent versions)
  • DockerHub URL for the version
  • Changelog URL
  • PR URL and title (scraped from changelog)

For the most recent N versions (default 5), release dates are fetched from the registry for accuracy. For older versions, changelog dates are used.

Returns:

List of version information, sorted by most recent first.