airbyte_ops_mcp.mcp.registry

MCP tools for connector registry operations.

This module provides MCP tools for interacting with the Airbyte connector registry stored in Google Cloud Storage, including:

  • Reading connector metadata and specs
  • Listing connectors and versions
  • Yanking connector versions (workflow-backed)

MCP reference

MCP primitives registered by the registry module of the airbyte-internal-ops server: 5 tool(s), 0 prompt(s), 0 resource(s).

Tools (5)

get_connector_registry_entry

Hints: read-only · idempotent · open-world

Read a connector's metadata from the GCS registry.

Returns the full metadata.yaml content for a connector at the specified version. Requires GCS_CREDENTIALS environment variable to be set.

Parameters:

Name Type Required Default Description
connector_name string yes The connector name (e.g., 'source-faker', 'destination-postgres')
version string no "latest" Version to read (e.g., 'latest', '1.2.3'). Defaults to 'latest'.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connector_name": {
      "description": "The connector name (e.g., 'source-faker', 'destination-postgres')",
      "type": "string"
    },
    "version": {
      "default": "latest",
      "description": "Version to read (e.g., 'latest', '1.2.3'). Defaults to 'latest'.",
      "type": "string"
    }
  },
  "required": [
    "connector_name"
  ],
  "type": "object"
}

Show output JSON schema

{
  "description": "Result of reading a registry entry from GCS.\n\nThis model wraps the raw metadata dictionary with additional context.",
  "properties": {
    "connector_name": {
      "description": "The connector technical name",
      "type": "string"
    },
    "version": {
      "description": "The version that was read",
      "type": "string"
    },
    "bucket_name": {
      "description": "The GCS bucket name",
      "type": "string"
    },
    "gcs_path": {
      "description": "The GCS path that was read",
      "type": "string"
    },
    "metadata": {
      "additionalProperties": true,
      "description": "The raw metadata dictionary",
      "type": "object"
    }
  },
  "required": [
    "connector_name",
    "version",
    "bucket_name",
    "gcs_path",
    "metadata"
  ],
  "type": "object"
}

get_connector_registry_spec

Hints: read-only · idempotent · open-world

Read a connector's spec from the GCS registry.

Returns the spec.json content for a connector at the specified version. Requires GCS_CREDENTIALS environment variable to be set.

Parameters:

Name Type Required Default Description
connector_name string yes The connector name (e.g., 'source-faker', 'destination-postgres')
version string no "latest" Version to read (e.g., 'latest', '1.2.3'). Defaults to 'latest'.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connector_name": {
      "description": "The connector name (e.g., 'source-faker', 'destination-postgres')",
      "type": "string"
    },
    "version": {
      "default": "latest",
      "description": "Version to read (e.g., 'latest', '1.2.3'). Defaults to 'latest'.",
      "type": "string"
    }
  },
  "required": [
    "connector_name"
  ],
  "type": "object"
}

Show output JSON schema

{
  "description": "Result of reading a connector spec from GCS.",
  "properties": {
    "connector_name": {
      "description": "The connector technical name",
      "type": "string"
    },
    "version": {
      "description": "The version that was read",
      "type": "string"
    },
    "bucket_name": {
      "description": "The GCS bucket name",
      "type": "string"
    },
    "gcs_path": {
      "description": "The GCS path that was read",
      "type": "string"
    },
    "spec": {
      "additionalProperties": true,
      "description": "The connector spec dictionary",
      "type": "object"
    }
  },
  "required": [
    "connector_name",
    "version",
    "bucket_name",
    "gcs_path",
    "spec"
  ],
  "type": "object"
}

list_connector_versions_in_registry

Hints: read-only · idempotent · open-world

List all versions of a connector in the GCS registry.

Returns all published versions for a connector (excluding 'latest' and 'release_candidate'). Requires GCS_CREDENTIALS environment variable to be set.

Parameters:

Name Type Required Default Description
connector_name string yes The connector name (e.g., 'source-faker', 'destination-postgres')

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connector_name": {
      "description": "The connector name (e.g., 'source-faker', 'destination-postgres')",
      "type": "string"
    }
  },
  "required": [
    "connector_name"
  ],
  "type": "object"
}

Show output JSON schema

