airbyte_ops_mcp.mcp.registry
MCP tools for connector registry operations.
This module provides MCP tools for interacting with the Airbyte connector registry stored in Google Cloud Storage, including:
- Reading connector metadata and specs
- Listing connectors and versions
- Yanking connector versions (workflow-backed)
MCP reference
MCP primitives registered by the registry module of the airbyte-internal-ops server: 5 tool(s), 0 prompt(s), 0 resource(s).
Tools (5)
get_connector_registry_entry
Hints: read-only · idempotent · open-world
Read a connector's metadata from the GCS registry.
Returns the full metadata.yaml content for a connector at the specified version. Requires GCS_CREDENTIALS environment variable to be set.
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
connector_name |
string |
yes | — | The connector name (e.g., 'source-faker', 'destination-postgres') |
version |
string |
no | "latest" |
Version to read (e.g., 'latest', '1.2.3'). Defaults to 'latest'. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"connector_name": {
"description": "The connector name (e.g., 'source-faker', 'destination-postgres')",
"type": "string"
},
"version": {
"default": "latest",
"description": "Version to read (e.g., 'latest', '1.2.3'). Defaults to 'latest'.",
"type": "string"
}
},
"required": [
"connector_name"
],
"type": "object"
}
Show output JSON schema
{
"description": "Result of reading a registry entry from GCS.\n\nThis model wraps the raw metadata dictionary with additional context.",
"properties": {
"connector_name": {
"description": "The connector technical name",
"type": "string"
},
"version": {
"description": "The version that was read",
"type": "string"
},
"bucket_name": {
"description": "The GCS bucket name",
"type": "string"
},
"gcs_path": {
"description": "The GCS path that was read",
"type": "string"
},
"metadata": {
"additionalProperties": true,
"description": "The raw metadata dictionary",
"type": "object"
}
},
"required": [
"connector_name",
"version",
"bucket_name",
"gcs_path",
"metadata"
],
"type": "object"
}
get_connector_registry_spec
Hints: read-only · idempotent · open-world
Read a connector's spec from the GCS registry.
Returns the spec.json content for a connector at the specified version. Requires GCS_CREDENTIALS environment variable to be set.
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
connector_name |
string |
yes | — | The connector name (e.g., 'source-faker', 'destination-postgres') |
version |
string |
no | "latest" |
Version to read (e.g., 'latest', '1.2.3'). Defaults to 'latest'. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"connector_name": {
"description": "The connector name (e.g., 'source-faker', 'destination-postgres')",
"type": "string"
},
"version": {
"default": "latest",
"description": "Version to read (e.g., 'latest', '1.2.3'). Defaults to 'latest'.",
"type": "string"
}
},
"required": [
"connector_name"
],
"type": "object"
}
Show output JSON schema
{
"description": "Result of reading a connector spec from GCS.",
"properties": {
"connector_name": {
"description": "The connector technical name",
"type": "string"
},
"version": {
"description": "The version that was read",
"type": "string"
},
"bucket_name": {
"description": "The GCS bucket name",
"type": "string"
},
"gcs_path": {
"description": "The GCS path that was read",
"type": "string"
},
"spec": {
"additionalProperties": true,
"description": "The connector spec dictionary",
"type": "object"
}
},
"required": [
"connector_name",
"version",
"bucket_name",
"gcs_path",
"spec"
],
"type": "object"
}
list_connector_versions_in_registry
Hints: read-only · idempotent · open-world
List all versions of a connector in the GCS registry.
Returns all published versions for a connector (excluding 'latest' and 'release_candidate'). Requires GCS_CREDENTIALS environment variable to be set.
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
connector_name |
string |
yes | — | The connector name (e.g., 'source-faker', 'destination-postgres') |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"connector_name": {
"description": "The connector name (e.g., 'source-faker', 'destination-postgres')",
"type": "string"
}
},
"required": [
"connector_name"
],
"type": "object"
}
Show output JSON schema
{
"description": "Result of listing versions for a connector.",
"properties": {
"connector_name": {
"description": "The connector technical name",
"type": "string"
},
"bucket_name": {
"description": "The GCS bucket name",
"type": "string"
},
"version_count": {
"description": "Number of versions found",
"type": "integer"
},
"versions": {
"description": "List of version strings",
"items": {
"type": "string"
},
"type": "array"
}
},
"required": [
"connector_name",
"bucket_name",
"version_count",
"versions"
],
"type": "object"
}
list_connectors_in_registry
Hints: read-only · idempotent · open-world
List connectors in the GCS registry with optional filtering.
When filters are applied, reads the compiled cloud_registry.json index
for fast lookups. Without filters, falls back to scanning individual
metadata blobs (captures all connectors including OSS-only).
Requires GCS_CREDENTIALS environment variable to be set.
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
certified |
boolean |
no | false |
When True, return only certified connectors. Shorthand for support_level='certified'. |
support_level |
string |
no | "" |
Exact support level to match (e.g., certified, community, archived). Empty string means no filter. |
min_support_level |
string |
no | "" |
Minimum support level threshold (inclusive). Levels: archived < community < certified. Empty string means no filter. |
connector_type |
string |
no | "" |
Filter by connector type: source or destination. Empty string means no filter. |
language |
string |
no | "" |
Filter by implementation language (e.g., python, java, manifest-only). Empty string means no filter. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"certified": {
"default": false,
"description": "When `True`, return only certified connectors. Shorthand for `support_level='certified'`.",
"type": "boolean"
},
"support_level": {
"default": "",
"description": "Exact support level to match (e.g., `certified`, `community`, `archived`). Empty string means no filter.",
"type": "string"
},
"min_support_level": {
"default": "",
"description": "Minimum support level threshold (inclusive). Levels: `archived` < `community` < `certified`. Empty string means no filter.",
"type": "string"
},
"connector_type": {
"default": "",
"description": "Filter by connector type: `source` or `destination`. Empty string means no filter.",
"type": "string"
},
"language": {
"default": "",
"description": "Filter by implementation language (e.g., `python`, `java`, `manifest-only`). Empty string means no filter.",
"type": "string"
}
},
"type": "object"
}
Show output JSON schema
{
"description": "Result of listing connectors in the registry.",
"properties": {
"bucket_name": {
"description": "The GCS bucket name",
"type": "string"
},
"connector_count": {
"description": "Number of connectors found",
"type": "integer"
},
"connectors": {
"description": "List of connector names",
"items": {
"type": "string"
},
"type": "array"
}
},
"required": [
"bucket_name",
"connector_count",
"connectors"
],
"type": "object"
}
yank_connector_version
Hints: open-world
Yank or unyank a connector version and recompile the registry via GitHub Actions.
Triggers a workflow that marks the version as yanked (or unyanked) and then recompiles the registry to update indexes and latest pointers.
Returns immediately with a workflow URL. Use check_ci_workflow_status to monitor progress.
Requires GITHUB_CI_WORKFLOW_TRIGGER_PAT or GITHUB_TOKEN environment variable with 'actions:write' permission.
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
connector_name |
string |
yes | — | Connector name (e.g., 'source-faker', 'destination-postgres'). |
version |
string |
yes | — | Version to yank (e.g., '1.2.3'). |
store |
string |
yes | — | Store target (e.g., 'coral:dev', 'coral:prod'). |
reason |
string |
no | "" |
Reason for yanking this version. |
unyank |
boolean |
no | false |
Set to true to unyank (restore) the version instead of yanking it. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"connector_name": {
"description": "Connector name (e.g., 'source-faker', 'destination-postgres').",
"type": "string"
},
"version": {
"description": "Version to yank (e.g., '1.2.3').",
"type": "string"
},
"store": {
"description": "Store target (e.g., 'coral:dev', 'coral:prod').",
"type": "string"
},
"reason": {
"default": "",
"description": "Reason for yanking this version.",
"type": "string"
},
"unyank": {
"default": false,
"description": "Set to true to unyank (restore) the version instead of yanking it.",
"type": "boolean"
}
},
"required": [
"connector_name",
"version",
"store"
],
"type": "object"
}
Show output JSON schema
{
"description": "Response from triggering a yank connector version workflow.",
"properties": {
"message": {
"description": "Human-readable status message",
"type": "string"
},
"workflow_url": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "URL to view the GitHub Actions workflow file"
},
"github_run_id": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"description": "GitHub Actions workflow run ID (use with check_ci_workflow_status)"
},
"github_run_url": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Direct URL to the GitHub Actions workflow run"
}
},
"required": [
"message"
],
"type": "object"
}
1# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 2"""MCP tools for connector registry operations. 3 4This module provides MCP tools for interacting with the Airbyte connector registry 5stored in Google Cloud Storage, including: 6- Reading connector metadata and specs 7- Listing connectors and versions 8- Yanking connector versions (workflow-backed) 9 10## MCP reference 11 12.. include:: ../../../docs/mcp-generated/registry.md 13 :start-line: 2 14""" 15 16from __future__ import annotations 17 18__all__: list[str] = [] 19 20from typing import Annotated, Any 21 22from fastmcp import FastMCP 23from fastmcp_extensions import mcp_tool, register_mcp_tools 24from pydantic import BaseModel, Field 25 26from airbyte_ops_mcp.github_actions import trigger_workflow_dispatch 27from airbyte_ops_mcp.github_api import resolve_ci_trigger_github_token 28from airbyte_ops_mcp.registry import ( 29 PROD_METADATA_SERVICE_BUCKET_NAME, 30 ConnectorListResult, 31 RegistryEntryResult, 32 VersionListResult, 33) 34from airbyte_ops_mcp.registry._enums import ( 35 ConnectorLanguage, 36 ConnectorType, 37 SupportLevel, 38) 39from airbyte_ops_mcp.registry.operations import ( 40 get_registry_entry, 41 get_registry_spec, 42 list_connector_versions, 43 list_registry_connectors, 44 list_registry_connectors_filtered, 45) 46 47 48class RegistrySpecResult(BaseModel): 49 """Result of reading a connector spec from GCS.""" 50 51 connector_name: str = Field(description="The connector technical name") 52 version: str = Field(description="The version that was read") 53 bucket_name: str = Field(description="The GCS bucket name") 54 gcs_path: str = Field(description="The GCS path that was read") 55 spec: dict[str, Any] = Field(description="The connector spec dictionary") 56 57 58@mcp_tool( 59 read_only=True, 60 idempotent=True, 61 open_world=True, 62) 63def get_connector_registry_entry( 64 connector_name: Annotated[ 65 str, 66 "The connector name (e.g., 'source-faker', 'destination-postgres')", 67 ], 68 version: Annotated[ 69 str, 70 "Version to read (e.g., 'latest', '1.2.3'). Defaults to 'latest'.", 71 ] = "latest", 72) -> RegistryEntryResult: 73 """Read a connector's metadata from the GCS registry. 74 75 Returns the full metadata.yaml content for a connector at the specified version. 76 Requires GCS_CREDENTIALS environment variable to be set. 77 """ 78 bucket_name = PROD_METADATA_SERVICE_BUCKET_NAME 79 metadata = get_registry_entry( 80 connector_name=connector_name, 81 bucket_name=bucket_name, 82 version=version, 83 ) 84 gcs_path = f"metadata/airbyte/{connector_name}/{version}/metadata.yaml" 85 return RegistryEntryResult( 86 connector_name=connector_name, 87 version=version, 88 bucket_name=bucket_name, 89 gcs_path=gcs_path, 90 metadata=metadata, 91 ) 92 93 94@mcp_tool( 95 read_only=True, 96 idempotent=True, 97 open_world=True, 98) 99def get_connector_registry_spec( 100 connector_name: Annotated[ 101 str, 102 "The connector name (e.g., 'source-faker', 'destination-postgres')", 103 ], 104 version: Annotated[ 105 str, 106 "Version to read (e.g., 'latest', '1.2.3'). Defaults to 'latest'.", 107 ] = "latest", 108) -> RegistrySpecResult: 109 """Read a connector's spec from the GCS registry. 110 111 Returns the spec.json content for a connector at the specified version. 112 Requires GCS_CREDENTIALS environment variable to be set. 113 """ 114 bucket_name = PROD_METADATA_SERVICE_BUCKET_NAME 115 spec = get_registry_spec( 116 connector_name=connector_name, 117 bucket_name=bucket_name, 118 version=version, 119 ) 120 gcs_path = f"metadata/airbyte/{connector_name}/{version}/spec.json" 121 return RegistrySpecResult( 122 connector_name=connector_name, 123 version=version, 124 bucket_name=bucket_name, 125 gcs_path=gcs_path, 126 spec=spec, 127 ) 128 129 130@mcp_tool( 131 read_only=True, 132 idempotent=True, 133 open_world=True, 134) 135def list_connectors_in_registry( 136 certified: Annotated[ 137 bool, 138 "When `True`, return only certified connectors. Shorthand for `support_level='certified'`.", 139 ] = False, 140 support_level: Annotated[ 141 str, 142 "Exact support level to match (e.g., `certified`, `community`, `archived`). Empty string means no filter.", 143 ] = "", 144 min_support_level: Annotated[ 145 str, 146 "Minimum support level threshold (inclusive). Levels: `archived` < `community` < `certified`. Empty string means no filter.", 147 ] = "", 148 connector_type: Annotated[ 149 str, 150 "Filter by connector type: `source` or `destination`. Empty string means no filter.", 151 ] = "", 152 language: Annotated[ 153 str, 154 "Filter by implementation language (e.g., `python`, `java`, `manifest-only`). Empty string means no filter.", 155 ] = "", 156) -> ConnectorListResult: 157 """List connectors in the GCS registry with optional filtering. 158 159 When filters are applied, reads the compiled `cloud_registry.json` index 160 for fast lookups. Without filters, falls back to scanning individual 161 metadata blobs (captures all connectors including OSS-only). 162 163 Requires GCS_CREDENTIALS environment variable to be set. 164 """ 165 bucket_name = PROD_METADATA_SERVICE_BUCKET_NAME 166 167 # Normalise empty strings to typed enums or `None` for downstream logic. 168 eff_support_level: SupportLevel | None = ( 169 SupportLevel.parse(support_level) if support_level else None 170 ) 171 eff_min_support_level: SupportLevel | None = ( 172 SupportLevel.parse(min_support_level) if min_support_level else None 173 ) 174 eff_connector_type: ConnectorType | None = ( 175 ConnectorType.parse(connector_type) if connector_type else None 176 ) 177 eff_language: ConnectorLanguage | None = ( 178 ConnectorLanguage.parse(language) if language else None 179 ) 180 181 # `certified=True` is sugar for `support_level="certified"`. 182 if certified: 183 if eff_support_level and eff_support_level != SupportLevel.CERTIFIED: 184 raise ValueError( 185 "`certified=True` conflicts with `support_level=" 186 f"{eff_support_level!r}`. Use one or the other." 187 ) 188 eff_support_level = SupportLevel.CERTIFIED 189 190 has_filters = any( 191 [eff_support_level, eff_min_support_level, eff_connector_type, eff_language] 192 ) 193 194 if has_filters: 195 connectors = list_registry_connectors_filtered( 196 bucket_name=bucket_name, 197 support_level=eff_support_level, 198 min_support_level=eff_min_support_level, 199 connector_type=eff_connector_type, 200 language=eff_language, 201 ) 202 else: 203 connectors = list_registry_connectors(bucket_name=bucket_name) 204 205 return ConnectorListResult( 206 bucket_name=bucket_name, 207 connector_count=len(connectors), 208 connectors=connectors, 209 ) 210 211 212@mcp_tool( 213 read_only=True, 214 idempotent=True, 215 open_world=True, 216) 217def list_connector_versions_in_registry( 218 connector_name: Annotated[ 219 str, 220 "The connector name (e.g., 'source-faker', 'destination-postgres')", 221 ], 222) -> VersionListResult: 223 """List all versions of a connector in the GCS registry. 224 225 Returns all published versions for a connector (excluding 'latest' and 'release_candidate'). 226 Requires GCS_CREDENTIALS environment variable to be set. 227 """ 228 bucket_name = PROD_METADATA_SERVICE_BUCKET_NAME 229 versions = list_connector_versions( 230 connector_name=connector_name, 231 bucket_name=bucket_name, 232 ) 233 return VersionListResult( 234 connector_name=connector_name, 235 bucket_name=bucket_name, 236 version_count=len(versions), 237 versions=versions, 238 ) 239 240 241# ============================================================================= 242# Yank Workflow Configuration 243# ============================================================================= 244 245YANK_WORKFLOW_REPO_OWNER = "airbytehq" 246YANK_WORKFLOW_REPO_NAME = "airbyte-ops-mcp" 247YANK_WORKFLOW_DEFAULT_BRANCH = "main" 248YANK_WORKFLOW_FILE = "registry-yank.yml" 249 250 251class YankConnectorVersionResponse(BaseModel): 252 """Response from triggering a yank connector version workflow.""" 253 254 message: str = Field(description="Human-readable status message") 255 workflow_url: str | None = Field( 256 default=None, 257 description="URL to view the GitHub Actions workflow file", 258 ) 259 github_run_id: int | None = Field( 260 default=None, 261 description="GitHub Actions workflow run ID (use with check_ci_workflow_status)", 262 ) 263 github_run_url: str | None = Field( 264 default=None, 265 description="Direct URL to the GitHub Actions workflow run", 266 ) 267 268 269@mcp_tool( 270 read_only=False, 271 idempotent=False, 272 open_world=True, 273) 274def yank_connector_version( 275 connector_name: Annotated[ 276 str, 277 "Connector name (e.g., 'source-faker', 'destination-postgres').", 278 ], 279 version: Annotated[ 280 str, 281 "Version to yank (e.g., '1.2.3').", 282 ], 283 store: Annotated[ 284 str, 285 "Store target (e.g., 'coral:dev', 'coral:prod').", 286 ], 287 reason: Annotated[ 288 str, 289 "Reason for yanking this version.", 290 ] = "", 291 unyank: Annotated[ 292 bool, 293 "Set to true to unyank (restore) the version instead of yanking it.", 294 ] = False, 295) -> YankConnectorVersionResponse: 296 """Yank or unyank a connector version and recompile the registry via GitHub Actions. 297 298 Triggers a workflow that marks the version as yanked (or unyanked) and then 299 recompiles the registry to update indexes and latest pointers. 300 301 Returns immediately with a workflow URL. Use check_ci_workflow_status to monitor 302 progress. 303 304 Requires GITHUB_CI_WORKFLOW_TRIGGER_PAT or GITHUB_TOKEN environment variable 305 with 'actions:write' permission. 306 """ 307 try: 308 token = resolve_ci_trigger_github_token() 309 except ValueError as e: 310 return YankConnectorVersionResponse( 311 message=str(e), 312 ) 313 314 action = "Unyank" if unyank else "Yank" 315 workflow_inputs: dict[str, str] = { 316 "connector_name": connector_name, 317 "version": version, 318 "store": store, 319 "unyank": str(unyank).lower(), 320 } 321 if reason: 322 workflow_inputs["reason"] = reason 323 324 dispatch_result = trigger_workflow_dispatch( 325 owner=YANK_WORKFLOW_REPO_OWNER, 326 repo=YANK_WORKFLOW_REPO_NAME, 327 workflow_file=YANK_WORKFLOW_FILE, 328 ref=YANK_WORKFLOW_DEFAULT_BRANCH, 329 inputs=workflow_inputs, 330 token=token, 331 ) 332 333 view_url = dispatch_result.run_url or dispatch_result.workflow_url 334 reason_info = f" (reason: {reason})" if reason else "" 335 return YankConnectorVersionResponse( 336 message=( 337 f"{action} workflow triggered for {connector_name}@{version} " 338 f"on {store}{reason_info}. View progress at: {view_url}" 339 ), 340 workflow_url=dispatch_result.workflow_url, 341 github_run_id=dispatch_result.run_id, 342 github_run_url=dispatch_result.run_url, 343 ) 344 345 346def register_registry_tools(app: FastMCP) -> None: 347 """Register registry tools with the FastMCP app.""" 348 register_mcp_tools(app, mcp_module=__name__)