airbyte_ops_mcp.mcp.registry

MCP tools for connector registry operations.

This module provides MCP tools for interacting with the Airbyte connector registry stored in Google Cloud Storage, including:

  • Reading connector metadata and specs
  • Listing connectors and versions
  • Yanking connector versions (workflow-backed)

MCP reference

MCP primitives registered by the registry module of the airbyte-internal-ops server: 5 tool(s), 0 prompt(s), 0 resource(s).

Tools (5)

get_connector_registry_entry

Hints: read-only · idempotent · open-world

Read a connector's metadata from the GCS registry.

Returns the full metadata.yaml content for a connector at the specified version. Requires GCS_CREDENTIALS environment variable to be set.

Parameters:

Name Type Required Default Description
connector_name string yes The connector name (e.g., 'source-faker', 'destination-postgres')
version string no "latest" Version to read (e.g., 'latest', '1.2.3'). Defaults to 'latest'.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connector_name": {
      "description": "The connector name (e.g., 'source-faker', 'destination-postgres')",
      "type": "string"
    },
    "version": {
      "default": "latest",
      "description": "Version to read (e.g., 'latest', '1.2.3'). Defaults to 'latest'.",
      "type": "string"
    }
  },
  "required": [
    "connector_name"
  ],
  "type": "object"
}

Show output JSON schema

{
  "description": "Result of reading a registry entry from GCS.\n\nThis model wraps the raw metadata dictionary with additional context.",
  "properties": {
    "connector_name": {
      "description": "The connector technical name",
      "type": "string"
    },
    "version": {
      "description": "The version that was read",
      "type": "string"
    },
    "bucket_name": {
      "description": "The GCS bucket name",
      "type": "string"
    },
    "gcs_path": {
      "description": "The GCS path that was read",
      "type": "string"
    },
    "metadata": {
      "additionalProperties": true,
      "description": "The raw metadata dictionary",
      "type": "object"
    }
  },
  "required": [
    "connector_name",
    "version",
    "bucket_name",
    "gcs_path",
    "metadata"
  ],
  "type": "object"
}

get_connector_registry_spec

Hints: read-only · idempotent · open-world

Read a connector's spec from the GCS registry.

Returns the spec.json content for a connector at the specified version. Requires GCS_CREDENTIALS environment variable to be set.

Parameters:

Name Type Required Default Description
connector_name string yes The connector name (e.g., 'source-faker', 'destination-postgres')
version string no "latest" Version to read (e.g., 'latest', '1.2.3'). Defaults to 'latest'.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connector_name": {
      "description": "The connector name (e.g., 'source-faker', 'destination-postgres')",
      "type": "string"
    },
    "version": {
      "default": "latest",
      "description": "Version to read (e.g., 'latest', '1.2.3'). Defaults to 'latest'.",
      "type": "string"
    }
  },
  "required": [
    "connector_name"
  ],
  "type": "object"
}

Show output JSON schema

{
  "description": "Result of reading a connector spec from GCS.",
  "properties": {
    "connector_name": {
      "description": "The connector technical name",
      "type": "string"
    },
    "version": {
      "description": "The version that was read",
      "type": "string"
    },
    "bucket_name": {
      "description": "The GCS bucket name",
      "type": "string"
    },
    "gcs_path": {
      "description": "The GCS path that was read",
      "type": "string"
    },
    "spec": {
      "additionalProperties": true,
      "description": "The connector spec dictionary",
      "type": "object"
    }
  },
  "required": [
    "connector_name",
    "version",
    "bucket_name",
    "gcs_path",
    "spec"
  ],
  "type": "object"
}

list_connector_versions_in_registry

Hints: read-only · idempotent · open-world

List all versions of a connector in the GCS registry.

Returns all published versions for a connector (excluding 'latest' and 'release_candidate'). Requires GCS_CREDENTIALS environment variable to be set.

Parameters:

Name Type Required Default Description
connector_name string yes The connector name (e.g., 'source-faker', 'destination-postgres')

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connector_name": {
      "description": "The connector name (e.g., 'source-faker', 'destination-postgres')",
      "type": "string"
    }
  },
  "required": [
    "connector_name"
  ],
  "type": "object"
}