{
  "description": "Result of listing versions for a connector.",
  "properties": {
    "connector_name": {
      "description": "The connector technical name",
      "type": "string"
    },
    "bucket_name": {
      "description": "The GCS bucket name",
      "type": "string"
    },
    "version_count": {
      "description": "Number of versions found",
      "type": "integer"
    },
    "versions": {
      "description": "List of version strings",
      "items": {
        "type": "string"
      },
      "type": "array"
    }
  },
  "required": [
    "connector_name",
    "bucket_name",
    "version_count",
    "versions"
  ],
  "type": "object"
}

list_connectors_in_registry

Hints: read-only · idempotent · open-world

List connectors in the GCS registry with optional filtering.

When filters are applied, reads the compiled cloud_registry.json index for fast lookups. Without filters, falls back to scanning individual metadata blobs (captures all connectors including OSS-only).

Requires GCS_CREDENTIALS environment variable to be set.

Parameters:

Name Type Required Default Description
certified boolean no false When True, return only certified connectors. Shorthand for support_level='certified'.
support_level string no "" Exact support level to match (e.g., certified, community, archived). Empty string means no filter.
min_support_level string no "" Minimum support level threshold (inclusive). Levels: archived < community < certified. Empty string means no filter.
connector_type string no "" Filter by connector type: source or destination. Empty string means no filter.
language string no "" Filter by implementation language (e.g., python, java, manifest-only). Empty string means no filter.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "certified": {
      "default": false,
      "description": "When `True`, return only certified connectors. Shorthand for `support_level='certified'`.",
      "type": "boolean"
    },
    "support_level": {
      "default": "",
      "description": "Exact support level to match (e.g., `certified`, `community`, `archived`). Empty string means no filter.",
      "type": "string"
    },
    "min_support_level": {
      "default": "",
      "description": "Minimum support level threshold (inclusive). Levels: `archived` < `community` < `certified`. Empty string means no filter.",
      "type": "string"
    },
    "connector_type": {
      "default": "",
      "description": "Filter by connector type: `source` or `destination`. Empty string means no filter.",
      "type": "string"
    },
    "language": {
      "default": "",
      "description": "Filter by implementation language (e.g., `python`, `java`, `manifest-only`). Empty string means no filter.",
      "type": "string"
    }
  },
  "type": "object"
}

Show output JSON schema

{
  "description": "Result of listing connectors in the registry.",
  "properties": {
    "bucket_name": {
      "description": "The GCS bucket name",
      "type": "string"
    },
    "connector_count": {
      "description": "Number of connectors found",
      "type": "integer"
    },
    "connectors": {
      "description": "List of connector names",
      "items": {
        "type": "string"
      },
      "type": "array"
    }
  },
  "required": [
    "bucket_name",
    "connector_count",
    "connectors"
  ],
  "type": "object"
}

yank_connector_version

Hints: open-world

Yank or unyank a connector version and recompile the registry via GitHub Actions.

Triggers a workflow that marks the version as yanked (or unyanked) and then recompiles the registry to update indexes and latest pointers.

Returns immediately with a workflow URL. Use check_ci_workflow_status to monitor progress.

Requires GITHUB_CI_WORKFLOW_TRIGGER_PAT or GITHUB_TOKEN environment variable with 'actions:write' permission.

Parameters:

Name Type Required Default Description
connector_name string yes Connector name (e.g., 'source-faker', 'destination-postgres').
version string yes Version to yank (e.g., '1.2.3').
store string yes Store target (e.g., 'coral:dev', 'coral:prod').
reason string no "" Reason for yanking this version.
unyank boolean no false Set to true to unyank (restore) the version instead of yanking it.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connector_name": {
      "description": "Connector name (e.g., 'source-faker', 'destination-postgres').",
      "type": "string"
    },
    "version": {
      "description": "Version to yank (e.g., '1.2.3').",
      "type": "string"
    },
    "store": {
      "description": "Store target (e.g., 'coral:dev', 'coral:prod').",
      "type": "string"
    },
    "reason": {
      "default": "",
      "description": "Reason for yanking this version.",
      "type": "string"
    },
    "unyank": {
      "default": false,
      "description": "Set to true to unyank (restore) the version instead of yanking it.",
      "type": "boolean"
    }
  },
  "required": [
    "connector_name",
    "version",
    "store"
  ],
  "type": "object"
}

