airbyte_ops_mcp.mcp.registry

MCP tools for connector registry operations.

This module provides MCP tools for interacting with the Airbyte connector registry stored in Google Cloud Storage, including:

  • Reading connector metadata and specs
  • Listing connectors and versions

MCP reference

MCP primitives registered by the registry module of the airbyte-internal-ops server: 4 tool(s), 0 prompt(s), 0 resource(s).

Tools (4)

get_connector_registry_entry

Hints: read-only · idempotent · open-world

Read a connector's metadata from the GCS registry.

Returns the full metadata.yaml content for a connector at the specified version. Requires GCS_CREDENTIALS environment variable to be set.

Parameters:

Name Type Required Default Description
connector_name string yes The connector name (e.g., 'source-faker', 'destination-postgres')
version string no "latest" Version to read (e.g., 'latest', '1.2.3'). Defaults to 'latest'.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connector_name": {
      "description": "The connector name (e.g., 'source-faker', 'destination-postgres')",
      "type": "string"
    },
    "version": {
      "default": "latest",
      "description": "Version to read (e.g., 'latest', '1.2.3'). Defaults to 'latest'.",
      "type": "string"
    }
  },
  "required": [
    "connector_name"
  ],
  "type": "object"
}

Show output JSON schema

{
  "description": "Result of reading a registry entry from GCS.\n\nThis model wraps the raw metadata dictionary with additional context.",
  "properties": {
    "connector_name": {
      "description": "The connector technical name",
      "type": "string"
    },
    "version": {
      "description": "The version that was read",
      "type": "string"
    },
    "bucket_name": {
      "description": "The GCS bucket name",
      "type": "string"
    },
    "gcs_path": {
      "description": "The GCS path that was read",
      "type": "string"
    },
    "metadata": {
      "additionalProperties": true,
      "description": "The raw metadata dictionary",
      "type": "object"
    }
  },
  "required": [
    "connector_name",
    "version",
    "bucket_name",
    "gcs_path",
    "metadata"
  ],
  "type": "object"
}

get_connector_registry_spec

Hints: read-only · idempotent · open-world

Read a connector's spec from the GCS registry.

Returns the spec.json content for a connector at the specified version. Requires GCS_CREDENTIALS environment variable to be set.

Parameters:

Name Type Required Default Description
connector_name string yes The connector name (e.g., 'source-faker', 'destination-postgres')
version string no "latest" Version to read (e.g., 'latest', '1.2.3'). Defaults to 'latest'.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connector_name": {
      "description": "The connector name (e.g., 'source-faker', 'destination-postgres')",
      "type": "string"
    },
    "version": {
      "default": "latest",
      "description": "Version to read (e.g., 'latest', '1.2.3'). Defaults to 'latest'.",
      "type": "string"
    }
  },
  "required": [
    "connector_name"
  ],
  "type": "object"
}

Show output JSON schema

{
  "description": "Result of reading a connector spec from GCS.",
  "properties": {
    "connector_name": {
      "description": "The connector technical name",
      "type": "string"
    },
    "version": {
      "description": "The version that was read",
      "type": "string"
    },
    "bucket_name": {
      "description": "The GCS bucket name",
      "type": "string"
    },
    "gcs_path": {
      "description": "The GCS path that was read",
      "type": "string"
    },
    "spec": {
      "additionalProperties": true,
      "description": "The connector spec dictionary",
      "type": "object"
    }
  },
  "required": [
    "connector_name",
    "version",
    "bucket_name",
    "gcs_path",
    "spec"
  ],
  "type": "object"
}

list_connector_versions_in_registry

Hints: read-only · idempotent · open-world

List all versions of a connector in the GCS registry.

Returns all published versions for a connector (excluding 'latest' and 'release_candidate'). Requires GCS_CREDENTIALS environment variable to be set.

Parameters:

Name Type Required Default Description
connector_name string yes The connector name (e.g., 'source-faker', 'destination-postgres')

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connector_name": {
      "description": "The connector name (e.g., 'source-faker', 'destination-postgres')",
      "type": "string"
    }
  },
  "required": [
    "connector_name"
  ],
  "type": "object"
}

Show output JSON schema

{
  "description": "Result of listing versions for a connector.",
  "properties": {
    "connector_name": {
      "description": "The connector technical name",
      "type": "string"
    },
    "bucket_name": {
      "description": "The GCS bucket name",
      "type": "string"
    },
    "version_count": {
      "description": "Number of versions found",
      "type": "integer"
    },
    "versions": {
      "description": "List of version strings",
      "items": {
        "type": "string"
      },
      "type": "array"
    }
  },
  "required": [
    "connector_name",
    "bucket_name",
    "version_count",
    "versions"
  ],
  "type": "object"
}

list_connectors_in_registry

Hints: read-only · idempotent · open-world

List connectors in the GCS registry with optional filtering.

When filters are applied, reads the compiled cloud_registry.json index for fast lookups. Without filters, falls back to scanning individual metadata blobs (captures all connectors including OSS-only).

Requires GCS_CREDENTIALS environment variable to be set.

