airbyte_ops_mcp.mcp.connector_rollout
MCP tools for connector rollout management.
This module provides MCP tools for managing connector rollouts in Airbyte Cloud, including finalizing (promoting, rolling back, or canceling) rollouts.
MCP reference
MCP primitives registered by the connector_rollout module of the airbyte-internal-ops server: 4 tool(s), 0 prompt(s), 0 resource(s).
Tools (4)
finalize_connector_rollout
Hints: destructive · open-world
Finalize a connector rollout by promoting, rolling back, or canceling.
This tool allows admins to finalize connector rollouts that are in progress. Use this after monitoring a rollout and determining it is ready for finalization.
IMPORTANT: Finalization is asynchronous. This tool sends a finalization
request to the platform API, which transitions the rollout to finalizing
state and triggers a Temporal workflow. The actual promotion (PR creation,
connector publish, registry update) or rollback (GCS cleanup, registry
recompile) happens asynchronously via the finalize_rollout.yml GitHub
Actions workflow. A successful response from this tool means the request
was accepted — NOT that the promotion/rollback is complete.
After calling this tool, you MUST verify:
- The finalize_rollout.yml workflow ran successfully in GitHub Actions
- For promotions: a merged PR exists (e.g., chore: finalize promote for <connector>)
- The rollout state transitioned to its terminal state (succeeded, failed_rolled_back, or canceled) via query_prod_connector_rollouts
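Because a successful response only means the request was accepted, the state check above lends itself to a polling loop. The following is a minimal sketch, not part of this module: `fetch_state` is a hypothetical callable (for example, a wrapper around query_prod_connector_rollouts) that returns the rollout's current state string, and the timeout and interval defaults are arbitrary.

```python
import time
from typing import Callable

# Terminal states, taken from the `state` enum of finalize_connector_rollout.
TERMINAL_STATES = frozenset({"succeeded", "failed_rolled_back", "canceled"})


def wait_for_terminal_state(
    fetch_state: Callable[[], str],
    expected: str,
    timeout_s: float = 900.0,
    poll_interval_s: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll until the rollout reaches a terminal state, then compare it to `expected`.

    `fetch_state` is hypothetical: it stands in for whatever call returns the
    rollout's current state string.
    """
    if expected not in TERMINAL_STATES:
        raise ValueError(f"{expected!r} is not a terminal rollout state")

    deadline = time.monotonic() + timeout_s
    while True:
        # Lower-casing is an assumption; the query tool may report upper-case states.
        state = fetch_state().lower()
        if state in TERMINAL_STATES:
            if state != expected:
                raise RuntimeError(f"Rollout ended in {state!r}, expected {expected!r}")
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Rollout still in {state!r} after {timeout_s} seconds")
        sleep(poll_interval_s)
```

This covers only the rollout state. The GitHub Actions run and, for promotions, the merged PR still need to be checked separately.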
Admin-only operation - Requires:
- AIRBYTE_INTERNAL_ADMIN_FLAG=airbyte.io environment variable
- approval_comment_url (Slack approval record URL from escalate_to_human)
The admin user email is automatically derived from the Slack approval record, resolving the approver's @airbyte.io email via the team roster.
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| docker_repository | string | yes | — | The docker repository (e.g., 'airbyte/source-youtube-analytics') |
| docker_image_tag | string | yes | — | The docker image tag (e.g., '1.2.0-rc.2') |
| actor_definition_id | string | yes | — | The actor definition ID (UUID) |
| rollout_id | string | yes | — | The rollout ID (UUID). Can be found in the 'pin_origin' field of rollout data from query_prod_actors_by_pinned_connector_version. |
| state | enum("succeeded", "failed_rolled_back", "canceled") | yes | — | The final state for the rollout: 'succeeded' promotes the RC to GA (default version for all users), 'failed_rolled_back' rolls back the RC, 'canceled' cancels the rollout without promotion or rollback. |
| approval_comment_url | string \| null | no | null | URL to the Slack approval record. Obtain this by calling the escalate_to_human tool with approval_requested=True; the backend delivers the approval record URL when a human clicks Approve. Format: https://<workspace>.slack.com/archives/... The admin email is automatically resolved from the approver's identity via the team roster. |
| error_msg | string \| null | no | null | Optional error message for failed/canceled states. |
| failed_reason | string \| null | no | null | Optional failure reason for failed/canceled states. |
| retain_pins_on_cancellation | boolean \| null | no | null | If True, retain version pins when canceling. Only applicable when state is 'canceled'. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"docker_repository": {
"description": "The docker repository (e.g., 'airbyte/source-youtube-analytics')",
"type": "string"
},
"docker_image_tag": {
"description": "The docker image tag (e.g., '1.2.0-rc.2')",
"type": "string"
},
"actor_definition_id": {
"description": "The actor definition ID (UUID)",
"type": "string"
},
"rollout_id": {
"description": "The rollout ID (UUID). Can be found in the 'pin_origin' field of rollout data from query_prod_actors_by_pinned_connector_version.",
"type": "string"
},
"state": {
"description": "The final state for the rollout: 'succeeded' promotes the RC to GA (default version for all users), 'failed_rolled_back' rolls back the RC, 'canceled' cancels the rollout without promotion or rollback.",
"enum": [
"succeeded",
"failed_rolled_back",
"canceled"
],
"type": "string"
},
"approval_comment_url": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "URL to the Slack approval record. Obtain this by calling the `escalate_to_human` tool with `approval_requested=True`; the backend delivers the approval record URL when a human clicks Approve. Format: https://<workspace>.slack.com/archives/... The admin email is automatically resolved from the approver's identity via the team roster."
},
"error_msg": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional error message for failed/canceled states."
},
"failed_reason": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional failure reason for failed/canceled states."
},
"retain_pins_on_cancellation": {
"anyOf": [
{
"type": "boolean"
},
{
"type": "null"
}
],
"default": null,
"description": "If True, retain version pins when canceling. Only applicable when state is 'canceled'."
}
},
"required": [
"docker_repository",
"docker_image_tag",
"actor_definition_id",
"rollout_id",
"state"
],
"type": "object"
}
Show output JSON schema
{
"description": "Result of a connector rollout finalization operation.\n\nThis model provides detailed information about the outcome of finalizing\na connector rollout (promote, rollback, or cancel).",
"properties": {
"success": {
"description": "Whether the operation succeeded",
"type": "boolean"
},
"message": {
"description": "Human-readable message describing the result",
"type": "string"
},
"rollout_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The rollout ID that was finalized"
},
"docker_repository": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The docker repository (e.g., 'airbyte/source-github')"
},
"docker_image_tag": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The docker image tag (e.g., '1.2.0-rc.2')"
},
"state": {
"anyOf": [
{
"enum": [
"succeeded",
"failed_rolled_back",
"canceled"
],
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The final state of the rollout"
}
},
"required": [
"success",
"message"
],
"type": "object"
}
progress_connector_rollout
Hints: destructive · open-world
Progress a connector rollout by pinning actors to the RC version.
This tool progresses a connector rollout by either:
- Setting a target percentage of actors to pin to the RC version
- Specifying specific actor IDs to pin
Admin-only operation - Requires:
- AIRBYTE_INTERNAL_ADMIN_FLAG=airbyte.io environment variable
- approval_comment_url (Slack approval record URL from escalate_to_human)
The admin user email is automatically derived from the Slack approval record.
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| docker_repository | string | yes | — | The docker repository (e.g., 'airbyte/source-pokeapi') |
| docker_image_tag | string | yes | — | The docker image tag (e.g., '0.3.48-rc.1') |
| actor_definition_id | string | yes | — | The actor definition ID (UUID) |
| rollout_id | string | yes | — | The rollout ID (UUID). Can be found from query_prod_connector_rollouts. |
| approval_comment_url | string \| null | no | null | URL to the Slack approval record. Obtain this by calling the escalate_to_human tool with approval_requested=True; the backend delivers the approval record URL when a human clicks Approve. Format: https://<workspace>.slack.com/archives/... The admin email is automatically resolved from the approver's identity via the team roster. |
| target_percentage | integer \| null | no | null | Target percentage of actors to pin to the RC (1-100). Either target_percentage or actor_ids must be provided. |
| actor_ids | array<string> \| null | no | null | Specific actor IDs to pin to the RC. Either target_percentage or actor_ids must be provided. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"docker_repository": {
"description": "The docker repository (e.g., 'airbyte/source-pokeapi')",
"type": "string"
},
"docker_image_tag": {
"description": "The docker image tag (e.g., '0.3.48-rc.1')",
"type": "string"
},
"actor_definition_id": {
"description": "The actor definition ID (UUID)",
"type": "string"
},
"rollout_id": {
"description": "The rollout ID (UUID). Can be found from query_prod_connector_rollouts.",
"type": "string"
},
"approval_comment_url": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "URL to the Slack approval record. Obtain this by calling the `escalate_to_human` tool with `approval_requested=True`; the backend delivers the approval record URL when a human clicks Approve. Format: https://<workspace>.slack.com/archives/... The admin email is automatically resolved from the approver's identity via the team roster."
},
"target_percentage": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"description": "Target percentage of actors to pin to the RC (1-100). Either target_percentage or actor_ids must be provided."
},
"actor_ids": {
"anyOf": [
{
"items": {
"type": "string"
},
"type": "array"
},
{
"type": "null"
}
],
"default": null,
"description": "Specific actor IDs to pin to the RC. Either target_percentage or actor_ids must be provided."
}
},
"required": [
"docker_repository",
"docker_image_tag",
"actor_definition_id",
"rollout_id"
],
"type": "object"
}
Show output JSON schema
{
"description": "Result of a connector rollout progress operation.\n\nThis model provides detailed information about the outcome of progressing\na connector rollout (pinning actors to the RC version).",
"properties": {
"success": {
"description": "Whether the operation succeeded",
"type": "boolean"
},
"message": {
"description": "Human-readable message describing the result",
"type": "string"
},
"rollout_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The rollout ID that was progressed"
},
"docker_repository": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The docker repository (e.g., 'airbyte/source-github')"
},
"docker_image_tag": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The docker image tag (e.g., '1.2.0-rc.2')"
},
"target_percentage": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"description": "The target percentage of actors to pin"
}
},
"required": [
"success",
"message"
],
"type": "object"
}
query_prod_rollout_monitoring_stats
Hints: read-only · idempotent
Get monitoring stats for a connector rollout.
Returns actor selection info and per-actor sync stats for actors participating in the rollout. This uses the platform API's /get_actor_sync_info endpoint which filters sync stats to only include syncs that actually used the RC version associated with the rollout.
This is more accurate than SQL-based approaches which count all syncs regardless of which connector version was used.
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
rollout_id |
string |
yes | — | Rollout UUID to get monitoring stats for |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"rollout_id": {
"description": "Rollout UUID to get monitoring stats for",
"type": "string"
}
},
"required": [
"rollout_id"
],
"type": "object"
}
Show output JSON schema
{
"description": "Complete monitoring result for a rollout from the platform API.\n\nThis uses the platform API's /get_actor_sync_info endpoint which filters\nsync stats to only include syncs that actually used the RC version\nassociated with the rollout.",
"properties": {
"rollout_id": {
"description": "Rollout UUID",
"type": "string"
},
"actor_selection_info": {
"description": "Actor selection info for the rollout",
"properties": {
"num_actors": {
"description": "Total actors using this connector",
"type": "integer"
},
"num_pinned_to_connector_rollout": {
"description": "Actors specifically pinned to this rollout",
"type": "integer"
},
"num_actors_eligible_or_already_pinned": {
"description": "Actors eligible for pinning or already pinned",
"type": "integer"
}
},
"required": [
"num_actors",
"num_pinned_to_connector_rollout",
"num_actors_eligible_or_already_pinned"
],
"type": "object"
},
"actor_sync_stats": {
"description": "Per-actor sync stats for actors pinned to the rollout",
"items": {
"description": "Per-actor sync stats for a rollout (only syncs using the RC version).",
"properties": {
"actor_id": {
"description": "Actor UUID",
"type": "string"
},
"num_connections": {
"description": "Number of connections using this actor",
"type": "integer"
},
"num_succeeded": {
"description": "Number of successful syncs using the RC version",
"type": "integer"
},
"num_failed": {
"description": "Number of failed syncs using the RC version",
"type": "integer"
}
},
"required": [
"actor_id",
"num_connections",
"num_succeeded",
"num_failed"
],
"type": "object"
},
"type": "array"
}
},
"required": [
"rollout_id",
"actor_selection_info",
"actor_sync_stats"
],
"type": "object"
}
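The output schema above is enough to derive a quick health summary before deciding whether to progress or finalize a rollout. The following sketch is illustrative only: `summarize_rollout` and the metrics it reports are assumptions, not something the tool returns, and the sample payload is made up.

```python
def summarize_rollout(result: dict) -> dict:
    """Hypothetical helper: aggregate per-actor sync stats from the tool's output."""
    stats = result["actor_sync_stats"]
    succeeded = sum(s["num_succeeded"] for s in stats)
    failed = sum(s["num_failed"] for s in stats)
    total = succeeded + failed

    selection = result["actor_selection_info"]
    eligible = selection["num_actors_eligible_or_already_pinned"]

    return {
        "total_syncs": total,
        # None when no RC syncs have run yet, to avoid dividing by zero.
        "failure_rate": failed / total if total else None,
        # Actors whose RC syncs have only ever failed deserve a closer look.
        "actors_only_failing": [
            s["actor_id"]
            for s in stats
            if s["num_failed"] > 0 and s["num_succeeded"] == 0
        ],
        "pinned_share_of_eligible": (
            selection["num_pinned_to_connector_rollout"] / eligible if eligible else None
        ),
    }


# Made-up sample shaped like the output schema.
sample = {
    "rollout_id": "11111111-1111-1111-1111-111111111111",
    "actor_selection_info": {
        "num_actors": 40,
        "num_pinned_to_connector_rollout": 5,
        "num_actors_eligible_or_already_pinned": 20,
    },
    "actor_sync_stats": [
        {"actor_id": "a", "num_connections": 1, "num_succeeded": 9, "num_failed": 1},
        {"actor_id": "b", "num_connections": 2, "num_succeeded": 0, "num_failed": 2},
    ],
}
summary = summarize_rollout(sample)
```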
start_connector_rollout
Hints: destructive · open-world
Start or configure a connector rollout workflow.
This tool configures and starts a connector rollout workflow. It can be called multiple times while the rollout is in INITIALIZED state to update the configuration (strategy, percentages). Once the Temporal workflow starts and the state transitions to WORKFLOW_STARTED, the configuration is locked and cannot be changed.
Behavior:
- If rollout is INITIALIZED: Updates configuration and starts the workflow
- If rollout is already started: Returns an error (configuration is locked)
Configuration Parameters:
- rollout_strategy: 'manual' (default), 'automated', or 'overridden'
- initial_rollout_pct: Step size for progression (default: 25%)
- final_target_rollout_pct: Maximum percentage to pin (default: 50%)
- customer_tier: Customer tier to target - 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL' (default: TIER_2)
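As a rough illustration of how the step size and the cap interact, the sketch below lists the percentages a stepwise rollout would pass through. It only shows the arithmetic implied by the descriptions; the function name is hypothetical and the platform's actual scheduling may differ.

```python
def planned_percentages(
    initial_rollout_pct: int = 25,
    final_target_rollout_pct: int = 50,
) -> list[int]:
    """Hypothetical helper: percentages reached when advancing by a fixed step up to a cap."""
    if not 0 < initial_rollout_pct <= 100:
        raise ValueError("initial_rollout_pct must be in 1-100 for a stepwise plan")
    if not 0 < final_target_rollout_pct <= 100:
        raise ValueError("final_target_rollout_pct must be in 1-100 for a stepwise plan")

    steps: list[int] = []
    pct = initial_rollout_pct
    while pct < final_target_rollout_pct:
        steps.append(pct)
        pct += initial_rollout_pct
    # The last step is clamped to the cap so it is never exceeded.
    steps.append(final_target_rollout_pct)
    return steps


defaults = planned_percentages()       # documented defaults: step 25, cap 50
uneven = planned_percentages(20, 50)   # step does not divide the cap evenly
```

With the documented defaults this gives two steps, 25 and then 50.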
Admin-only operation - Requires:
- AIRBYTE_INTERNAL_ADMIN_FLAG=airbyte.io environment variable
- approval_comment_url (Slack approval record URL from escalate_to_human)
The admin user email is automatically derived from the Slack approval record, resolving the approver's @airbyte.io email via the team roster.
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| docker_repository | string | yes | — | The docker repository (e.g., 'airbyte/source-pokeapi') |
| docker_image_tag | string | yes | — | The docker image tag (e.g., '0.3.48-rc.1') |
| actor_definition_id | string | yes | — | The actor definition ID (UUID) |
| approval_comment_url | string \| null | no | null | URL to the Slack approval record. Obtain this by calling the escalate_to_human tool with approval_requested=True; the backend delivers the approval record URL when a human clicks Approve. Format: https://<workspace>.slack.com/archives/... The admin email is automatically resolved from the approver's identity via the team roster. |
| rollout_strategy | enum("manual", "automated", "overridden") | no | "manual" | The rollout strategy: 'manual' for manual control of rollout progression, 'automated' for automatic progression based on metrics, 'overridden' for special cases where normal rules are bypassed. |
| initial_rollout_pct | integer \| null | no | null | Initial/step percentage for rollout progression (0-100). For automated rollouts, this is the percentage increment per step. For example, 25 means the rollout will advance by 25% each step. Default is 25% if not specified. |
| final_target_rollout_pct | integer \| null | no | null | Maximum percentage of actors to pin (0-100). The rollout will not exceed this percentage. For example, 50 means at most 50% of actors will be pinned to the RC. Default is 50% if not specified. |
| customer_tier | enum("TIER_0", "TIER_1", "TIER_2", "ALL") \| null | no | null | The customer tier to target for this rollout. Each tier represents a different group of customers: 'TIER_0' for the highest-priority customers, 'TIER_1' for mid-tier customers, 'TIER_2' for the broadest customer group (default if not specified), 'ALL' to target all customer tiers. When not specified, the platform defaults to TIER_2 only. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"docker_repository": {
"description": "The docker repository (e.g., 'airbyte/source-pokeapi')",
"type": "string"
},
"docker_image_tag": {
"description": "The docker image tag (e.g., '0.3.48-rc.1')",
"type": "string"
},
"actor_definition_id": {
"description": "The actor definition ID (UUID)",
"type": "string"
},
"approval_comment_url": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "URL to the Slack approval record. Obtain this by calling the `escalate_to_human` tool with `approval_requested=True`; the backend delivers the approval record URL when a human clicks Approve. Format: https://<workspace>.slack.com/archives/... The admin email is automatically resolved from the approver's identity via the team roster."
},
"rollout_strategy": {
"default": "manual",
"description": "The rollout strategy: 'manual' for manual control of rollout progression, 'automated' for automatic progression based on metrics, 'overridden' for special cases where normal rules are bypassed.",
"enum": [
"manual",
"automated",
"overridden"
],
"type": "string"
},
"initial_rollout_pct": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"description": "Initial/step percentage for rollout progression (0-100). For automated rollouts, this is the percentage increment per step. For example, 25 means the rollout will advance by 25% each step. Default is 25% if not specified."
},
"final_target_rollout_pct": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"description": "Maximum percentage of actors to pin (0-100). The rollout will not exceed this percentage. For example, 50 means at most 50% of actors will be pinned to the RC. Default is 50% if not specified."
},
"customer_tier": {
"anyOf": [
{
"enum": [
"TIER_0",
"TIER_1",
"TIER_2",
"ALL"
],
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The customer tier to target for this rollout. Each tier represents a different group of customers: 'TIER_0' for the highest-priority customers, 'TIER_1' for mid-tier customers, 'TIER_2' for the broadest customer group (default if not specified), 'ALL' to target all customer tiers. When not specified, the platform defaults to TIER_2 only."
}
},
"required": [
"docker_repository",
"docker_image_tag",
"actor_definition_id"
],
"type": "object"
}
Show output JSON schema
{
"description": "Result of a connector rollout start operation.\n\nThis model provides detailed information about the outcome of starting\na connector rollout workflow.",
"properties": {
"success": {
"description": "Whether the operation succeeded",
"type": "boolean"
},
"message": {
"description": "Human-readable message describing the result",
"type": "string"
},
"docker_repository": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The docker repository (e.g., 'airbyte/source-github')"
},
"docker_image_tag": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The docker image tag (e.g., '1.2.0-rc.2')"
},
"actor_definition_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The actor definition ID (UUID)"
},
"rollout_strategy": {
"anyOf": [
{
"enum": [
"manual",
"automated",
"overridden"
],
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The rollout strategy used"
}
},
"required": [
"success",
"message"
],
"type": "object"
}
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
"""MCP tools for connector rollout management.

This module provides MCP tools for managing connector rollouts in Airbyte Cloud,
including finalizing (promoting, rolling back, or canceling) rollouts.

## MCP reference

.. include:: ../../../docs/mcp-generated/connector_rollout.md
    :start-line: 2
"""

# NOTE: We intentionally do NOT use `from __future__ import annotations` here.
# FastMCP has issues resolving forward references when PEP 563 deferred annotations
# are used. See: https://github.com/jlowin/fastmcp/issues/905
# Python 3.12+ supports modern type hint syntax natively, so this is not needed.

__all__: list[str] = []

from dataclasses import dataclass
from typing import Annotated, Literal

from airbyte import constants
from airbyte.exceptions import PyAirbyteInputError
from fastmcp import Context, FastMCP
from fastmcp_extensions import get_mcp_config, mcp_tool, register_mcp_tools
from pydantic import BaseModel, Field

from airbyte_ops_mcp.approval_resolution import (
    ApprovalResolutionError,
    resolve_admin_email_from_approval,
)
from airbyte_ops_mcp.cloud_admin import api_client
from airbyte_ops_mcp.cloud_admin.auth import (
    CloudAuthError,
    require_internal_admin_flag_only,
)
from airbyte_ops_mcp.cloud_admin.models import (
    ConnectorRolloutFinalizeResult,
    ConnectorRolloutProgressResult,
    ConnectorRolloutStartResult,
)
from airbyte_ops_mcp.constants import ServerConfigKey


@dataclass(frozen=True)
class _ResolvedCloudAuth:
    """Resolved authentication for Airbyte Cloud API calls.

    Either bearer_token OR (client_id AND client_secret) will be set, not both.
    """

    bearer_token: str | None = None
    client_id: str | None = None
    client_secret: str | None = None


def _resolve_cloud_auth(ctx: Context) -> _ResolvedCloudAuth:
    """Resolve authentication credentials for Airbyte Cloud API.

    Credentials are resolved in priority order:
    1. Bearer token (Authorization header or AIRBYTE_CLOUD_BEARER_TOKEN env var)
    2. Client credentials (X-Airbyte-Cloud-Client-Id/Secret headers or env vars)

    Args:
        ctx: FastMCP Context object from the current tool invocation.

    Returns:
        _ResolvedCloudAuth with either bearer_token or client credentials set.

    Raises:
        CloudAuthError: If credentials cannot be resolved from headers or env vars.
    """
    # Try bearer token first (preferred, but not required)
    bearer_token = get_mcp_config(ctx, ServerConfigKey.BEARER_TOKEN)
    if bearer_token:
        return _ResolvedCloudAuth(bearer_token=bearer_token)

    # Fall back to client credentials
    try:
        client_id = get_mcp_config(ctx, ServerConfigKey.CLIENT_ID)
        client_secret = get_mcp_config(ctx, ServerConfigKey.CLIENT_SECRET)
        return _ResolvedCloudAuth(
            client_id=client_id,
            client_secret=client_secret,
        )
    except ValueError as e:
        raise CloudAuthError(
            f"Failed to resolve credentials. Ensure credentials are provided "
            f"via Authorization header (Bearer token), "
            f"HTTP headers (X-Airbyte-Cloud-Client-Id, X-Airbyte-Cloud-Client-Secret), "
            f"or environment variables. Error: {e}"
        ) from e


@mcp_tool(
    destructive=True,
    idempotent=False,
    open_world=True,
)
def start_connector_rollout(
    docker_repository: Annotated[
        str,
        Field(description="The docker repository (e.g., 'airbyte/source-pokeapi')"),
    ],
    docker_image_tag: Annotated[
        str,
        Field(description="The docker image tag (e.g., '0.3.48-rc.1')"),
    ],
    actor_definition_id: Annotated[
        str,
        Field(description="The actor definition ID (UUID)"),
    ],
    approval_comment_url: Annotated[
        str | None,
        Field(
            description="URL to the Slack approval record. Obtain this by calling the "
            "`escalate_to_human` tool with `approval_requested=True`; the backend delivers "
            "the approval record URL when a human clicks Approve. "
            "Format: https://<workspace>.slack.com/archives/... "
            "The admin email is automatically resolved from the approver's identity "
            "via the team roster.",
            default=None,
        ),
    ],
    rollout_strategy: Annotated[
        Literal["manual", "automated", "overridden"],
        Field(
            description="The rollout strategy: "
            "'manual' for manual control of rollout progression, "
            "'automated' for automatic progression based on metrics, "
            "'overridden' for special cases where normal rules are bypassed.",
            default="manual",
        ),
    ],
    initial_rollout_pct: Annotated[
        int | None,
        Field(
            description="Initial/step percentage for rollout progression (0-100). "
            "For automated rollouts, this is the percentage increment per step. "
            "For example, 25 means the rollout will advance by 25% each step. "
            "Default is 25% if not specified.",
            default=None,
        ),
    ],
    final_target_rollout_pct: Annotated[
        int | None,
        Field(
            description="Maximum percentage of actors to pin (0-100). "
            "The rollout will not exceed this percentage. "
            "For example, 50 means at most 50% of actors will be pinned to the RC. "
            "Default is 50% if not specified.",
            default=None,
        ),
    ],
    customer_tier: Annotated[
        Literal["TIER_0", "TIER_1", "TIER_2", "ALL"] | None,
        Field(
            description="The customer tier to target for this rollout. "
            "Each tier represents a different group of customers: "
            "'TIER_0' for the highest-priority customers, "
            "'TIER_1' for mid-tier customers, "
            "'TIER_2' for the broadest customer group (default if not specified), "
            "'ALL' to target all customer tiers. "
            "When not specified, the platform defaults to TIER_2 only.",
            default=None,
        ),
    ],
    *,
    ctx: Context,
) -> ConnectorRolloutStartResult:
    """Start or configure a connector rollout workflow.

    This tool configures and starts a connector rollout workflow. It can be called
    multiple times while the rollout is in INITIALIZED state to update the configuration
    (strategy, percentages). Once the Temporal workflow starts and the state transitions
    to WORKFLOW_STARTED, the configuration is locked and cannot be changed.

    **Behavior:**
    - If rollout is INITIALIZED: Updates configuration and starts the workflow
    - If rollout is already started: Returns an error (configuration is locked)

    **Configuration Parameters:**
    - rollout_strategy: 'manual' (default), 'automated', or 'overridden'
    - initial_rollout_pct: Step size for progression (default: 25%)
    - final_target_rollout_pct: Maximum percentage to pin (default: 50%)
    - customer_tier: Customer tier to target - 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL' (default: TIER_2)

    **Admin-only operation** - Requires:
    - AIRBYTE_INTERNAL_ADMIN_FLAG=airbyte.io environment variable
    - approval_comment_url (Slack approval record URL from `escalate_to_human`)

    The admin user email is automatically derived from the Slack approval record,
    resolving the approver's @airbyte.io email via the team roster.
    """
    # Validate admin access (check env var flag)
    try:
        require_internal_admin_flag_only()
    except CloudAuthError as e:
        return ConnectorRolloutStartResult(
            success=False,
            message=f"Admin authentication failed: {e}",
            docker_repository=docker_repository,
            docker_image_tag=docker_image_tag,
            actor_definition_id=actor_definition_id,
        )

    # Validate approval URL is provided
    if not approval_comment_url:
        return ConnectorRolloutStartResult(
            success=False,
            message="'approval_comment_url' is required. Use `escalate_to_human` with `approval_requested=True` to obtain a Slack approval record URL.",
            docker_repository=docker_repository,
            docker_image_tag=docker_image_tag,
            actor_definition_id=actor_definition_id,
        )

    # Derive admin email from the approval URL (domain-based dispatch)
    try:
        admin_user_email = resolve_admin_email_from_approval(
            approval_comment_url=approval_comment_url,
        )
    except ApprovalResolutionError as e:
        return ConnectorRolloutStartResult(
            success=False,
            message=str(e),
            docker_repository=docker_repository,
            docker_image_tag=docker_image_tag,
            actor_definition_id=actor_definition_id,
        )

    # Resolve auth credentials
    try:
        auth = _resolve_cloud_auth(ctx)
    except CloudAuthError as e:
        return ConnectorRolloutStartResult(
            success=False,
            message=f"Failed to resolve credentials: {e}",
            docker_repository=docker_repository,
            docker_image_tag=docker_image_tag,
            actor_definition_id=actor_definition_id,
        )

    # Get user ID from admin email
    try:
        user_id = api_client.get_user_id_by_email(
            email=admin_user_email,
            config_api_root=constants.CLOUD_CONFIG_API_ROOT,
            client_id=auth.client_id,
            client_secret=auth.client_secret,
            bearer_token=auth.bearer_token,
        )
    except PyAirbyteInputError as e:
        return ConnectorRolloutStartResult(
            success=False,
            message=f"Failed to get user ID for admin email '{admin_user_email}': {e}",
            docker_repository=docker_repository,
            docker_image_tag=docker_image_tag,
            actor_definition_id=actor_definition_id,
        )

    # Call the API to start the rollout
    try:
        api_client.start_connector_rollout(
            docker_repository=docker_repository,
            docker_image_tag=docker_image_tag,
            actor_definition_id=actor_definition_id,
            updated_by=user_id,
            rollout_strategy=rollout_strategy,
            config_api_root=constants.CLOUD_CONFIG_API_ROOT,
            initial_rollout_pct=initial_rollout_pct,
            final_target_rollout_pct=final_target_rollout_pct,
            customer_tier=customer_tier,
            client_id=auth.client_id,
            client_secret=auth.client_secret,
            bearer_token=auth.bearer_token,
        )

        # Build message with configuration details
        config_details = []
        if initial_rollout_pct is not None:
            config_details.append(f"initial_rollout_pct={initial_rollout_pct}%")
        if final_target_rollout_pct is not None:
            config_details.append(
                f"final_target_rollout_pct={final_target_rollout_pct}%"
            )
        if customer_tier is not None:
            config_details.append(f"customer_tier={customer_tier}")
        config_str = (
            f" Configuration: {', '.join(config_details)}." if config_details else ""
        )

        return ConnectorRolloutStartResult(
            success=True,
            message=f"Successfully started rollout workflow for "
            f"{docker_repository}:{docker_image_tag}. "
            f"The rollout state has transitioned from INITIALIZED to WORKFLOW_STARTED."
            f"{config_str}",
            docker_repository=docker_repository,
            docker_image_tag=docker_image_tag,
            actor_definition_id=actor_definition_id,
            rollout_strategy=rollout_strategy,
        )

    except PyAirbyteInputError as e:
        return ConnectorRolloutStartResult(
            success=False,
            message=str(e),
            docker_repository=docker_repository,
            docker_image_tag=docker_image_tag,
            actor_definition_id=actor_definition_id,
        )


@mcp_tool(
    destructive=True,
    idempotent=False,
    open_world=True,
)
def progress_connector_rollout(
    docker_repository: Annotated[
        str,
        Field(description="The docker repository (e.g., 'airbyte/source-pokeapi')"),
    ],
    docker_image_tag: Annotated[
        str,
        Field(description="The docker image tag (e.g., '0.3.48-rc.1')"),
    ],
    actor_definition_id: Annotated[
        str,
        Field(description="The actor definition ID (UUID)"),
    ],
    rollout_id: Annotated[
        str,
        Field(
            description="The rollout ID (UUID). Can be found from query_prod_connector_rollouts."
        ),
    ],
    approval_comment_url: Annotated[
        str | None,
        Field(
            description="URL to the Slack approval record. Obtain this by calling the "
            "`escalate_to_human` tool with `approval_requested=True`; the backend delivers "
            "the approval record URL when a human clicks Approve. "
            "Format: https://<workspace>.slack.com/archives/... "
            "The admin email is automatically resolved from the approver's identity "
            "via the team roster.",
            default=None,
        ),
    ],
    target_percentage: Annotated[
        int | None,
        Field(
            description="Target percentage of actors to pin to the RC (1-100). "
            "Either target_percentage or actor_ids must be provided.",
            default=None,
        ),
    ] = None,
    actor_ids: Annotated[
        list[str] | None,
        Field(
            description="Specific actor IDs to pin to the RC. "
            "Either target_percentage or actor_ids must be provided.",
            default=None,
        ),
    ] = None,
    *,
    ctx: Context,
) -> ConnectorRolloutProgressResult:
    """Progress a connector rollout by pinning actors to the RC version.

    This tool progresses a connector rollout by either:
    - Setting a target percentage of actors to pin to the RC version
    - Specifying specific actor IDs to pin

    **Admin-only operation** - Requires:
    - AIRBYTE_INTERNAL_ADMIN_FLAG=airbyte.io environment variable
    - approval_comment_url (Slack approval record URL from `escalate_to_human`)

    The admin user email is automatically derived from the Slack approval record.
    """
    # Validate admin access (check env var flag)
    try:
        require_internal_admin_flag_only()
    except CloudAuthError as e:
        return ConnectorRolloutProgressResult(
            success=False,
            message=f"Admin authentication failed: {e}",
            rollout_id=rollout_id,
            docker_repository=docker_repository,
            docker_image_tag=docker_image_tag,
        )

    # Validate that at least one of target_percentage or actor_ids is provided
    if target_percentage is None and actor_ids is None:
        return ConnectorRolloutProgressResult(
            success=False,
            message="Either target_percentage or actor_ids must be provided",
            rollout_id=rollout_id,
            docker_repository=docker_repository,
            docker_image_tag=docker_image_tag,
        )

    # Validate approval URL is provided
    if not approval_comment_url:
        return ConnectorRolloutProgressResult(
            success=False,
            message="'approval_comment_url' is required. Use `escalate_to_human` with `approval_requested=True` to obtain a Slack approval record URL.",
            rollout_id=rollout_id,
            docker_repository=docker_repository,
            docker_image_tag=docker_image_tag,
        )

    # Derive admin email from the approval URL (domain-based dispatch)
    try:
        admin_user_email = resolve_admin_email_from_approval(
            approval_comment_url=approval_comment_url,
        )
    except ApprovalResolutionError as e:
        return ConnectorRolloutProgressResult(
            success=False,
            message=str(e),
            rollout_id=rollout_id,
            docker_repository=docker_repository,
            docker_image_tag=docker_image_tag,
        )

    # Resolve auth credentials
    try:
        auth = _resolve_cloud_auth(ctx)
    except CloudAuthError as e:
        return ConnectorRolloutProgressResult(
            success=False,
            message=f"Failed to resolve credentials: {e}",
            rollout_id=rollout_id,
            docker_repository=docker_repository,
            docker_image_tag=docker_image_tag,
        )

    # Get user ID from admin email
    try:
        user_id = api_client.get_user_id_by_email(
            email=admin_user_email,
            config_api_root=constants.CLOUD_CONFIG_API_ROOT,
            client_id=auth.client_id,
            client_secret=auth.client_secret,
            bearer_token=auth.bearer_token,
        )
    except PyAirbyteInputError as e:
        return ConnectorRolloutProgressResult(
            success=False,
            message=f"Failed to get user ID for admin email '{admin_user_email}': {e}",
            rollout_id=rollout_id,
            docker_repository=docker_repository,
            docker_image_tag=docker_image_tag,
        )

    # Call the API to progress the rollout
    try:
        api_client.progress_connector_rollout(
            docker_repository=docker_repository,
            docker_image_tag=docker_image_tag,
            actor_definition_id=actor_definition_id,
            rollout_id=rollout_id,
            updated_by=user_id,
            config_api_root=constants.CLOUD_CONFIG_API_ROOT,
            target_percentage=target_percentage,
            actor_ids=actor_ids,
            client_id=auth.client_id,
client_secret=auth.client_secret, 471 bearer_token=auth.bearer_token, 472 ) 473 474 progress_msg = ( 475 f"target_percentage={target_percentage}%" 476 if target_percentage 477 else f"{len(actor_ids) if actor_ids else 0} specific actors" 478 ) 479 return ConnectorRolloutProgressResult( 480 success=True, 481 message=f"Successfully progressed rollout for " 482 f"{docker_repository}:{docker_image_tag} to {progress_msg}.", 483 rollout_id=rollout_id, 484 docker_repository=docker_repository, 485 docker_image_tag=docker_image_tag, 486 target_percentage=target_percentage, 487 ) 488 489 except PyAirbyteInputError as e: 490 return ConnectorRolloutProgressResult( 491 success=False, 492 message=str(e), 493 rollout_id=rollout_id, 494 docker_repository=docker_repository, 495 docker_image_tag=docker_image_tag, 496 ) 497 498 499@mcp_tool( 500 destructive=True, 501 idempotent=False, 502 open_world=True, 503) 504def finalize_connector_rollout( 505 docker_repository: Annotated[ 506 str, 507 Field( 508 description="The docker repository (e.g., 'airbyte/source-youtube-analytics')" 509 ), 510 ], 511 docker_image_tag: Annotated[ 512 str, 513 Field(description="The docker image tag (e.g., '1.2.0-rc.2')"), 514 ], 515 actor_definition_id: Annotated[ 516 str, 517 Field(description="The actor definition ID (UUID)"), 518 ], 519 rollout_id: Annotated[ 520 str, 521 Field( 522 description="The rollout ID (UUID). Can be found in the 'pin_origin' field " 523 "of rollout data from query_prod_actors_by_pinned_connector_version." 524 ), 525 ], 526 state: Annotated[ 527 Literal["succeeded", "failed_rolled_back", "canceled"], 528 Field( 529 description="The final state for the rollout: " 530 "'succeeded' promotes the RC to GA (default version for all users), " 531 "'failed_rolled_back' rolls back the RC, " 532 "'canceled' cancels the rollout without promotion or rollback." 533 ), 534 ], 535 approval_comment_url: Annotated[ 536 str | None, 537 Field( 538 description="URL to the Slack approval record. 
Obtain this by calling the " 539 "`escalate_to_human` tool with `approval_requested=True`; the backend delivers " 540 "the approval record URL when a human clicks Approve. " 541 "Format: https://<workspace>.slack.com/archives/... " 542 "The admin email is automatically resolved from the approver's identity " 543 "via the team roster.", 544 default=None, 545 ), 546 ], 547 error_msg: Annotated[ 548 str | None, 549 Field( 550 description="Optional error message for failed/canceled states.", 551 default=None, 552 ), 553 ] = None, 554 failed_reason: Annotated[ 555 str | None, 556 Field( 557 description="Optional failure reason for failed/canceled states.", 558 default=None, 559 ), 560 ] = None, 561 retain_pins_on_cancellation: Annotated[ 562 bool | None, 563 Field( 564 description="If True, retain version pins when canceling. " 565 "Only applicable when state is 'canceled'.", 566 default=None, 567 ), 568 ] = None, 569 *, 570 ctx: Context, 571) -> ConnectorRolloutFinalizeResult: 572 """Finalize a connector rollout by promoting, rolling back, or canceling. 573 574 This tool allows admins to finalize connector rollouts that are in progress. 575 Use this after monitoring a rollout and determining it is ready for finalization. 576 577 **IMPORTANT: Finalization is asynchronous.** This tool sends a finalization 578 request to the platform API, which transitions the rollout to `finalizing` 579 state and triggers a Temporal workflow. The actual promotion (PR creation, 580 connector publish, registry update) or rollback (GCS cleanup, registry 581 recompile) happens asynchronously via the `finalize_rollout.yml` GitHub 582 Actions workflow. A successful response from this tool means the request 583 was accepted — NOT that the promotion/rollback is complete. 584 585 After calling this tool, you MUST verify: 586 1. The `finalize_rollout.yml` workflow ran successfully in GitHub Actions 587 2. For promotions: a merged PR exists (e.g., `chore: finalize promote for <connector>`) 588 3. 
The rollout state transitioned to its terminal state (`succeeded`, 589 `failed_rolled_back`, or `canceled`) via `query_prod_connector_rollouts` 590 591 **Admin-only operation** - Requires: 592 - AIRBYTE_INTERNAL_ADMIN_FLAG=airbyte.io environment variable 593 - approval_comment_url (Slack approval record URL from `escalate_to_human`) 594 595 The admin user email is automatically derived from the Slack approval record, 596 resolving the approver's @airbyte.io email via the team roster. 597 """ 598 # Validate admin access (check env var flag) 599 try: 600 require_internal_admin_flag_only() 601 except CloudAuthError as e: 602 return ConnectorRolloutFinalizeResult( 603 success=False, 604 message=f"Admin authentication failed: {e}", 605 rollout_id=rollout_id, 606 docker_repository=docker_repository, 607 docker_image_tag=docker_image_tag, 608 ) 609 610 # Validate approval URL is provided 611 if not approval_comment_url: 612 return ConnectorRolloutFinalizeResult( 613 success=False, 614 message="'approval_comment_url' is required. 
Use `escalate_to_human` with `approval_requested=True` to obtain a Slack approval record URL.", 615 rollout_id=rollout_id, 616 docker_repository=docker_repository, 617 docker_image_tag=docker_image_tag, 618 ) 619 620 # Derive admin email from the approval URL (domain-based dispatch) 621 try: 622 admin_user_email = resolve_admin_email_from_approval( 623 approval_comment_url=approval_comment_url, 624 ) 625 except ApprovalResolutionError as e: 626 return ConnectorRolloutFinalizeResult( 627 success=False, 628 message=str(e), 629 rollout_id=rollout_id, 630 docker_repository=docker_repository, 631 docker_image_tag=docker_image_tag, 632 ) 633 634 # Resolve auth credentials 635 try: 636 auth = _resolve_cloud_auth(ctx) 637 except CloudAuthError as e: 638 return ConnectorRolloutFinalizeResult( 639 success=False, 640 message=f"Failed to resolve credentials: {e}", 641 rollout_id=rollout_id, 642 docker_repository=docker_repository, 643 docker_image_tag=docker_image_tag, 644 ) 645 646 # Get user ID from admin email 647 try: 648 user_id = api_client.get_user_id_by_email( 649 email=admin_user_email, 650 config_api_root=constants.CLOUD_CONFIG_API_ROOT, 651 client_id=auth.client_id, 652 client_secret=auth.client_secret, 653 bearer_token=auth.bearer_token, 654 ) 655 except PyAirbyteInputError as e: 656 return ConnectorRolloutFinalizeResult( 657 success=False, 658 message=f"Failed to get user ID for admin email '{admin_user_email}': {e}", 659 rollout_id=rollout_id, 660 docker_repository=docker_repository, 661 docker_image_tag=docker_image_tag, 662 ) 663 664 # Call the API to finalize the rollout 665 try: 666 api_client.finalize_connector_rollout( 667 docker_repository=docker_repository, 668 docker_image_tag=docker_image_tag, 669 actor_definition_id=actor_definition_id, 670 rollout_id=rollout_id, 671 updated_by=user_id, 672 state=state, 673 config_api_root=constants.CLOUD_CONFIG_API_ROOT, 674 client_id=auth.client_id, 675 client_secret=auth.client_secret, 676 
bearer_token=auth.bearer_token, 677 error_msg=error_msg, 678 failed_reason=failed_reason, 679 retain_pins_on_cancellation=retain_pins_on_cancellation, 680 ) 681 682 state_descriptions = { 683 "succeeded": ( 684 "GA promotion has been initiated (state: finalizing). " 685 "The actual promotion (PR creation, publish, registry update) " 686 "happens asynchronously via the finalize_rollout.yml GitHub Actions workflow. " 687 "You MUST verify the workflow completes successfully and the rollout " 688 "transitions to 'succeeded' state before reporting completion. " 689 "Check: (1) GitHub Actions for a 'Finalize Progressive Rollout' workflow run, " 690 "(2) a merged promotion PR, and " 691 "(3) query_prod_connector_rollouts to confirm state is 'succeeded'." 692 ), 693 "failed_rolled_back": ( 694 "rollback has been initiated (state: finalizing). " 695 "The rollback happens asynchronously via the finalize_rollout.yml workflow. " 696 "Verify the workflow completes and the rollout transitions to " 697 "'failed_rolled_back' state." 
698 ), 699 "canceled": "canceled", 700 } 701 state_desc = state_descriptions.get(state, state) 702 703 return ConnectorRolloutFinalizeResult( 704 success=True, 705 message=f"Finalization request accepted for {docker_repository}:{docker_image_tag}: " 706 f"{state_desc}", 707 rollout_id=rollout_id, 708 docker_repository=docker_repository, 709 docker_image_tag=docker_image_tag, 710 state=state, 711 ) 712 713 except PyAirbyteInputError as e: 714 return ConnectorRolloutFinalizeResult( 715 success=False, 716 message=str(e), 717 rollout_id=rollout_id, 718 docker_repository=docker_repository, 719 docker_image_tag=docker_image_tag, 720 ) 721 722 723class RolloutActorSelectionInfo(BaseModel): 724 """Actor selection info for a connector rollout.""" 725 726 num_actors: int = Field(description="Total actors using this connector") 727 num_pinned_to_connector_rollout: int = Field( 728 description="Actors specifically pinned to this rollout" 729 ) 730 num_actors_eligible_or_already_pinned: int = Field( 731 description="Actors eligible for pinning or already pinned" 732 ) 733 734 735class RolloutActorSyncStats(BaseModel): 736 """Per-actor sync stats for a rollout (only syncs using the RC version).""" 737 738 actor_id: str = Field(description="Actor UUID") 739 num_connections: int = Field(description="Number of connections using this actor") 740 num_succeeded: int = Field( 741 description="Number of successful syncs using the RC version" 742 ) 743 num_failed: int = Field(description="Number of failed syncs using the RC version") 744 745 746class RolloutMonitoringResult(BaseModel): 747 """Complete monitoring result for a rollout from the platform API. 748 749 This uses the platform API's /get_actor_sync_info endpoint which filters 750 sync stats to only include syncs that actually used the RC version 751 associated with the rollout. 
752 """ 753 754 rollout_id: str = Field(description="Rollout UUID") 755 actor_selection_info: RolloutActorSelectionInfo = Field( 756 description="Actor selection info for the rollout" 757 ) 758 actor_sync_stats: list[RolloutActorSyncStats] = Field( 759 description="Per-actor sync stats for actors pinned to the rollout" 760 ) 761 762 763@mcp_tool( 764 read_only=True, 765 idempotent=True, 766) 767def query_prod_rollout_monitoring_stats( 768 rollout_id: Annotated[ 769 str, 770 Field(description="Rollout UUID to get monitoring stats for"), 771 ], 772 *, 773 ctx: Context, 774) -> RolloutMonitoringResult: 775 """Get monitoring stats for a connector rollout. 776 777 Returns actor selection info and per-actor sync stats for actors 778 participating in the rollout. This uses the platform API's 779 /get_actor_sync_info endpoint which filters sync stats to only include 780 syncs that actually used the RC version associated with the rollout. 781 782 This is more accurate than SQL-based approaches which count all syncs 783 regardless of which connector version was used. 
784 """ 785 auth = _resolve_cloud_auth(ctx) 786 787 response = api_client.get_actor_sync_info( 788 rollout_id=rollout_id, 789 config_api_root=constants.CLOUD_CONFIG_API_ROOT, 790 client_id=auth.client_id, 791 client_secret=auth.client_secret, 792 bearer_token=auth.bearer_token, 793 ) 794 795 data = response.get("data", {}) 796 actor_selection_info_data = data.get("actor_selection_info", {}) 797 syncs_data = data.get("syncs", {}) 798 799 actor_selection_info = RolloutActorSelectionInfo( 800 num_actors=actor_selection_info_data.get("num_actors", 0), 801 num_pinned_to_connector_rollout=actor_selection_info_data.get( 802 "num_pinned_to_connector_rollout", 0 803 ), 804 num_actors_eligible_or_already_pinned=actor_selection_info_data.get( 805 "num_actors_eligible_or_already_pinned", 0 806 ), 807 ) 808 809 actor_sync_stats = [ 810 RolloutActorSyncStats( 811 actor_id=actor_id, 812 num_connections=sync_info.get("num_connections", 0), 813 num_succeeded=sync_info.get("num_succeeded", 0), 814 num_failed=sync_info.get("num_failed", 0), 815 ) 816 for actor_id, sync_info in syncs_data.items() 817 ] 818 819 return RolloutMonitoringResult( 820 rollout_id=rollout_id, 821 actor_selection_info=actor_selection_info, 822 actor_sync_stats=actor_sync_stats, 823 ) 824 825 826def register_connector_rollout_tools(app: FastMCP) -> None: 827 """Register connector rollout tools with the FastMCP app. 828 829 Args: 830 app: FastMCP application instance 831 """ 832 register_mcp_tools(app)
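The monitoring tool in the listing above reads a `data.syncs` mapping keyed by actor ID and defaults any missing counter to 0. The following is a minimal, standalone sketch of that parsing step, followed by one way a caller might summarize the result before deciding how to finalize. It is illustrative only: `ActorSyncStats`, `parse_sync_stats`, `failure_rate`, and the sample payload are hypothetical names and data, not part of this module, and a plain dataclass stands in for the pydantic model so the snippet runs without dependencies.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class ActorSyncStats:
    """Hypothetical stand-in for RolloutActorSyncStats (no pydantic needed)."""

    actor_id: str
    num_connections: int
    num_succeeded: int
    num_failed: int


def parse_sync_stats(response: dict) -> list[ActorSyncStats]:
    """Flatten the `data.syncs` mapping, defaulting missing counters to 0."""
    syncs = response.get("data", {}).get("syncs", {})
    return [
        ActorSyncStats(
            actor_id=actor_id,
            num_connections=info.get("num_connections", 0),
            num_succeeded=info.get("num_succeeded", 0),
            num_failed=info.get("num_failed", 0),
        )
        for actor_id, info in syncs.items()
    ]


def failure_rate(stats: list[ActorSyncStats]) -> float | None:
    """Return failed / (failed + succeeded), or None when no RC syncs ran."""
    failed = sum(s.num_failed for s in stats)
    total = failed + sum(s.num_succeeded for s in stats)
    return failed / total if total else None


# Hypothetical payload, shaped the way the tool above reads the response.
sample_response = {
    "data": {
        "actor_selection_info": {"num_actors": 10},
        "syncs": {
            "actor-a": {"num_connections": 2, "num_succeeded": 3, "num_failed": 1},
            "actor-b": {"num_connections": 1, "num_succeeded": 4},
        },
    }
}

stats = parse_sync_stats(sample_response)
print(f"{len(stats)} actors, failure rate: {failure_rate(stats)}")
```

Returning `None` rather than `0.0` when no syncs have run keeps "no data yet" distinct from "no failures", which matters when the number feeds a promote-or-roll-back decision.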