Show output JSON schema

{
  "description": "Response from triggering a yank connector version workflow.",
  "properties": {
    "message": {
      "description": "Human-readable status message",
      "type": "string"
    },
    "workflow_url": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "URL to view the GitHub Actions workflow file"
    },
    "github_run_id": {
      "anyOf": [
        {
          "type": "integer"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "GitHub Actions workflow run ID (use with check_ci_workflow_status)"
    },
    "github_run_url": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Direct URL to the GitHub Actions workflow run"
    }
  },
  "required": [
    "message"
  ],
  "type": "object"
}

  1# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
  2"""MCP tools for connector registry operations.
  3
  4This module provides MCP tools for interacting with the Airbyte connector registry
  5stored in Google Cloud Storage, including:
  6- Reading connector metadata and specs
  7- Listing connectors and versions
  8- Yanking connector versions (workflow-backed)
  9
 10## MCP reference
 11
 12.. include:: ../../../docs/mcp-generated/registry.md
 13    :start-line: 2
 14"""
 15
 16from __future__ import annotations
 17
 18__all__: list[str] = []
 19
 20from typing import Annotated, Any
 21
 22from fastmcp import FastMCP
 23from fastmcp_extensions import mcp_tool, register_mcp_tools
 24from pydantic import BaseModel, Field
 25
 26from airbyte_ops_mcp.github_actions import trigger_workflow_dispatch
 27from airbyte_ops_mcp.github_api import resolve_ci_trigger_github_token
 28from airbyte_ops_mcp.registry import (
 29    PROD_METADATA_SERVICE_BUCKET_NAME,
 30    ConnectorListResult,
 31    RegistryEntryResult,
 32    VersionListResult,
 33)
 34from airbyte_ops_mcp.registry._enums import (
 35    ConnectorLanguage,
 36    ConnectorType,
 37    SupportLevel,
 38)
 39from airbyte_ops_mcp.registry.operations import (
 40    get_registry_entry,
 41    get_registry_spec,
 42    list_connector_versions,
 43    list_registry_connectors,
 44    list_registry_connectors_filtered,
 45)
 46
 47
 48class RegistrySpecResult(BaseModel):
 49    """Result of reading a connector spec from GCS."""
 50
 51    connector_name: str = Field(description="The connector technical name")
 52    version: str = Field(description="The version that was read")
 53    bucket_name: str = Field(description="The GCS bucket name")
 54    gcs_path: str = Field(description="The GCS path that was read")
 55    spec: dict[str, Any] = Field(description="The connector spec dictionary")
 56
 57
 58@mcp_tool(
 59    read_only=True,
 60    idempotent=True,
 61    open_world=True,
 62)
 63def get_connector_registry_entry(
 64    connector_name: Annotated[
 65        str,
 66        "The connector name (e.g., 'source-faker', 'destination-postgres')",
 67    ],
 68    version: Annotated[
 69        str,
 70        "Version to read (e.g., 'latest', '1.2.3'). Defaults to 'latest'.",
 71    ] = "latest",
 72) -> RegistryEntryResult:
 73    """Read a connector's metadata from the GCS registry.
 74
 75    Returns the full metadata.yaml content for a connector at the specified version.
 76    Requires GCS_CREDENTIALS environment variable to be set.
 77    """
 78    bucket_name = PROD_METADATA_SERVICE_BUCKET_NAME
 79    metadata = get_registry_entry(
 80        connector_name=connector_name,
 81        bucket_name=bucket_name,
 82        version=version,
 83    )
 84    gcs_path = f"metadata/airbyte/{connector_name}/{version}/metadata.yaml"
 85    return RegistryEntryResult(
 86        connector_name=connector_name,
 87        version=version,
 88        bucket_name=bucket_name,
 89        gcs_path=gcs_path,
 90        metadata=metadata,
 91    )
 92
 93
 94@mcp_tool(
 95    read_only=True,
 96    idempotent=True,
 97    open_world=True,
 98)
 99def get_connector_registry_spec(
100    connector_name: Annotated[
101        str,
102        "The connector name (e.g., 'source-faker', 'destination-postgres')",
103    ],
104    version: Annotated[
105        str,
106        "Version to read (e.g., 'latest', '1.2.3'). Defaults to 'latest'.",
107    ] = "latest",
108) -> RegistrySpecResult:
109    """Read a connector's spec from the GCS registry.
110
111    Returns the spec.json content for a connector at the specified version.
112    Requires GCS_CREDENTIALS environment variable to be set.
113    """
114    bucket_name = PROD_METADATA_SERVICE_BUCKET_NAME
115    spec = get_registry_spec(
116        connector_name=connector_name,
117        bucket_name=bucket_name,
118        version=version,
119    )
120    gcs_path = f"metadata/airbyte/{connector_name}/{version}/spec.json"
121    return RegistrySpecResult(
122        connector_name=connector_name,
123        version=version,
124        bucket_name=bucket_name,
125        gcs_path=gcs_path,
126        spec=spec,
127    )
128
129
130@mcp_tool(
131    read_only=True,
132    idempotent=True,
133    open_world=True,
134)
135def list_connectors_in_registry(
136    certified: Annotated[
137        bool,
138        "When `True`, return only certified connectors. Shorthand for `support_level='certified'`.",
139    ] = False,
140    support_level: Annotated[
141        str,
142        "Exact support level to match (e.g., `certified`, `community`, `archived`). Empty string means no filter.",
143    ] = "",
144    min_support_level: Annotated[
145        str,
146        "Minimum support level threshold (inclusive). Levels: `archived` < `community` < `certified`. Empty string means no filter.",
147    ] = "",
148    connector_type: Annotated[
149        str,
150        "Filter by connector type: `source` or `destination`. Empty string means no filter.",
151    ] = "",
152    language: Annotated[
153        str,
154        "Filter by implementation language (e.g., `python`, `java`, `manifest-only`). Empty string means no filter.",
155    ] = "",
156) -> ConnectorListResult:
157    """List connectors in the GCS registry with optional filtering.
158
159    When filters are applied, reads the compiled `cloud_registry.json` index
160    for fast lookups. Without filters, falls back to scanning individual
161    metadata blobs (captures all connectors including OSS-only).
162
163    Requires GCS_CREDENTIALS environment variable to be set.
164    """
165    bucket_name = PROD_METADATA_SERVICE_BUCKET_NAME
166
167    # Normalise empty strings to typed enums or `None` for downstream logic.
168    eff_support_level: SupportLevel | None = (
169        SupportLevel.parse(support_level) if support_level else None
170    )
171    eff_min_support_level: SupportLevel | None = (
172        SupportLevel.parse(min_support_level) if min_support_level else None
173    )
174    eff_connector_type: ConnectorType | None = (
175        ConnectorType.parse(connector_type) if connector_type else None
176    )
177    eff_language: ConnectorLanguage | None = (
178        ConnectorLanguage.parse(language) if language else None
179    )
180
181    # `certified=True` is sugar for `support_level="certified"`.
182    if certified:
183        if eff_support_level and eff_support_level != SupportLevel.CERTIFIED:
184            raise ValueError(
185                "`certified=True` conflicts with `support_level="
186                f"{eff_support_level!r}`. Use one or the other."
187            )
188        eff_support_level = SupportLevel.CERTIFIED
189
190    has_filters = any(
191        [eff_support_level, eff_min_support_level, eff_connector_type, eff_language]
192    )
193
194    if has_filters:
195        connectors = list_registry_connectors_filtered(
196            bucket_name=bucket_name,
197            support_level=eff_support_level,
198            min_support_level=eff_min_support_level,
199            connector_type=eff_connector_type,
200            language=eff_language,
201        )
202    else:
203        connectors = list_registry_connectors(bucket_name=bucket_name)
204
205    return ConnectorListResult(
206        bucket_name=bucket_name,
207        connector_count=len(connectors),
208        connectors=connectors,
209    )
210
211
212@mcp_tool(
213    read_only=True,
214    idempotent=True,
215    open_world=True,
216)
217def list_connector_versions_in_registry(
218    connector_name: Annotated[
219        str,
220        "The connector name (e.g., 'source-faker', 'destination-postgres')",
221    ],
222) -> VersionListResult:
223    """List all versions of a connector in the GCS registry.
224
225    Returns all published versions for a connector (excluding 'latest' and 'release_candidate').
226    Requires GCS_CREDENTIALS environment variable to be set.
227    """
228    bucket_name = PROD_METADATA_SERVICE_BUCKET_NAME
229    versions = list_connector_versions(
230        connector_name=connector_name,
231        bucket_name=bucket_name,
232    )
233    return VersionListResult(
234        connector_name=connector_name,
235        bucket_name=bucket_name,
236        version_count=len(versions),
237        versions=versions,
238    )
239
240
241# =============================================================================
242# Yank Workflow Configuration
243# =============================================================================
244
245YANK_WORKFLOW_REPO_OWNER = "airbytehq"
246YANK_WORKFLOW_REPO_NAME = "airbyte-ops-mcp"
247YANK_WORKFLOW_DEFAULT_BRANCH = "main"
248YANK_WORKFLOW_FILE = "registry-yank.yml"
249
250
251class YankConnectorVersionResponse(BaseModel):
252    """Response from triggering a yank connector version workflow."""
253
254    message: str = Field(description="Human-readable status message")
255    workflow_url: str | None = Field(
256        default=None,
257        description="URL to view the GitHub Actions workflow file",
258    )
259    github_run_id: int | None = Field(
260        default=None,
261        description="GitHub Actions workflow run ID (use with check_ci_workflow_status)",
262    )
263    github_run_url: str | None = Field(
264        default=None,
265        description="Direct URL to the GitHub Actions workflow run",
266    )
267
268
269@mcp_tool(
270    read_only=False,
271    idempotent=False,
272    open_world=True,
273)
274def yank_connector_version(
275    connector_name: Annotated[
276        str,
277        "Connector name (e.g., 'source-faker', 'destination-postgres').",
278    ],
279    version: Annotated[
280        str,
281        "Version to yank (e.g., '1.2.3').",
282    ],
283    store: Annotated[
284        str,
285        "Store target (e.g., 'coral:dev', 'coral:prod').",
286    ],
287    reason: Annotated[
288        str,
289        "Reason for yanking this version.",
290    ] = "",
291    unyank: Annotated[
292        bool,
293        "Set to true to unyank (restore) the version instead of yanking it.",
294    ] = False,
295) -> YankConnectorVersionResponse:
296    """Yank or unyank a connector version and recompile the registry via GitHub Actions.
297
298    Triggers a workflow that marks the version as yanked (or unyanked) and then
299    recompiles the registry to update indexes and latest pointers.
300
301    Returns immediately with a workflow URL. Use check_ci_workflow_status to monitor
302    progress.
303
304    Requires GITHUB_CI_WORKFLOW_TRIGGER_PAT or GITHUB_TOKEN environment variable
305    with 'actions:write' permission.
306    """
307    try:
308        token = resolve_ci_trigger_github_token()
309    except ValueError as e:
310        return YankConnectorVersionResponse(
311            message=str(e),
312        )
313
314    action = "Unyank" if unyank else "Yank"
315    workflow_inputs: dict[str, str] = {
316        "connector_name": connector_name,
317        "version": version,
318        "store": store,
319        "unyank": str(unyank).lower(),
320    }
321    if reason:
322        workflow_inputs["reason"] = reason
323
324    dispatch_result = trigger_workflow_dispatch(
325        owner=YANK_WORKFLOW_REPO_OWNER,
326        repo=YANK_WORKFLOW_REPO_NAME,
327        workflow_file=YANK_WORKFLOW_FILE,
328        ref=YANK_WORKFLOW_DEFAULT_BRANCH,
329        inputs=workflow_inputs,
330        token=token,
331    )
332
333    view_url = dispatch_result.run_url or dispatch_result.workflow_url
334    reason_info = f" (reason: {reason})" if reason else ""
335    return YankConnectorVersionResponse(
336        message=(
337            f"{action} workflow triggered for {connector_name}@{version} "
338            f"on {store}{reason_info}. View progress at: {view_url}"
339        ),
340        workflow_url=dispatch_result.workflow_url,
341        github_run_id=dispatch_result.run_id,
342        github_run_url=dispatch_result.run_url,
343    )
344
345
346def register_registry_tools(app: FastMCP) -> None:
347    """Register registry tools with the FastMCP app."""
348    register_mcp_tools(app, mcp_module=__name__)