Parameters:

Name Type Required Default Description
certified boolean no false When True, return only certified connectors. Shorthand for support_level='certified'.
support_level string no "" Exact support level to match (e.g., certified, community, archived). Empty string means no filter.
min_support_level string no "" Minimum support level threshold (inclusive). Levels: archived < community < certified. Empty string means no filter.
connector_type string no "" Filter by connector type: source or destination. Empty string means no filter.
language string no "" Filter by implementation language (e.g., python, java, manifest-only). Empty string means no filter.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "certified": {
      "default": false,
      "description": "When `True`, return only certified connectors. Shorthand for `support_level='certified'`.",
      "type": "boolean"
    },
    "support_level": {
      "default": "",
      "description": "Exact support level to match (e.g., `certified`, `community`, `archived`). Empty string means no filter.",
      "type": "string"
    },
    "min_support_level": {
      "default": "",
      "description": "Minimum support level threshold (inclusive). Levels: `archived` < `community` < `certified`. Empty string means no filter.",
      "type": "string"
    },
    "connector_type": {
      "default": "",
      "description": "Filter by connector type: `source` or `destination`. Empty string means no filter.",
      "type": "string"
    },
    "language": {
      "default": "",
      "description": "Filter by implementation language (e.g., `python`, `java`, `manifest-only`). Empty string means no filter.",
      "type": "string"
    }
  },
  "type": "object"
}

Show output JSON schema