Show output JSON schema

{
  "description": "Result of listing versions for a connector.",
  "properties": {
    "connector_name": {
      "description": "The connector technical name",
      "type": "string"
    },
    "bucket_name": {
      "description": "The GCS bucket name",
      "type": "string"
    },
    "version_count": {
      "description": "Number of versions found",
      "type": "integer"
    },
    "versions": {
      "description": "List of version strings",
      "items": {
        "type": "string"
      },
      "type": "array"
    }
  },
  "required": [
    "connector_name",
    "bucket_name",
    "version_count",
    "versions"
  ],
  "type": "object"
}

list_connectors_in_registry

Hints: read-only · idempotent · open-world

List connectors in the GCS registry with optional filtering.

When filters are applied, reads the compiled cloud_registry.json index for fast lookups. Without filters, falls back to scanning individual metadata blobs (captures all connectors including OSS-only).

Requires GCS_CREDENTIALS environment variable to be set.

Parameters:

Name Type Required Default Description
certified boolean no false When True, return only certified connectors. Shorthand for support_level='certified'.
support_level string no "" Exact support level to match (e.g., certified, community, archived). Empty string means no filter.
min_support_level string no "" Minimum support level threshold (inclusive). Levels: archived < community < certified. Empty string means no filter.
connector_type string no "" Filter by connector type: source or destination. Empty string means no filter.
language string no "" Filter by implementation language (e.g., python, java, manifest-only). Empty string means no filter.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "certified": {
      "default": false,
      "description": "When `True`, return only certified connectors. Shorthand for `support_level='certified'`.",
      "type": "boolean"
    },
    "support_level": {
      "default": "",
      "description": "Exact support level to match (e.g., `certified`, `community`, `archived`). Empty string means no filter.",
      "type": "string"
    },
    "min_support_level": {
      "default": "",
      "description": "Minimum support level threshold (inclusive). Levels: `archived` < `community` < `certified`. Empty string means no filter.",
      "type": "string"
    },
    "connector_type": {
      "default": "",
      "description": "Filter by connector type: `source` or `destination`. Empty string means no filter.",
      "type": "string"
    },
    "language": {
      "default": "",
      "description": "Filter by implementation language (e.g., `python`, `java`, `manifest-only`). Empty string means no filter.",
      "type": "string"
    }
  },
  "type": "object"
}

Show output JSON schema

{
  "description": "Result of listing connectors in the registry.",
  "properties": {
    "bucket_name": {
      "description": "The GCS bucket name",
      "type": "string"
    },
    "connector_count": {
      "description": "Number of connectors found",
      "type": "integer"
    },
    "connectors": {
      "description": "List of connector names",
      "items": {
        "type": "string"
      },
      "type": "array"
    }
  },
  "required": [
    "bucket_name",
    "connector_count",
    "connectors"
  ],
  "type": "object"
}

yank_connector_version

Hints: open-world

Yank or unyank a connector version after Slack/HITL approval.

This MCP tool requires approval for all stores, including dev/test stores, so the safety behavior is consistent and prod-impacting coral:prod requests cannot dispatch without approval.

Without approval_comment_url, returns the exact approval request summary and Slack message to send via escalate_to_human; no GitHub Actions workflow is triggered. With an approved Slack record URL, validates the approver and then triggers a workflow that marks the version as yanked (or unyanked) and recompiles the registry to update indexes and latest pointers.

Requires GITHUB_CI_WORKFLOW_TRIGGER_PAT or GITHUB_TOKEN environment variable with 'actions:write' permission.

Parameters:

Name Type Required Default Description
connector_name string yes Connector name (e.g., 'source-faker', 'destination-postgres').
version string yes Version to yank (e.g., '1.2.3').
store string yes Store target (e.g., 'coral:dev', 'coral:prod').
reason string no "" Reason for yanking this version.
unyank boolean no false Set to true to unyank (restore) the version instead of yanking it.
approval_comment_url string | null no null Slack approval record URL. Obtain this by calling escalate_to_human with approval_requested=True using the approval request details returned by this tool. The backend validates the approval record and resolves the approver's @airbyte.io email before dispatching the registry workflow.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connector_name": {
      "description": "Connector name (e.g., 'source-faker', 'destination-postgres').",
      "type": "string"
    },
    "version": {
      "description": "Version to yank (e.g., '1.2.3').",
      "type": "string"
    },
    "store": {
      "description": "Store target (e.g., 'coral:dev', 'coral:prod').",
      "type": "string"
    },
    "reason": {
      "default": "",
      "description": "Reason for yanking this version.",
      "type": "string"
    },
    "unyank": {
      "default": false,
      "description": "Set to true to unyank (restore) the version instead of yanking it.",
      "type": "boolean"
    },
    "approval_comment_url": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Slack approval record URL. Obtain this by calling `escalate_to_human` with `approval_requested=True` using the approval request details returned by this tool. The backend validates the approval record and resolves the approver's `@airbyte.io` email before dispatching the registry workflow."
    }
  },
  "required": [
    "connector_name",
    "version",
    "store"
  ],
  "type": "object"
}

Show output JSON schema