{
  "description": "Result of listing connectors in the registry.",
  "properties": {
    "bucket_name": {
      "description": "The GCS bucket name",
      "type": "string"
    },
    "connector_count": {
      "description": "Number of connectors found",
      "type": "integer"
    },
    "connectors": {
      "description": "List of connector names",
      "items": {
        "type": "string"
      },
      "type": "array"
    }
  },
  "required": [
    "bucket_name",
    "connector_count",
    "connectors"
  ],
  "type": "object"
}

  1# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
  2"""MCP tools for connector registry operations.
  3
  4This module provides MCP tools for interacting with the Airbyte connector registry
  5stored in Google Cloud Storage, including:
  6- Reading connector metadata and specs
  7- Listing connectors and versions
  8
  9## MCP reference
 10
 11.. include:: ../../../docs/mcp-generated/registry.md
 12    :start-line: 2
 13"""
 14
 15from __future__ import annotations
 16
 17__all__: list[str] = []
 18
 19from typing import Annotated, Any
 20
 21from fastmcp import FastMCP
 22from fastmcp_extensions import mcp_tool, register_mcp_tools
 23from pydantic import BaseModel, Field
 24
 25from airbyte_ops_mcp.registry import (
 26    PROD_METADATA_SERVICE_BUCKET_NAME,
 27    ConnectorListResult,
 28    RegistryEntryResult,
 29    VersionListResult,
 30)
 31from airbyte_ops_mcp.registry._enums import (
 32    ConnectorLanguage,
 33    ConnectorType,
 34    SupportLevel,
 35)
 36from airbyte_ops_mcp.registry.operations import (
 37    get_registry_entry,
 38    get_registry_spec,
 39    list_connector_versions,
 40    list_registry_connectors,
 41    list_registry_connectors_filtered,
 42)
 43
 44
 45class RegistrySpecResult(BaseModel):
 46    """Result of reading a connector spec from GCS."""
 47
 48    connector_name: str = Field(description="The connector technical name")
 49    version: str = Field(description="The version that was read")
 50    bucket_name: str = Field(description="The GCS bucket name")
 51    gcs_path: str = Field(description="The GCS path that was read")
 52    spec: dict[str, Any] = Field(description="The connector spec dictionary")
 53
 54
 55@mcp_tool(
 56    read_only=True,
 57    idempotent=True,
 58    open_world=True,
 59)
 60def get_connector_registry_entry(
 61    connector_name: Annotated[
 62        str,
 63        "The connector name (e.g., 'source-faker', 'destination-postgres')",
 64    ],
 65    version: Annotated[
 66        str,
 67        "Version to read (e.g., 'latest', '1.2.3'). Defaults to 'latest'.",
 68    ] = "latest",
 69) -> RegistryEntryResult:
 70    """Read a connector's metadata from the GCS registry.
 71
 72    Returns the full metadata.yaml content for a connector at the specified version.
 73    Requires GCS_CREDENTIALS environment variable to be set.
 74    """
 75    bucket_name = PROD_METADATA_SERVICE_BUCKET_NAME
 76    metadata = get_registry_entry(
 77        connector_name=connector_name,
 78        bucket_name=bucket_name,
 79        version=version,
 80    )
 81    gcs_path = f"metadata/airbyte/{connector_name}/{version}/metadata.yaml"
 82    return RegistryEntryResult(
 83        connector_name=connector_name,
 84        version=version,
 85        bucket_name=bucket_name,
 86        gcs_path=gcs_path,
 87        metadata=metadata,
 88    )
 89
 90
 91@mcp_tool(
 92    read_only=True,
 93    idempotent=True,
 94    open_world=True,
 95)
 96def get_connector_registry_spec(
 97    connector_name: Annotated[
 98        str,
 99        "The connector name (e.g., 'source-faker', 'destination-postgres')",
100    ],
101    version: Annotated[
102        str,
103        "Version to read (e.g., 'latest', '1.2.3'). Defaults to 'latest'.",
104    ] = "latest",
105) -> RegistrySpecResult:
106    """Read a connector's spec from the GCS registry.
107
108    Returns the spec.json content for a connector at the specified version.
109    Requires GCS_CREDENTIALS environment variable to be set.
110    """
111    bucket_name = PROD_METADATA_SERVICE_BUCKET_NAME
112    spec = get_registry_spec(
113        connector_name=connector_name,
114        bucket_name=bucket_name,
115        version=version,
116    )
117    gcs_path = f"metadata/airbyte/{connector_name}/{version}/spec.json"
118    return RegistrySpecResult(
119        connector_name=connector_name,
120        version=version,
121        bucket_name=bucket_name,
122        gcs_path=gcs_path,
123        spec=spec,
124    )
125
126
127@mcp_tool(
128    read_only=True,
129    idempotent=True,
130    open_world=True,
131)
132def list_connectors_in_registry(
133    certified: Annotated[
134        bool,
135        "When `True`, return only certified connectors. Shorthand for `support_level='certified'`.",
136    ] = False,
137    support_level: Annotated[
138        str,
139        "Exact support level to match (e.g., `certified`, `community`, `archived`). Empty string means no filter.",
140    ] = "",
141    min_support_level: Annotated[
142        str,
143        "Minimum support level threshold (inclusive). Levels: `archived` < `community` < `certified`. Empty string means no filter.",
144    ] = "",
145    connector_type: Annotated[
146        str,
147        "Filter by connector type: `source` or `destination`. Empty string means no filter.",
148    ] = "",
149    language: Annotated[
150        str,
151        "Filter by implementation language (e.g., `python`, `java`, `manifest-only`). Empty string means no filter.",
152    ] = "",
153) -> ConnectorListResult:
154    """List connectors in the GCS registry with optional filtering.
155
156    When filters are applied, reads the compiled `cloud_registry.json` index
157    for fast lookups. Without filters, falls back to scanning individual
158    metadata blobs (captures all connectors including OSS-only).
159
160    Requires GCS_CREDENTIALS environment variable to be set.
161    """
162    bucket_name = PROD_METADATA_SERVICE_BUCKET_NAME
163
164    # Normalise empty strings to typed enums or `None` for downstream logic.
165    eff_support_level: SupportLevel | None = (
166        SupportLevel.parse(support_level) if support_level else None
167    )
168    eff_min_support_level: SupportLevel | None = (
169        SupportLevel.parse(min_support_level) if min_support_level else None
170    )
171    eff_connector_type: ConnectorType | None = (
172        ConnectorType.parse(connector_type) if connector_type else None
173    )
174    eff_language: ConnectorLanguage | None = (
175        ConnectorLanguage.parse(language) if language else None
176    )
177
178    # `certified=True` is sugar for `support_level="certified"`.
179    if certified:
180        if eff_support_level and eff_support_level != SupportLevel.CERTIFIED:
181            raise ValueError(
182                "`certified=True` conflicts with `support_level="
183                f"{eff_support_level!r}`. Use one or the other."
184            )
185        eff_support_level = SupportLevel.CERTIFIED
186
187    has_filters = any(
188        [eff_support_level, eff_min_support_level, eff_connector_type, eff_language]
189    )
190
191    if has_filters:
192        connectors = list_registry_connectors_filtered(
193            bucket_name=bucket_name,
194            support_level=eff_support_level,
195            min_support_level=eff_min_support_level,
196            connector_type=eff_connector_type,
197            language=eff_language,
198        )
199    else:
200        connectors = list_registry_connectors(bucket_name=bucket_name)
201
202    return ConnectorListResult(
203        bucket_name=bucket_name,
204        connector_count=len(connectors),
205        connectors=connectors,
206    )
207
208
209@mcp_tool(
210    read_only=True,
211    idempotent=True,
212    open_world=True,
213)
214def list_connector_versions_in_registry(
215    connector_name: Annotated[
216        str,
217        "The connector name (e.g., 'source-faker', 'destination-postgres')",
218    ],
219) -> VersionListResult:
220    """List all versions of a connector in the GCS registry.
221
222    Returns all published versions for a connector (excluding 'latest' and 'release_candidate').
223    Requires GCS_CREDENTIALS environment variable to be set.
224    """
225    bucket_name = PROD_METADATA_SERVICE_BUCKET_NAME
226    versions = list_connector_versions(
227        connector_name=connector_name,
228        bucket_name=bucket_name,
229    )
230    return VersionListResult(
231        connector_name=connector_name,
232        bucket_name=bucket_name,
233        version_count=len(versions),
234        versions=versions,
235    )
236
237
238def register_registry_tools(app: FastMCP) -> None:
239    """Register registry tools with the FastMCP app."""
240    register_mcp_tools(app, mcp_module=__name__)