{
  "description": "Response from triggering a yank connector version workflow.",
  "properties": {
    "approval_required": {
      "default": false,
      "description": "Whether the operation still requires Slack/HITL approval before dispatch.",
      "type": "boolean"
    },
    "message": {
      "description": "Human-readable status message",
      "type": "string"
    },
    "approval_request_summary": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Short approval summary to pass to `escalate_to_human` when approval is required."
    },
    "approval_request_message": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Detailed Slack message to pass to `escalate_to_human` when approval is required."
    },
    "approved_by": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Resolved `@airbyte.io` email for the Slack approver."
    },
    "workflow_url": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "URL to view the GitHub Actions workflow file"
    },
    "github_run_id": {
      "anyOf": [
        {
          "type": "integer"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "GitHub Actions workflow run ID (use with check_ci_workflow_status)"
    },
    "github_run_url": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Direct URL to the GitHub Actions workflow run"
    }
  },
  "required": [
    "message"
  ],
  "type": "object"
}

  1# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
  2"""MCP tools for connector registry operations.
  3
  4This module provides MCP tools for interacting with the Airbyte connector registry
  5stored in Google Cloud Storage, including:
  6- Reading connector metadata and specs
  7- Listing connectors and versions
  8- Yanking connector versions (workflow-backed)
  9
 10## MCP reference
 11
 12.. include:: ../../../docs/mcp-generated/registry.md
 13    :start-line: 2
 14"""
 15
 16from __future__ import annotations
 17
 18__all__: list[str] = []
 19
 20from typing import Annotated, Any
 21
 22from fastmcp import FastMCP
 23from fastmcp_extensions import mcp_tool, register_mcp_tools
 24from pydantic import BaseModel, Field
 25
 26from airbyte_ops_mcp.approval_resolution import (
 27    ApprovalResolutionError,
 28    resolve_admin_email_from_approval,
 29)
 30from airbyte_ops_mcp.github_actions import trigger_workflow_dispatch
 31from airbyte_ops_mcp.github_api import resolve_ci_trigger_github_token
 32from airbyte_ops_mcp.human_in_the_loop import APPROVAL_REQUEST_SUMMARY_MAX_LENGTH
 33from airbyte_ops_mcp.registry import (
 34    PROD_METADATA_SERVICE_BUCKET_NAME,
 35    ConnectorListResult,
 36    RegistryEntryResult,
 37    VersionListResult,
 38)
 39from airbyte_ops_mcp.registry._enums import (
 40    ConnectorLanguage,
 41    ConnectorType,
 42    SupportLevel,
 43)
 44from airbyte_ops_mcp.registry.operations import (
 45    get_registry_entry,
 46    get_registry_spec,
 47    list_connector_versions,
 48    list_registry_connectors,
 49    list_registry_connectors_filtered,
 50)
 51
 52
 53class RegistrySpecResult(BaseModel):
 54    """Result of reading a connector spec from GCS."""
 55
 56    connector_name: str = Field(description="The connector technical name")
 57    version: str = Field(description="The version that was read")
 58    bucket_name: str = Field(description="The GCS bucket name")
 59    gcs_path: str = Field(description="The GCS path that was read")
 60    spec: dict[str, Any] = Field(description="The connector spec dictionary")
 61
 62
 63@mcp_tool(
 64    read_only=True,
 65    idempotent=True,
 66    open_world=True,
 67)
 68def get_connector_registry_entry(
 69    connector_name: Annotated[
 70        str,
 71        "The connector name (e.g., 'source-faker', 'destination-postgres')",
 72    ],
 73    version: Annotated[
 74        str,
 75        "Version to read (e.g., 'latest', '1.2.3'). Defaults to 'latest'.",
 76    ] = "latest",
 77) -> RegistryEntryResult:
 78    """Read a connector's metadata from the GCS registry.
 79
 80    Returns the full metadata.yaml content for a connector at the specified version.
 81    Requires GCS_CREDENTIALS environment variable to be set.
 82    """
 83    bucket_name = PROD_METADATA_SERVICE_BUCKET_NAME
 84    metadata = get_registry_entry(
 85        connector_name=connector_name,
 86        bucket_name=bucket_name,
 87        version=version,
 88    )
 89    gcs_path = f"metadata/airbyte/{connector_name}/{version}/metadata.yaml"
 90    return RegistryEntryResult(
 91        connector_name=connector_name,
 92        version=version,
 93        bucket_name=bucket_name,
 94        gcs_path=gcs_path,
 95        metadata=metadata,
 96    )
 97
 98
 99@mcp_tool(
100    read_only=True,
101    idempotent=True,
102    open_world=True,
103)
104def get_connector_registry_spec(
105    connector_name: Annotated[
106        str,
107        "The connector name (e.g., 'source-faker', 'destination-postgres')",
108    ],
109    version: Annotated[
110        str,
111        "Version to read (e.g., 'latest', '1.2.3'). Defaults to 'latest'.",
112    ] = "latest",
113) -> RegistrySpecResult:
114    """Read a connector's spec from the GCS registry.
115
116    Returns the spec.json content for a connector at the specified version.
117    Requires GCS_CREDENTIALS environment variable to be set.
118    """
119    bucket_name = PROD_METADATA_SERVICE_BUCKET_NAME
120    spec = get_registry_spec(
121        connector_name=connector_name,
122        bucket_name=bucket_name,
123        version=version,
124    )
125    gcs_path = f"metadata/airbyte/{connector_name}/{version}/spec.json"
126    return RegistrySpecResult(
127        connector_name=connector_name,
128        version=version,
129        bucket_name=bucket_name,
130        gcs_path=gcs_path,
131        spec=spec,
132    )
133
134
135@mcp_tool(
136    read_only=True,
137    idempotent=True,
138    open_world=True,
139)
140def list_connectors_in_registry(
141    certified: Annotated[
142        bool,
143        "When `True`, return only certified connectors. Shorthand for `support_level='certified'`.",
144    ] = False,
145    support_level: Annotated[
146        str,
147        "Exact support level to match (e.g., `certified`, `community`, `archived`). Empty string means no filter.",
148    ] = "",
149    min_support_level: Annotated[
150        str,
151        "Minimum support level threshold (inclusive). Levels: `archived` < `community` < `certified`. Empty string means no filter.",
152    ] = "",
153    connector_type: Annotated[
154        str,
155        "Filter by connector type: `source` or `destination`. Empty string means no filter.",
156    ] = "",
157    language: Annotated[
158        str,
159        "Filter by implementation language (e.g., `python`, `java`, `manifest-only`). Empty string means no filter.",
160    ] = "",
161) -> ConnectorListResult:
162    """List connectors in the GCS registry with optional filtering.
163
164    When filters are applied, reads the compiled `cloud_registry.json` index
165    for fast lookups. Without filters, falls back to scanning individual
166    metadata blobs (captures all connectors including OSS-only).
167
168    Requires GCS_CREDENTIALS environment variable to be set.
169    """
170    bucket_name = PROD_METADATA_SERVICE_BUCKET_NAME
171
172    # Normalise empty strings to typed enums or `None` for downstream logic.
173    eff_support_level: SupportLevel | None = (
174        SupportLevel.parse(support_level) if support_level else None
175    )
176    eff_min_support_level: SupportLevel | None = (
177        SupportLevel.parse(min_support_level) if min_support_level else None
178    )
179    eff_connector_type: ConnectorType | None = (
180        ConnectorType.parse(connector_type) if connector_type else None
181    )
182    eff_language: ConnectorLanguage | None = (
183        ConnectorLanguage.parse(language) if language else None
184    )
185
186    # `certified=True` is sugar for `support_level="certified"`.
187    if certified:
188        if eff_support_level and eff_support_level != SupportLevel.CERTIFIED:
189            raise ValueError(
190                "`certified=True` conflicts with `support_level="
191                f"{eff_support_level!r}`. Use one or the other."
192            )
193        eff_support_level = SupportLevel.CERTIFIED
194
195    has_filters = any(
196        [eff_support_level, eff_min_support_level, eff_connector_type, eff_language]
197    )
198
199    if has_filters:
200        connectors = list_registry_connectors_filtered(
201            bucket_name=bucket_name,
202            support_level=eff_support_level,
203            min_support_level=eff_min_support_level,
204            connector_type=eff_connector_type,
205            language=eff_language,
206        )
207    else:
208        connectors = list_registry_connectors(bucket_name=bucket_name)
209
210    return ConnectorListResult(
211        bucket_name=bucket_name,
212        connector_count=len(connectors),
213        connectors=connectors,
214    )
215
216
217@mcp_tool(
218    read_only=True,
219    idempotent=True,
220    open_world=True,
221)
222def list_connector_versions_in_registry(
223    connector_name: Annotated[
224        str,
225        "The connector name (e.g., 'source-faker', 'destination-postgres')",
226    ],
227) -> VersionListResult:
228    """List all versions of a connector in the GCS registry.
229
230    Returns all published versions for a connector (excluding 'latest' and 'release_candidate').
231    Requires GCS_CREDENTIALS environment variable to be set.
232    """
233    bucket_name = PROD_METADATA_SERVICE_BUCKET_NAME
234    versions = list_connector_versions(
235        connector_name=connector_name,
236        bucket_name=bucket_name,
237    )
238    return VersionListResult(
239        connector_name=connector_name,
240        bucket_name=bucket_name,
241        version_count=len(versions),
242        versions=versions,
243    )
244
245
246# =============================================================================
247# Yank Workflow Configuration
248# =============================================================================
249
250YANK_WORKFLOW_REPO_OWNER = "airbytehq"
251YANK_WORKFLOW_REPO_NAME = "airbyte"
252YANK_WORKFLOW_DEFAULT_BRANCH = "master"
253YANK_WORKFLOW_FILE = "version-yank-command.yml"
254
255
256class YankConnectorVersionResponse(BaseModel):
257    """Response from triggering a yank connector version workflow."""
258
259    approval_required: bool = Field(
260        default=False,
261        description=(
262            "Whether the operation still requires Slack/HITL approval before dispatch."
263        ),
264    )
265    message: str = Field(description="Human-readable status message")
266    approval_request_summary: str | None = Field(
267        default=None,
268        description=(
269            "Short approval summary to pass to `escalate_to_human` when approval is required."
270        ),
271    )
272    approval_request_message: str | None = Field(
273        default=None,
274        description=(
275            "Detailed Slack message to pass to `escalate_to_human` when approval is required."
276        ),
277    )
278    approved_by: str | None = Field(
279        default=None,
280        description="Resolved `@airbyte.io` email for the Slack approver.",
281    )
282    workflow_url: str | None = Field(
283        default=None,
284        description="URL to view the GitHub Actions workflow file",
285    )
286    github_run_id: int | None = Field(
287        default=None,
288        description="GitHub Actions workflow run ID (use with check_ci_workflow_status)",
289    )
290    github_run_url: str | None = Field(
291        default=None,
292        description="Direct URL to the GitHub Actions workflow run",
293    )
294
295
296def _format_yank_action(unyank: bool) -> str:
297    """Return the registry action label for a yank tool call."""
298    return "unyank" if unyank else "yank"
299
300
301def _sanitize_approval_request_summary_text(value: str) -> str:
302    """Return text safe for Slack approval request formatting."""
303    return value.replace("`", "'")
304
305
306def _build_yank_approval_request_summary(
307    *,
308    connector_name: str,
309    version: str,
310    store: str,
311    reason: str,
312    unyank: bool,
313) -> str:
314    """Build a Slack confirmation summary for registry yank approval."""
315    action = _format_yank_action(unyank)
316    sanitized_connector_name = _sanitize_approval_request_summary_text(connector_name)
317    sanitized_version = _sanitize_approval_request_summary_text(version)
318    sanitized_store = _sanitize_approval_request_summary_text(store)
319    summary = (
320        f"{action} {sanitized_connector_name}@{sanitized_version} in "
321        f"{sanitized_store}; registry will be recompiled"
322    )
323    if reason:
324        sanitized_reason = _sanitize_approval_request_summary_text(reason)
325        summary = f"{summary}; reason: {sanitized_reason}"
326    if len(summary) <= APPROVAL_REQUEST_SUMMARY_MAX_LENGTH:
327        return summary
328    return f"{summary[: APPROVAL_REQUEST_SUMMARY_MAX_LENGTH - 3].rstrip()}..."
329
330
331def _build_yank_approval_request_message(
332    *,
333    connector_name: str,
334    version: str,
335    store: str,
336    reason: str,
337    unyank: bool,
338) -> str:
339    """Build the Slack message body for registry yank approval."""
340    action = _format_yank_action(unyank)
341    sanitized_connector_name = _sanitize_approval_request_summary_text(connector_name)
342    sanitized_version = _sanitize_approval_request_summary_text(version)
343    sanitized_store = _sanitize_approval_request_summary_text(store)
344    reason_text = (
345        _sanitize_approval_request_summary_text(reason) if reason else "(none provided)"
346    )
347    return (
348        "Approval requested for an MCP registry connector-version operation.\n\n"
349        f"- Action: `{action}`\n"
350        f"- Connector: `{sanitized_connector_name}`\n"
351        f"- Version: `{sanitized_version}`\n"
352        f"- Store: `{sanitized_store}`\n"
353        f"- Reason: {reason_text}\n"
354        "- Consequence: after approval, the MCP tool will dispatch "
355        "`airbyte/.github/workflows/version-yank-command.yml`; that workflow "
356        "will update the yank marker and run `airbyte-ops registry store compile`, "
357        "recompiling registry indexes and latest pointers.\n\n"
358        "After Slack approval, copy the Slack approval record URL into "
359        "`approval_comment_url` and call `yank_connector_version` again with "
360        "the same connector name, version, store, reason, and unyank values."
361    )
362
363
364@mcp_tool(
365    read_only=False,
366    idempotent=False,
367    open_world=True,
368)
369def yank_connector_version(
370    connector_name: Annotated[
371        str,
372        "Connector name (e.g., 'source-faker', 'destination-postgres').",
373    ],
374    version: Annotated[
375        str,
376        "Version to yank (e.g., '1.2.3').",
377    ],
378    store: Annotated[
379        str,
380        "Store target (e.g., 'coral:dev', 'coral:prod').",
381    ],
382    reason: Annotated[
383        str,
384        "Reason for yanking this version.",
385    ] = "",
386    unyank: Annotated[
387        bool,
388        "Set to true to unyank (restore) the version instead of yanking it.",
389    ] = False,
390    approval_comment_url: Annotated[
391        str | None,
392        Field(
393            description=(
394                "Slack approval record URL. Obtain this by calling "
395                "`escalate_to_human` with `approval_requested=True` using the "
396                "approval request details returned by this tool. The backend "
397                "validates the approval record and resolves the approver's "
398                "`@airbyte.io` email before dispatching the registry workflow."
399            ),
400            default=None,
401        ),
402    ] = None,
403) -> YankConnectorVersionResponse:
404    """Yank or unyank a connector version after Slack/HITL approval.
405
406    This MCP tool requires approval for all stores, including dev/test stores,
407    so the safety behavior is consistent and prod-impacting `coral:prod`
408    requests cannot dispatch without approval.
409
410    Without `approval_comment_url`, returns the exact approval request summary
411    and Slack message to send via `escalate_to_human`; no GitHub Actions
412    workflow is triggered. With an approved Slack record URL, validates the
413    approver and then triggers a workflow that marks the version as yanked
414    (or unyanked) and recompiles the registry to update indexes and latest
415    pointers.
416
417    Requires GITHUB_CI_WORKFLOW_TRIGGER_PAT or GITHUB_TOKEN environment variable
418    with 'actions:write' permission.
419    """
420    action = _format_yank_action(unyank)
421    action_title = action.capitalize()
422
423    approval_request_summary = _build_yank_approval_request_summary(
424        connector_name=connector_name,
425        version=version,
426        store=store,
427        reason=reason,
428        unyank=unyank,
429    )
430    approval_request_message = _build_yank_approval_request_message(
431        connector_name=connector_name,
432        version=version,
433        store=store,
434        reason=reason,
435        unyank=unyank,
436    )
437
438    if not approval_comment_url:
439        return YankConnectorVersionResponse(
440            approval_required=True,
441            message=(
442                f"Slack/HITL approval is required before dispatching the {action} "
443                f"workflow for {connector_name}@{version} on {store}. Call "
444                "`escalate_to_human` with `approval_requested=True`, "
445                "`request_type='approval'`, the returned "
446                "`approval_request_summary`, and the returned "
447                "`approval_request_message`. After approval, call this tool "
448                "again with the Slack approval record URL as `approval_comment_url`."
449            ),
450            approval_request_summary=approval_request_summary,
451            approval_request_message=approval_request_message,
452        )
453
454    try:
455        approved_by = resolve_admin_email_from_approval(
456            approval_comment_url=approval_comment_url,
457        )
458    except ApprovalResolutionError as e:
459        return YankConnectorVersionResponse(
460            approval_required=True,
461            message=str(e),
462            approval_request_summary=approval_request_summary,
463            approval_request_message=approval_request_message,
464        )
465
466    try:
467        token = resolve_ci_trigger_github_token()
468    except ValueError as e:
469        return YankConnectorVersionResponse(
470            message=str(e),
471            approved_by=approved_by,
472        )
473
474    workflow_inputs: dict[str, str] = {
475        "connector-name": connector_name,
476        "version": version,
477        "store": store,
478        "unyank": str(unyank).lower(),
479        "approval-url": approval_comment_url,
480    }
481    if reason:
482        workflow_inputs["reason"] = reason
483
484    dispatch_result = trigger_workflow_dispatch(
485        owner=YANK_WORKFLOW_REPO_OWNER,
486        repo=YANK_WORKFLOW_REPO_NAME,
487        workflow_file=YANK_WORKFLOW_FILE,
488        ref=YANK_WORKFLOW_DEFAULT_BRANCH,
489        inputs=workflow_inputs,
490        token=token,
491    )
492
493    view_url = dispatch_result.run_url or dispatch_result.workflow_url
494    reason_info = f" (reason: {reason})" if reason else ""
495    return YankConnectorVersionResponse(
496        message=(
497            f"{action_title} workflow triggered for {connector_name}@{version} "
498            f"on {store}{reason_info} after approval by {approved_by}. "
499            f"View progress at: {view_url}"
500        ),
501        approved_by=approved_by,
502        workflow_url=dispatch_result.workflow_url,
503        github_run_id=dispatch_result.run_id,
504        github_run_url=dispatch_result.run_url,
505    )
506
507
508def register_registry_tools(app: FastMCP) -> None:
509    """Register registry tools with the FastMCP app."""
510    register_mcp_tools(app, mcp_module=__name__)