airbyte_ops_mcp.mcp.connector_rollout

MCP tools for connector rollout management.

This module provides MCP tools for managing connector rollouts in Airbyte Cloud, including finalizing (promoting, rolling back, or canceling) rollouts.

MCP reference

MCP primitives registered by the connector_rollout module of the airbyte-internal-ops server: 4 tools, 0 prompts, 0 resources.

Tools (4)

finalize_connector_rollout

Hints: destructive · open-world

Finalize a connector rollout by promoting, rolling back, or canceling.

This tool allows admins to finalize connector rollouts that are in progress. Use this after monitoring a rollout and determining it is ready for finalization.

IMPORTANT: Finalization is asynchronous. This tool sends a finalization request to the platform API, which transitions the rollout to finalizing state and triggers a Temporal workflow. The actual promotion (PR creation, connector publish, registry update) or rollback (GCS cleanup, registry recompile) happens asynchronously via the finalize_rollout.yml GitHub Actions workflow. A successful response from this tool means only that the request was accepted, NOT that the promotion/rollback is complete.

After calling this tool, you MUST verify:

  1. The finalize_rollout.yml workflow ran successfully in GitHub Actions
  2. For promotions: a merged PR exists (e.g., chore: finalize promote for <connector>)
  3. The rollout state transitioned to its terminal state (succeeded, failed_rolled_back, or canceled) via query_prod_connector_rollouts
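The third check lends itself to a polling loop. The following is a minimal sketch, not part of this module: `fetch_state` is a hypothetical caller-supplied callable that wraps query_prod_connector_rollouts and returns the rollout's current state as a string, and the timeout and interval values are illustrative assumptions.

```python
import time
from collections.abc import Callable

# Terminal states listed in the verification steps above.
TERMINAL_STATES = {"succeeded", "failed_rolled_back", "canceled"}


def wait_for_terminal_state(
    fetch_state: Callable[[], str],
    timeout_s: float = 1800.0,
    poll_interval_s: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll until the rollout reaches a terminal state or the timeout expires.

    `fetch_state` is a hypothetical wrapper around query_prod_connector_rollouts;
    it is an assumption of this sketch, not an API of the module.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        state = fetch_state()
        if state in TERMINAL_STATES:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Rollout still in state {state!r} after {timeout_s}s")
        sleep(poll_interval_s)
```

Reaching a terminal state covers only the third check; the workflow run and the merged PR still need to be confirmed in GitHub.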

Admin-only operation - Requires:

  • AIRBYTE_INTERNAL_ADMIN_FLAG=airbyte.io environment variable
  • approval_comment_url (Slack approval record URL from escalate_to_human), OR admin_user_email_override when running inside the Ops Webapp.

Parameters:

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| docker_repository | string | yes | | The docker repository (e.g., 'airbyte/source-youtube-analytics') |
| docker_image_tag | string | yes | | The docker image tag (e.g., '1.2.0-rc.2') |
| actor_definition_id | string | yes | | The actor definition ID (UUID) |
| rollout_id | string | yes | | The rollout ID (UUID). Can be found in the 'pin_origin' field of rollout data from query_prod_actors_by_pinned_connector_version. |
| state | enum("succeeded", "failed_rolled_back", "canceled") | yes | | The final state for the rollout: 'succeeded' promotes the RC to GA (default version for all users), 'failed_rolled_back' rolls back the RC, 'canceled' cancels the rollout without promotion or rollback. |
| approval_comment_url | string \| null | no | null | URL to the Slack approval record. Obtain this by calling the escalate_to_human tool with approval_requested=True; the backend delivers the approval record URL when a human clicks Approve. Format: `https://<workspace>.slack.com/archives/...` The admin email is automatically resolved from the approver's identity via the team roster. |
| admin_user_email_override | string \| null | no | null | Direct admin email override for webapp-initiated actions. When the Ops Webapp env var is set, this bypasses the approval URL requirement. Ignored in agent/cron environments. |
| error_msg | string \| null | no | null | Optional error message for failed/canceled states. |
| failed_reason | string \| null | no | null | Optional failure reason for failed/canceled states. |
| retain_pins_on_cancellation | boolean \| null | no | null | If True, retain version pins when canceling. Only applicable when state is 'canceled'. |
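The constraints in this table can be checked before the call is made. The following is a sketch under stated assumptions: `build_finalize_args` is a hypothetical helper, not part of the module, and it only mirrors the documented schema (the `state` enum, the rule that `retain_pins_on_cancellation` applies only to 'canceled', and the rejection of unknown properties).

```python
from typing import Any

FINAL_STATES = ("succeeded", "failed_rolled_back", "canceled")
OPTIONAL_ARGS = {
    "approval_comment_url",
    "admin_user_email_override",
    "error_msg",
    "failed_reason",
    "retain_pins_on_cancellation",
}


def build_finalize_args(
    docker_repository: str,
    docker_image_tag: str,
    actor_definition_id: str,
    rollout_id: str,
    state: str,
    **optional: Any,
) -> dict[str, Any]:
    """Assemble arguments for finalize_connector_rollout, dropping unset optionals."""
    if state not in FINAL_STATES:
        raise ValueError(f"state must be one of {FINAL_STATES}, got {state!r}")
    unknown = set(optional) - OPTIONAL_ARGS
    if unknown:
        # The input schema sets additionalProperties to false.
        raise ValueError(f"Unknown arguments: {sorted(unknown)}")
    if optional.get("retain_pins_on_cancellation") is not None and state != "canceled":
        raise ValueError("retain_pins_on_cancellation only applies when state is 'canceled'")
    args: dict[str, Any] = {
        "docker_repository": docker_repository,
        "docker_image_tag": docker_image_tag,
        "actor_definition_id": actor_definition_id,
        "rollout_id": rollout_id,
        "state": state,
    }
    # Omit optionals left as None so the tool applies its own defaults.
    args.update({k: v for k, v in optional.items() if v is not None})
    return args
```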

Input JSON schema:

{
  "additionalProperties": false,
  "properties": {
    "docker_repository": {
      "description": "The docker repository (e.g., 'airbyte/source-youtube-analytics')",
      "type": "string"
    },
    "docker_image_tag": {
      "description": "The docker image tag (e.g., '1.2.0-rc.2')",
      "type": "string"
    },
    "actor_definition_id": {
      "description": "The actor definition ID (UUID)",
      "type": "string"
    },
    "rollout_id": {
      "description": "The rollout ID (UUID). Can be found in the 'pin_origin' field of rollout data from query_prod_actors_by_pinned_connector_version.",
      "type": "string"
    },
    "state": {
      "description": "The final state for the rollout: 'succeeded' promotes the RC to GA (default version for all users), 'failed_rolled_back' rolls back the RC, 'canceled' cancels the rollout without promotion or rollback.",
      "enum": [
        "succeeded",
        "failed_rolled_back",
        "canceled"
      ],
      "type": "string"
    },
    "approval_comment_url": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "URL to the Slack approval record. Obtain this by calling the `escalate_to_human` tool with `approval_requested=True`; the backend delivers the approval record URL when a human clicks Approve. Format: https://<workspace>.slack.com/archives/... The admin email is automatically resolved from the approver's identity via the team roster."
    },
    "admin_user_email_override": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Direct admin email override for webapp-initiated actions. When the Ops Webapp env var is set, this bypasses the approval URL requirement. Ignored in agent/cron environments."
    },
    "error_msg": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional error message for failed/canceled states."
    },
    "failed_reason": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional failure reason for failed/canceled states."
    },
    "retain_pins_on_cancellation": {
      "anyOf": [
        {
          "type": "boolean"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "If True, retain version pins when canceling. Only applicable when state is 'canceled'."
    }
  },
  "required": [
    "docker_repository",
    "docker_image_tag",
    "actor_definition_id",
    "rollout_id",
    "state"
  ],
  "type": "object"
}

Output JSON schema:

{
  "description": "Result of a connector rollout finalization operation.\n\nThis model provides detailed information about the outcome of finalizing\na connector rollout (promote, rollback, or cancel).",
  "properties": {
    "success": {
      "description": "Whether the operation succeeded",
      "type": "boolean"
    },
    "message": {
      "description": "Human-readable message describing the result",
      "type": "string"
    },
    "rollout_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The rollout ID that was finalized"
    },
    "docker_repository": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The docker repository (e.g., 'airbyte/source-github')"
    },
    "docker_image_tag": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The docker image tag (e.g., '1.2.0-rc.2')"
    },
    "state": {
      "anyOf": [
        {
          "enum": [
            "succeeded",
            "failed_rolled_back",
            "canceled"
          ],
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The final state of the rollout"
    }
  },
  "required": [
    "success",
    "message"
  ],
  "type": "object"
}

progress_connector_rollout

Hints: destructive · open-world

Progress a connector rollout by pinning actors to the RC version.

This tool progresses a connector rollout by either:

  • Setting a target percentage of actors to pin to the RC version
  • Specifying specific actor IDs to pin

Admin-only operation - Requires:

  • AIRBYTE_INTERNAL_ADMIN_FLAG=airbyte.io environment variable
  • approval_comment_url (Slack approval record URL from escalate_to_human), OR admin_user_email_override when running inside the Ops Webapp.

Parameters:

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| docker_repository | string | yes | | The docker repository (e.g., 'airbyte/source-pokeapi') |
| docker_image_tag | string | yes | | The docker image tag (e.g., '0.3.48-rc.1') |
| actor_definition_id | string | yes | | The actor definition ID (UUID) |
| rollout_id | string | yes | | The rollout ID (UUID). Can be found from query_prod_connector_rollouts. |
| approval_comment_url | string \| null | no | null | URL to the Slack approval record. Obtain this by calling the escalate_to_human tool with approval_requested=True; the backend delivers the approval record URL when a human clicks Approve. Format: `https://<workspace>.slack.com/archives/...` The admin email is automatically resolved from the approver's identity via the team roster. |
| admin_user_email_override | string \| null | no | null | Direct admin email override for webapp-initiated actions. When the Ops Webapp env var is set, this bypasses the approval URL requirement. Ignored in agent/cron environments. |
| target_percentage | integer \| null | no | null | Target percentage of actors to pin to the RC (1-100). Either target_percentage or actor_ids must be provided. |
| actor_ids | array<string> \| null | no | null | Specific actor IDs to pin to the RC. Either target_percentage or actor_ids must be provided. |

Input JSON schema:

{
  "additionalProperties": false,
  "properties": {
    "docker_repository": {
      "description": "The docker repository (e.g., 'airbyte/source-pokeapi')",
      "type": "string"
    },
    "docker_image_tag": {
      "description": "The docker image tag (e.g., '0.3.48-rc.1')",
      "type": "string"
    },
    "actor_definition_id": {
      "description": "The actor definition ID (UUID)",
      "type": "string"
    },
    "rollout_id": {
      "description": "The rollout ID (UUID). Can be found from query_prod_connector_rollouts.",
      "type": "string"
    },
    "approval_comment_url": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "URL to the Slack approval record. Obtain this by calling the `escalate_to_human` tool with `approval_requested=True`; the backend delivers the approval record URL when a human clicks Approve. Format: https://<workspace>.slack.com/archives/... The admin email is automatically resolved from the approver's identity via the team roster."
    },
    "admin_user_email_override": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Direct admin email override for webapp-initiated actions. When the Ops Webapp env var is set, this bypasses the approval URL requirement. Ignored in agent/cron environments."
    },
    "target_percentage": {
      "anyOf": [
        {
          "type": "integer"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Target percentage of actors to pin to the RC (1-100). Either target_percentage or actor_ids must be provided."
    },
    "actor_ids": {
      "anyOf": [
        {
          "items": {
            "type": "string"
          },
          "type": "array"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Specific actor IDs to pin to the RC. Either target_percentage or actor_ids must be provided."
    }
  },
  "required": [
    "docker_repository",
    "docker_image_tag",
    "actor_definition_id",
    "rollout_id"
  ],
  "type": "object"
}

Output JSON schema:

{
  "description": "Result of a connector rollout progress operation.\n\nThis model provides detailed information about the outcome of progressing\na connector rollout (pinning actors to the RC version).",
  "properties": {
    "success": {
      "description": "Whether the operation succeeded",
      "type": "boolean"
    },
    "message": {
      "description": "Human-readable message describing the result",
      "type": "string"
    },
    "rollout_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The rollout ID that was progressed"
    },
    "docker_repository": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The docker repository (e.g., 'airbyte/source-github')"
    },
    "docker_image_tag": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The docker image tag (e.g., '1.2.0-rc.2')"
    },
    "target_percentage": {
      "anyOf": [
        {
          "type": "integer"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The target percentage of actors to pin"
    }
  },
  "required": [
    "success",
    "message"
  ],
  "type": "object"
}

query_prod_rollout_monitoring_stats

Hints: read-only · idempotent

Get monitoring stats for a connector rollout.

Returns actor selection info and per-actor sync stats for actors participating in the rollout. This uses the platform API's /get_actor_sync_info endpoint, which filters sync stats to include only syncs that actually used the RC version associated with the rollout.

This is more accurate than SQL-based approaches, which count all syncs regardless of which connector version was used.

Parameters:

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| rollout_id | string | yes | | Rollout UUID to get monitoring stats for |

Input JSON schema:

{
  "additionalProperties": false,
  "properties": {
    "rollout_id": {
      "description": "Rollout UUID to get monitoring stats for",
      "type": "string"
    }
  },
  "required": [
    "rollout_id"
  ],
  "type": "object"
}

Output JSON schema:

{
  "description": "Complete monitoring result for a rollout from the platform API.\n\nThis uses the platform API's /get_actor_sync_info endpoint which filters\nsync stats to only include syncs that actually used the RC version\nassociated with the rollout.",
  "properties": {
    "rollout_id": {
      "description": "Rollout UUID",
      "type": "string"
    },
    "actor_selection_info": {
      "description": "Actor selection info for the rollout",
      "properties": {
        "num_actors": {
          "description": "Total actors using this connector",
          "type": "integer"
        },
        "num_pinned_to_connector_rollout": {
          "description": "Actors specifically pinned to this rollout",
          "type": "integer"
        },
        "num_actors_eligible_or_already_pinned": {
          "description": "Actors eligible for pinning or already pinned",
          "type": "integer"
        }
      },
      "required": [
        "num_actors",
        "num_pinned_to_connector_rollout",
        "num_actors_eligible_or_already_pinned"
      ],
      "type": "object"
    },
    "actor_sync_stats": {
      "description": "Per-actor sync stats for actors pinned to the rollout",
      "items": {
        "description": "Per-actor sync stats for a rollout (only syncs using the RC version).",
        "properties": {
          "actor_id": {
            "description": "Actor UUID",
            "type": "string"
          },
          "num_connections": {
            "description": "Number of connections using this actor",
            "type": "integer"
          },
          "num_succeeded": {
            "description": "Number of successful syncs using the RC version",
            "type": "integer"
          },
          "num_failed": {
            "description": "Number of failed syncs using the RC version",
            "type": "integer"
          }
        },
        "required": [
          "actor_id",
          "num_connections",
          "num_succeeded",
          "num_failed"
        ],
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "rollout_id",
    "actor_selection_info",
    "actor_sync_stats"
  ],
  "type": "object"
}
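A result in this shape can be reduced to a few headline numbers when deciding whether to progress or finalize. The following sketch assumes the result has already been parsed into a plain dict matching the output schema above; `summarize_rollout_stats` is a hypothetical helper, and `pinned_fraction` is an illustrative ratio rather than a metric defined by the platform.

```python
from typing import Any


def summarize_rollout_stats(result: dict[str, Any]) -> dict[str, Any]:
    """Aggregate per-actor sync counts from a monitoring stats result."""
    stats = result["actor_sync_stats"]
    succeeded = sum(actor["num_succeeded"] for actor in stats)
    failed = sum(actor["num_failed"] for actor in stats)
    total = succeeded + failed

    selection = result["actor_selection_info"]
    eligible = selection["num_actors_eligible_or_already_pinned"]
    pinned = selection["num_pinned_to_connector_rollout"]

    return {
        "total_syncs": total,
        # None when no RC syncs have run yet, to avoid dividing by zero.
        "failure_rate": failed / total if total else None,
        "actors_with_failures": [a["actor_id"] for a in stats if a["num_failed"] > 0],
        "pinned_fraction": pinned / eligible if eligible else None,
    }
```

Returning `None` for the rates when nothing has been counted keeps an empty rollout distinguishable from a rollout with a 0% failure rate.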

start_connector_rollout

Hints: destructive · open-world

Start or configure a connector rollout workflow.

This tool configures and starts a connector rollout workflow. It can be called multiple times while the rollout is in INITIALIZED state to update the configuration (strategy, percentages). Once the Temporal workflow starts and the state transitions to WORKFLOW_STARTED, the configuration is locked and cannot be changed.

Behavior:

  • If rollout is INITIALIZED: Updates configuration and starts the workflow
  • If rollout is already started: Returns an error (configuration is locked)

Configuration Parameters:

  • rollout_strategy: 'manual' (default), 'automated', or 'overridden'
  • initial_rollout_pct: Step size for progression (default: 25%)
  • final_target_rollout_pct: Maximum percentage to pin (default: 50%)
  • customer_tier: Customer tier to target - 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL' (default: TIER_2)
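To see how the two percentage settings interact, the sketch below lists the cumulative pin percentages a rollout would pass through. It assumes fixed-size additive steps capped at the final target, which is inferred from the parameter descriptions ("advance by 25% each step", "will not exceed this percentage"); the platform's actual stepping logic may differ, and `rollout_steps` is a hypothetical helper. A step size of 0 is rejected here because it would never make progress.

```python
def rollout_steps(
    initial_rollout_pct: int = 25,
    final_target_rollout_pct: int = 50,
) -> list[int]:
    """List cumulative pin percentages, assuming fixed steps capped at the target."""
    if not 0 < initial_rollout_pct <= 100:
        raise ValueError("initial_rollout_pct must be in the range 1-100")
    if not 0 < final_target_rollout_pct <= 100:
        raise ValueError("final_target_rollout_pct must be in the range 1-100")
    steps: list[int] = []
    pct = 0
    while pct < final_target_rollout_pct:
        # Advance by one step, but never beyond the final target.
        pct = min(pct + initial_rollout_pct, final_target_rollout_pct)
        steps.append(pct)
    return steps
```

Under these assumptions the documented defaults give `rollout_steps() == [25, 50]`, and `rollout_steps(30, 100)` gives `[30, 60, 90, 100]`.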

Admin-only operation - Requires:

  • AIRBYTE_INTERNAL_ADMIN_FLAG=airbyte.io environment variable
  • approval_comment_url (Slack approval record URL from escalate_to_human), OR admin_user_email_override when running inside the Ops Webapp.

Parameters:

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| docker_repository | string | yes | | The docker repository (e.g., 'airbyte/source-pokeapi') |
| docker_image_tag | string | yes | | The docker image tag (e.g., '0.3.48-rc.1') |
| actor_definition_id | string | yes | | The actor definition ID (UUID) |
| approval_comment_url | string \| null | no | null | URL to the Slack approval record. Obtain this by calling the escalate_to_human tool with approval_requested=True; the backend delivers the approval record URL when a human clicks Approve. Format: `https://<workspace>.slack.com/archives/...` The admin email is automatically resolved from the approver's identity via the team roster. |
| admin_user_email_override | string \| null | no | null | Direct admin email override for webapp-initiated actions. When the Ops Webapp env var is set, this bypasses the approval URL requirement. Ignored in agent/cron environments. |
| rollout_strategy | enum("manual", "automated", "overridden") | no | "manual" | The rollout strategy: 'manual' for manual control of rollout progression, 'automated' for automatic progression based on metrics, 'overridden' for special cases where normal rules are bypassed. |
| initial_rollout_pct | integer \| null | no | null | Initial/step percentage for rollout progression (0-100). For automated rollouts, this is the percentage increment per step. For example, 25 means the rollout will advance by 25% each step. Default is 25% if not specified. |
| final_target_rollout_pct | integer \| null | no | null | Maximum percentage of actors to pin (0-100). The rollout will not exceed this percentage. For example, 50 means at most 50% of actors will be pinned to the RC. Default is 50% if not specified. |
| customer_tier | enum("TIER_0", "TIER_1", "TIER_2", "ALL") \| null | no | null | The customer tier to target for this rollout. Each tier represents a different group of customers: 'TIER_0' for the highest-priority customers, 'TIER_1' for mid-tier customers, 'TIER_2' for the broadest customer group (default if not specified), 'ALL' to target all customer tiers. When not specified, the platform defaults to TIER_2 only. |

Input JSON schema:

{
  "additionalProperties": false,
  "properties": {
    "docker_repository": {
      "description": "The docker repository (e.g., 'airbyte/source-pokeapi')",
      "type": "string"
    },
    "docker_image_tag": {
      "description": "The docker image tag (e.g., '0.3.48-rc.1')",
      "type": "string"
    },
    "actor_definition_id": {
      "description": "The actor definition ID (UUID)",
      "type": "string"
    },
    "approval_comment_url": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "URL to the Slack approval record. Obtain this by calling the `escalate_to_human` tool with `approval_requested=True`; the backend delivers the approval record URL when a human clicks Approve. Format: https://<workspace>.slack.com/archives/... The admin email is automatically resolved from the approver's identity via the team roster."
    },
    "admin_user_email_override": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Direct admin email override for webapp-initiated actions. When the Ops Webapp env var is set, this bypasses the approval URL requirement. Ignored in agent/cron environments."
    },
    "rollout_strategy": {
      "default": "manual",
      "description": "The rollout strategy: 'manual' for manual control of rollout progression, 'automated' for automatic progression based on metrics, 'overridden' for special cases where normal rules are bypassed.",
      "enum": [
        "manual",
        "automated",
        "overridden"
      ],
      "type": "string"
    },
    "initial_rollout_pct": {
      "anyOf": [
        {
          "type": "integer"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Initial/step percentage for rollout progression (0-100). For automated rollouts, this is the percentage increment per step. For example, 25 means the rollout will advance by 25% each step. Default is 25% if not specified."
    },
    "final_target_rollout_pct": {
      "anyOf": [
        {
          "type": "integer"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Maximum percentage of actors to pin (0-100). The rollout will not exceed this percentage. For example, 50 means at most 50% of actors will be pinned to the RC. Default is 50% if not specified."
    },
    "customer_tier": {
      "anyOf": [
        {
          "enum": [
            "TIER_0",
            "TIER_1",
            "TIER_2",
            "ALL"
          ],
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The customer tier to target for this rollout. Each tier represents a different group of customers: 'TIER_0' for the highest-priority customers, 'TIER_1' for mid-tier customers, 'TIER_2' for the broadest customer group (default if not specified), 'ALL' to target all customer tiers. When not specified, the platform defaults to TIER_2 only."
    }
  },
  "required": [
    "docker_repository",
    "docker_image_tag",
    "actor_definition_id"
  ],
  "type": "object"
}

Output JSON schema:

{
  "description": "Result of a connector rollout start operation.\n\nThis model provides detailed information about the outcome of starting\na connector rollout workflow.",
  "properties": {
    "success": {
      "description": "Whether the operation succeeded",
      "type": "boolean"
    },
    "message": {
      "description": "Human-readable message describing the result",
      "type": "string"
    },
    "docker_repository": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The docker repository (e.g., 'airbyte/source-github')"
    },
    "docker_image_tag": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The docker image tag (e.g., '1.2.0-rc.2')"
    },
    "actor_definition_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The actor definition ID (UUID)"
    },
    "rollout_strategy": {
      "anyOf": [
        {
          "enum": [
            "manual",
            "automated",
            "overridden"
          ],
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The rollout strategy used"
    }
  },
  "required": [
    "success",
    "message"
  ],
  "type": "object"
}

# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
"""MCP tools for connector rollout management.

This module provides MCP tools for managing connector rollouts in Airbyte Cloud,
including finalizing (promoting, rolling back, or canceling) rollouts.

## MCP reference

.. include:: ../../../docs/mcp-generated/connector_rollout.md
    :start-line: 2
"""

# NOTE: We intentionally do NOT use `from __future__ import annotations` here.
# FastMCP has issues resolving forward references when PEP 563 deferred annotations
# are used. See: https://github.com/jlowin/fastmcp/issues/905
# Python 3.12+ supports modern type hint syntax natively, so this is not needed.

__all__: list[str] = []

from dataclasses import dataclass
from typing import Annotated, Literal

from airbyte import constants
from airbyte.exceptions import PyAirbyteInputError
from fastmcp import Context, FastMCP
from fastmcp_extensions import get_mcp_config, mcp_tool, register_mcp_tools
from pydantic import BaseModel, Field

from airbyte_ops_mcp.approval_resolution import (
    ApprovalStatus,
    check_approval_status,
)
from airbyte_ops_mcp.cloud_admin import api_client
from airbyte_ops_mcp.cloud_admin.auth import (
    CloudAuthError,
    require_internal_admin_flag_only,
)
from airbyte_ops_mcp.cloud_admin.models import (
    ConnectorRolloutFinalizeResult,
    ConnectorRolloutProgressResult,
    ConnectorRolloutStartResult,
)
from airbyte_ops_mcp.constants import ServerConfigKey


@dataclass(frozen=True)
class _ResolvedCloudAuth:
    """Resolved authentication for Airbyte Cloud API calls.

    Either bearer_token OR (client_id AND client_secret) will be set, not both.
    """

    bearer_token: str | None = None
    client_id: str | None = None
    client_secret: str | None = None


def _resolve_cloud_auth(ctx: Context) -> _ResolvedCloudAuth:
    """Resolve authentication credentials for Airbyte Cloud API.

    Credentials are resolved in priority order:
    1. Bearer token (Authorization header or AIRBYTE_CLOUD_BEARER_TOKEN env var)
    2. Client credentials (X-Airbyte-Cloud-Client-Id/Secret headers or env vars)

    Args:
        ctx: FastMCP Context object from the current tool invocation.

    Returns:
        _ResolvedCloudAuth with either bearer_token or client credentials set.

    Raises:
        CloudAuthError: If credentials cannot be resolved from headers or env vars.
    """
    # Try bearer token first (preferred, but not required)
    bearer_token = get_mcp_config(ctx, ServerConfigKey.BEARER_TOKEN)
    if bearer_token:
        return _ResolvedCloudAuth(bearer_token=bearer_token)

    # Fall back to client credentials
    try:
        client_id = get_mcp_config(ctx, ServerConfigKey.CLIENT_ID)
        client_secret = get_mcp_config(ctx, ServerConfigKey.CLIENT_SECRET)
        return _ResolvedCloudAuth(
            client_id=client_id,
            client_secret=client_secret,
        )
    except ValueError as e:
        raise CloudAuthError(
            f"Failed to resolve credentials. Ensure credentials are provided "
            f"via Authorization header (Bearer token), "
            f"HTTP headers (X-Airbyte-Cloud-Client-Id, X-Airbyte-Cloud-Client-Secret), "
            f"or environment variables. Error: {e}"
        ) from e


@mcp_tool(
    destructive=True,
    idempotent=False,
    open_world=True,
)
def start_connector_rollout(
    docker_repository: Annotated[
        str,
        Field(description="The docker repository (e.g., 'airbyte/source-pokeapi')"),
    ],
    docker_image_tag: Annotated[
        str,
        Field(description="The docker image tag (e.g., '0.3.48-rc.1')"),
    ],
    actor_definition_id: Annotated[
        str,
        Field(description="The actor definition ID (UUID)"),
    ],
    approval_comment_url: Annotated[
        str | None,
        Field(
            description="URL to the Slack approval record. Obtain this by calling the "
            "`escalate_to_human` tool with `approval_requested=True`; the backend delivers "
            "the approval record URL when a human clicks Approve. "
            "Format: https://<workspace>.slack.com/archives/... "
            "The admin email is automatically resolved from the approver's identity "
            "via the team roster.",
            default=None,
        ),
    ],
    admin_user_email_override: Annotated[
        str | None,
        Field(
            description="Direct admin email override for webapp-initiated actions. "
            "When the Ops Webapp env var is set, this bypasses the approval URL "
            "requirement. Ignored in agent/cron environments.",
            default=None,
        ),
    ],
    rollout_strategy: Annotated[
        Literal["manual", "automated", "overridden"],
        Field(
            description="The rollout strategy: "
            "'manual' for manual control of rollout progression, "
            "'automated' for automatic progression based on metrics, "
            "'overridden' for special cases where normal rules are bypassed.",
            default="manual",
        ),
    ],
    initial_rollout_pct: Annotated[
        int | None,
        Field(
            description="Initial/step percentage for rollout progression (0-100). "
            "For automated rollouts, this is the percentage increment per step. "
            "For example, 25 means the rollout will advance by 25% each step. "
            "Default is 25% if not specified.",
            default=None,
        ),
    ],
    final_target_rollout_pct: Annotated[
        int | None,
        Field(
            description="Maximum percentage of actors to pin (0-100). "
            "The rollout will not exceed this percentage. "
            "For example, 50 means at most 50% of actors will be pinned to the RC. "
            "Default is 50% if not specified.",
            default=None,
        ),
    ],
    customer_tier: Annotated[
        Literal["TIER_0", "TIER_1", "TIER_2", "ALL"] | None,
        Field(
            description="The customer tier to target for this rollout. "
            "Each tier represents a different group of customers: "
            "'TIER_0' for the highest-priority customers, "
            "'TIER_1' for mid-tier customers, "
            "'TIER_2' for the broadest customer group (default if not specified), "
            "'ALL' to target all customer tiers. "
            "When not specified, the platform defaults to TIER_2 only.",
            default=None,
        ),
    ],
    *,
    ctx: Context,
) -> ConnectorRolloutStartResult:
    """Start or configure a connector rollout workflow.

    This tool configures and starts a connector rollout workflow. It can be called
    multiple times while the rollout is in INITIALIZED state to update the configuration
    (strategy, percentages). Once the Temporal workflow starts and the state transitions
    to WORKFLOW_STARTED, the configuration is locked and cannot be changed.

    **Behavior:**
    - If rollout is INITIALIZED: Updates configuration and starts the workflow
    - If rollout is already started: Returns an error (configuration is locked)

    **Configuration Parameters:**
    - rollout_strategy: 'manual' (default), 'automated', or 'overridden'
    - initial_rollout_pct: Step size for progression (default: 25%)
    - final_target_rollout_pct: Maximum percentage to pin (default: 50%)
    - customer_tier: Customer tier to target - 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL' (default: TIER_2)

    **Admin-only operation** - Requires:
    - AIRBYTE_INTERNAL_ADMIN_FLAG=airbyte.io environment variable
    - `approval_comment_url` (Slack approval record URL from `escalate_to_human`),
      OR `admin_user_email_override` when running inside the Ops Webapp.
    """
    # Validate admin access (check env var flag)
    try:
        require_internal_admin_flag_only()
    except CloudAuthError as e:
        return ConnectorRolloutStartResult(
            success=False,
            message=f"Admin authentication failed: {e}",
            docker_repository=docker_repository,
            docker_image_tag=docker_image_tag,
            actor_definition_id=actor_definition_id,
        )

    # Resolve admin email: webapp bypass or external approval URL
    approval = check_approval_status(
        approval_comment_url=approval_comment_url,
        user_email=admin_user_email_override,
    )
    if approval.status != ApprovalStatus.APPROVED:
        return ConnectorRolloutStartResult(
            success=False,
            message=approval.reason or "Approval check failed",
            docker_repository=docker_repository,
            docker_image_tag=docker_image_tag,
            actor_definition_id=actor_definition_id,
        )
    admin_user_email = approval.admin_email
229
230    # Resolve auth credentials
231    try:
232        auth = _resolve_cloud_auth(ctx)
233    except CloudAuthError as e:
234        return ConnectorRolloutStartResult(
235            success=False,
236            message=f"Failed to resolve credentials: {e}",
237            docker_repository=docker_repository,
238            docker_image_tag=docker_image_tag,
239            actor_definition_id=actor_definition_id,
240        )
241
242    # Get user ID from admin email
243    try:
244        user_id = api_client.get_user_id_by_email(
245            email=admin_user_email,
246            config_api_root=constants.CLOUD_CONFIG_API_ROOT,
247            client_id=auth.client_id,
248            client_secret=auth.client_secret,
249            bearer_token=auth.bearer_token,
250        )
251    except PyAirbyteInputError as e:
252        return ConnectorRolloutStartResult(
253            success=False,
254            message=f"Failed to get user ID for admin email '{admin_user_email}': {e}",
255            docker_repository=docker_repository,
256            docker_image_tag=docker_image_tag,
257            actor_definition_id=actor_definition_id,
258        )
259
260    # Call the API to start the rollout
261    try:
262        api_client.start_connector_rollout(
263            docker_repository=docker_repository,
264            docker_image_tag=docker_image_tag,
265            actor_definition_id=actor_definition_id,
266            updated_by=user_id,
267            rollout_strategy=rollout_strategy,
268            config_api_root=constants.CLOUD_CONFIG_API_ROOT,
269            initial_rollout_pct=initial_rollout_pct,
270            final_target_rollout_pct=final_target_rollout_pct,
271            customer_tier=customer_tier,
272            client_id=auth.client_id,
273            client_secret=auth.client_secret,
274            bearer_token=auth.bearer_token,
275        )
276
277        # Build message with configuration details
278        config_details = []
279        if initial_rollout_pct is not None:
280            config_details.append(f"initial_rollout_pct={initial_rollout_pct}%")
281        if final_target_rollout_pct is not None:
282            config_details.append(
283                f"final_target_rollout_pct={final_target_rollout_pct}%"
284            )
285        if customer_tier is not None:
286            config_details.append(f"customer_tier={customer_tier}")
287        config_str = (
288            f" Configuration: {', '.join(config_details)}." if config_details else ""
289        )
290
291        return ConnectorRolloutStartResult(
292            success=True,
293            message=f"Successfully started rollout workflow for "
294            f"{docker_repository}:{docker_image_tag}. "
295            f"The rollout state has transitioned from INITIALIZED to WORKFLOW_STARTED."
296            f"{config_str}",
297            docker_repository=docker_repository,
298            docker_image_tag=docker_image_tag,
299            actor_definition_id=actor_definition_id,
300            rollout_strategy=rollout_strategy,
301        )
302
303    except PyAirbyteInputError as e:
304        return ConnectorRolloutStartResult(
305            success=False,
306            message=str(e),
307            docker_repository=docker_repository,
308            docker_image_tag=docker_image_tag,
309            actor_definition_id=actor_definition_id,
310        )
311
312
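To make the interaction between `initial_rollout_pct` and `final_target_rollout_pct` more concrete, here is a minimal sketch of the percentage schedule those two settings seem to imply. The helper `rollout_schedule` is hypothetical and not part of this module. It assumes the platform advances by a fixed step and clamps the last step at the target, which the parameter descriptions suggest but the source does not show.

```python
def rollout_schedule(step_pct: int = 25, final_target_pct: int = 50) -> list[int]:
    """Return the cumulative pin percentages a stepped rollout would visit.

    Hypothetical helper: assumes a fixed step size and that the final
    step is clamped to ``final_target_pct``.
    """
    if not 0 < step_pct <= 100:
        raise ValueError("step_pct must be in the range 1-100")
    if not 0 < final_target_pct <= 100:
        raise ValueError("final_target_pct must be in the range 1-100")

    schedule: list[int] = []
    current = 0
    while current < final_target_pct:
        # Advance by one step, but never past the configured cap.
        current = min(current + step_pct, final_target_pct)
        schedule.append(current)
    return schedule


print(rollout_schedule())         # documented defaults: 25% steps, 50% cap
print(rollout_schedule(30, 100))  # the last step is clamped to the cap
```

Under these assumptions the documented defaults yield two steps (25%, then 50%), so a rollout started without explicit percentages would stop at half of the eligible actors.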
@mcp_tool(
    destructive=True,
    idempotent=False,
    open_world=True,
)
def progress_connector_rollout(
    docker_repository: Annotated[
        str,
        Field(description="The docker repository (e.g., 'airbyte/source-pokeapi')"),
    ],
    docker_image_tag: Annotated[
        str,
        Field(description="The docker image tag (e.g., '0.3.48-rc.1')"),
    ],
    actor_definition_id: Annotated[
        str,
        Field(description="The actor definition ID (UUID)"),
    ],
    rollout_id: Annotated[
        str,
        Field(
            description="The rollout ID (UUID). Can be found from query_prod_connector_rollouts."
        ),
    ],
    approval_comment_url: Annotated[
        str | None,
        Field(
            description="URL to the Slack approval record. Obtain this by calling the "
            "`escalate_to_human` tool with `approval_requested=True`; the backend delivers "
            "the approval record URL when a human clicks Approve. "
            "Format: https://<workspace>.slack.com/archives/... "
            "The admin email is automatically resolved from the approver's identity "
            "via the team roster.",
            default=None,
        ),
    ],
    admin_user_email_override: Annotated[
        str | None,
        Field(
            description="Direct admin email override for webapp-initiated actions. "
            "When the Ops Webapp env var is set, this bypasses the approval URL "
            "requirement. Ignored in agent/cron environments.",
            default=None,
        ),
    ],
    target_percentage: Annotated[
        int | None,
        Field(
            description="Target percentage of actors to pin to the RC (1-100). "
            "Either target_percentage or actor_ids must be provided.",
            default=None,
        ),
    ] = None,
    actor_ids: Annotated[
        list[str] | None,
        Field(
            description="Specific actor IDs to pin to the RC. "
            "Either target_percentage or actor_ids must be provided.",
            default=None,
        ),
    ] = None,
    *,
    ctx: Context,
) -> ConnectorRolloutProgressResult:
    """Progress a connector rollout by pinning actors to the RC version.

    This tool progresses a connector rollout by either:
    - Setting a target percentage of actors to pin to the RC version
    - Specifying specific actor IDs to pin

    **Admin-only operation** - Requires:
    - AIRBYTE_INTERNAL_ADMIN_FLAG=airbyte.io environment variable
    - `approval_comment_url` (Slack approval record URL from `escalate_to_human`),
      OR `admin_user_email_override` when running inside the Ops Webapp.
    """
    # Validate admin access (check env var flag)
    try:
        require_internal_admin_flag_only()
    except CloudAuthError as e:
        return ConnectorRolloutProgressResult(
            success=False,
            message=f"Admin authentication failed: {e}",
            rollout_id=rollout_id,
            docker_repository=docker_repository,
            docker_image_tag=docker_image_tag,
        )

    # Validate that at least one of target_percentage or actor_ids is provided
    if target_percentage is None and actor_ids is None:
        return ConnectorRolloutProgressResult(
            success=False,
            message="Either target_percentage or actor_ids must be provided",
            rollout_id=rollout_id,
            docker_repository=docker_repository,
            docker_image_tag=docker_image_tag,
        )

    # Resolve admin email: webapp bypass or external approval URL
    approval = check_approval_status(
        approval_comment_url=approval_comment_url,
        user_email=admin_user_email_override,
    )
    if approval.status != ApprovalStatus.APPROVED:
        return ConnectorRolloutProgressResult(
            success=False,
            message=approval.reason or "Approval check failed",
            rollout_id=rollout_id,
            docker_repository=docker_repository,
            docker_image_tag=docker_image_tag,
        )
    admin_user_email = approval.admin_email

    # Resolve auth credentials
    try:
        auth = _resolve_cloud_auth(ctx)
    except CloudAuthError as e:
        return ConnectorRolloutProgressResult(
            success=False,
            message=f"Failed to resolve credentials: {e}",
            rollout_id=rollout_id,
            docker_repository=docker_repository,
            docker_image_tag=docker_image_tag,
        )

    # Get user ID from admin email
    try:
        user_id = api_client.get_user_id_by_email(
            email=admin_user_email,
            config_api_root=constants.CLOUD_CONFIG_API_ROOT,
            client_id=auth.client_id,
            client_secret=auth.client_secret,
            bearer_token=auth.bearer_token,
        )
    except PyAirbyteInputError as e:
        return ConnectorRolloutProgressResult(
            success=False,
            message=f"Failed to get user ID for admin email '{admin_user_email}': {e}",
            rollout_id=rollout_id,
            docker_repository=docker_repository,
            docker_image_tag=docker_image_tag,
        )

    # Call the API to progress the rollout
    try:
        api_client.progress_connector_rollout(
            docker_repository=docker_repository,
            docker_image_tag=docker_image_tag,
            actor_definition_id=actor_definition_id,
            rollout_id=rollout_id,
            updated_by=user_id,
            config_api_root=constants.CLOUD_CONFIG_API_ROOT,
            target_percentage=target_percentage,
            actor_ids=actor_ids,
            client_id=auth.client_id,
            client_secret=auth.client_secret,
            bearer_token=auth.bearer_token,
        )

        # Compare against None so the message always reflects the argument
        # that was actually supplied, rather than relying on truthiness.
        progress_msg = (
            f"target_percentage={target_percentage}%"
            if target_percentage is not None
            else f"{len(actor_ids) if actor_ids else 0} specific actors"
        )
        return ConnectorRolloutProgressResult(
            success=True,
            message=f"Successfully progressed rollout for "
            f"{docker_repository}:{docker_image_tag} to {progress_msg}.",
            rollout_id=rollout_id,
            docker_repository=docker_repository,
            docker_image_tag=docker_image_tag,
            target_percentage=target_percentage,
        )

    except PyAirbyteInputError as e:
        return ConnectorRolloutProgressResult(
            success=False,
            message=str(e),
            rollout_id=rollout_id,
            docker_repository=docker_repository,
            docker_image_tag=docker_image_tag,
        )


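The argument check in `progress_connector_rollout` can be isolated as a small pure function, which makes the rule easier to test. The sketch below is illustrative only: `validate_progress_args` is a hypothetical name, and it is stricter than the tool as written. Rejecting an empty `actor_ids` list and enforcing the 1-100 range from the field description are assumptions, not behavior the source shows.

```python
from __future__ import annotations


def validate_progress_args(
    target_percentage: int | None,
    actor_ids: list[str] | None,
) -> str | None:
    """Return an error message, or None when the arguments look usable."""
    # Assumption: an empty actor list is treated the same as a missing one.
    if target_percentage is None and not actor_ids:
        return "Either target_percentage or actor_ids must be provided"
    # Assumption: the 1-100 range from the field description is enforced here.
    if target_percentage is not None and not 1 <= target_percentage <= 100:
        return f"target_percentage must be between 1 and 100, got {target_percentage}"
    return None


print(validate_progress_args(None, None))
print(validate_progress_args(25, None))
```

Returning a message instead of raising mirrors how the tool reports failures through its result object rather than through exceptions.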
@mcp_tool(
    destructive=True,
    idempotent=False,
    open_world=True,
)
def finalize_connector_rollout(
    docker_repository: Annotated[
        str,
        Field(
            description="The docker repository (e.g., 'airbyte/source-youtube-analytics')"
        ),
    ],
    docker_image_tag: Annotated[
        str,
        Field(description="The docker image tag (e.g., '1.2.0-rc.2')"),
    ],
    actor_definition_id: Annotated[
        str,
        Field(description="The actor definition ID (UUID)"),
    ],
    rollout_id: Annotated[
        str,
        Field(
            description="The rollout ID (UUID). Can be found in the 'pin_origin' field "
            "of rollout data from query_prod_actors_by_pinned_connector_version."
        ),
    ],
    state: Annotated[
        Literal["succeeded", "failed_rolled_back", "canceled"],
        Field(
            description="The final state for the rollout: "
            "'succeeded' promotes the RC to GA (default version for all users), "
            "'failed_rolled_back' rolls back the RC, "
            "'canceled' cancels the rollout without promotion or rollback."
        ),
    ],
    approval_comment_url: Annotated[
        str | None,
        Field(
            description="URL to the Slack approval record. Obtain this by calling the "
            "`escalate_to_human` tool with `approval_requested=True`; the backend delivers "
            "the approval record URL when a human clicks Approve. "
            "Format: https://<workspace>.slack.com/archives/... "
            "The admin email is automatically resolved from the approver's identity "
            "via the team roster.",
            default=None,
        ),
    ],
    admin_user_email_override: Annotated[
        str | None,
        Field(
            description="Direct admin email override for webapp-initiated actions. "
            "When the Ops Webapp env var is set, this bypasses the approval URL "
            "requirement. Ignored in agent/cron environments.",
            default=None,
        ),
    ],
    error_msg: Annotated[
        str | None,
        Field(
            description="Optional error message for failed/canceled states.",
            default=None,
        ),
    ] = None,
    failed_reason: Annotated[
        str | None,
        Field(
            description="Optional failure reason for failed/canceled states.",
            default=None,
        ),
    ] = None,
    retain_pins_on_cancellation: Annotated[
        bool | None,
        Field(
            description="If True, retain version pins when canceling. "
            "Only applicable when state is 'canceled'.",
            default=None,
        ),
    ] = None,
    *,
    ctx: Context,
) -> ConnectorRolloutFinalizeResult:
    """Finalize a connector rollout by promoting, rolling back, or canceling.

    This tool allows admins to finalize connector rollouts that are in progress.
    Use this after monitoring a rollout and determining it is ready for finalization.

    **IMPORTANT: Finalization is asynchronous.** This tool sends a finalization
    request to the platform API, which transitions the rollout to `finalizing`
    state and triggers a Temporal workflow. The actual promotion (PR creation,
    connector publish, registry update) or rollback (GCS cleanup, registry
    recompile) happens asynchronously via the `finalize_rollout.yml` GitHub
    Actions workflow. A successful response from this tool means the request
    was accepted, NOT that the promotion/rollback is complete.

    After calling this tool, you MUST verify:
    1. The `finalize_rollout.yml` workflow ran successfully in GitHub Actions
    2. For promotions: a merged PR exists (e.g., `chore: finalize promote for <connector>`)
    3. The rollout state transitioned to its terminal state (`succeeded`,
       `failed_rolled_back`, or `canceled`) via `query_prod_connector_rollouts`

    **Admin-only operation** - Requires:
    - AIRBYTE_INTERNAL_ADMIN_FLAG=airbyte.io environment variable
    - `approval_comment_url` (Slack approval record URL from `escalate_to_human`),
      OR `admin_user_email_override` when running inside the Ops Webapp.
    """
    # Validate admin access (check env var flag)
    try:
        require_internal_admin_flag_only()
    except CloudAuthError as e:
        return ConnectorRolloutFinalizeResult(
            success=False,
            message=f"Admin authentication failed: {e}",
            rollout_id=rollout_id,
            docker_repository=docker_repository,
            docker_image_tag=docker_image_tag,
        )

    # Resolve admin email: webapp bypass or external approval URL
    approval = check_approval_status(
        approval_comment_url=approval_comment_url,
        user_email=admin_user_email_override,
    )
    if approval.status != ApprovalStatus.APPROVED:
        return ConnectorRolloutFinalizeResult(
            success=False,
            message=approval.reason or "Approval check failed",
            rollout_id=rollout_id,
            docker_repository=docker_repository,
            docker_image_tag=docker_image_tag,
        )
    admin_user_email = approval.admin_email

    # Resolve auth credentials
    try:
        auth = _resolve_cloud_auth(ctx)
    except CloudAuthError as e:
        return ConnectorRolloutFinalizeResult(
            success=False,
            message=f"Failed to resolve credentials: {e}",
            rollout_id=rollout_id,
            docker_repository=docker_repository,
            docker_image_tag=docker_image_tag,
        )

    # Get user ID from admin email
    try:
        user_id = api_client.get_user_id_by_email(
            email=admin_user_email,
            config_api_root=constants.CLOUD_CONFIG_API_ROOT,
            client_id=auth.client_id,
            client_secret=auth.client_secret,
            bearer_token=auth.bearer_token,
        )
    except PyAirbyteInputError as e:
        return ConnectorRolloutFinalizeResult(
            success=False,
            message=f"Failed to get user ID for admin email '{admin_user_email}': {e}",
            rollout_id=rollout_id,
            docker_repository=docker_repository,
            docker_image_tag=docker_image_tag,
        )

    # Call the API to finalize the rollout
    try:
        api_client.finalize_connector_rollout(
            docker_repository=docker_repository,
            docker_image_tag=docker_image_tag,
            actor_definition_id=actor_definition_id,
            rollout_id=rollout_id,
            updated_by=user_id,
            state=state,
            config_api_root=constants.CLOUD_CONFIG_API_ROOT,
            client_id=auth.client_id,
            client_secret=auth.client_secret,
            bearer_token=auth.bearer_token,
            error_msg=error_msg,
            failed_reason=failed_reason,
            retain_pins_on_cancellation=retain_pins_on_cancellation,
        )

        # Human-readable follow-up guidance for each requested final state
        state_descriptions = {
            "succeeded": (
                "GA promotion has been initiated (state: finalizing). "
                "The actual promotion (PR creation, publish, registry update) "
                "happens asynchronously via the finalize_rollout.yml GitHub Actions workflow. "
                "You MUST verify the workflow completes successfully and the rollout "
                "transitions to 'succeeded' state before reporting completion. "
                "Check: (1) GitHub Actions for a 'Finalize Progressive Rollout' workflow run, "
                "(2) a merged promotion PR, and "
                "(3) query_prod_connector_rollouts to confirm state is 'succeeded'."
            ),
            "failed_rolled_back": (
                "rollback has been initiated (state: finalizing). "
                "The rollback happens asynchronously via the finalize_rollout.yml workflow. "
                "Verify the workflow completes and the rollout transitions to "
                "'failed_rolled_back' state."
            ),
            "canceled": "canceled",
        }
        state_desc = state_descriptions.get(state, state)

        return ConnectorRolloutFinalizeResult(
            success=True,
            message=f"Finalization request accepted for {docker_repository}:{docker_image_tag}: "
            f"{state_desc}",
            rollout_id=rollout_id,
            docker_repository=docker_repository,
            docker_image_tag=docker_image_tag,
            state=state,
        )

    except PyAirbyteInputError as e:
        return ConnectorRolloutFinalizeResult(
            success=False,
            message=str(e),
            rollout_id=rollout_id,
            docker_repository=docker_repository,
            docker_image_tag=docker_image_tag,
        )


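Because finalization is asynchronous, a caller has to keep checking the rollout state after the request is accepted. The following is a rough sketch of such a wait loop, not something this module provides. `wait_for_terminal_state` and `fetch_state` are hypothetical names; `fetch_state` stands in for whatever lookup the caller uses (for example, reading the state returned by `query_prod_connector_rollouts`), and the lowercase state strings are assumed to match the values named in the docstring above.

```python
import time
from typing import Callable

# Terminal states as named in the finalize_connector_rollout docstring.
TERMINAL_STATES = {"succeeded", "failed_rolled_back", "canceled"}


def wait_for_terminal_state(
    fetch_state: Callable[[], str],
    *,
    poll_interval_s: float = 30.0,
    max_attempts: int = 20,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll until the rollout reaches a terminal state or attempts run out."""
    for attempt in range(max_attempts):
        state = fetch_state()
        if state in TERMINAL_STATES:
            return state
        # Skip the pause after the final attempt; we are about to give up.
        if attempt < max_attempts - 1:
            sleep(poll_interval_s)
    raise TimeoutError(
        f"Rollout still not in a terminal state after {max_attempts} checks"
    )


# Usage with a canned sequence standing in for a real state lookup.
fake_states = iter(["finalizing", "finalizing", "succeeded"])
final_state = wait_for_terminal_state(lambda: next(fake_states), sleep=lambda _: None)
print(final_state)
```

Injecting `sleep` keeps the loop testable without real delays. A terminal state alone does not cover the other checks the docstring requires, such as the GitHub Actions run and, for promotions, the merged PR.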
class RolloutActorSelectionInfo(BaseModel):
    """Actor selection info for a connector rollout."""

    num_actors: int = Field(description="Total actors using this connector")
    num_pinned_to_connector_rollout: int = Field(
        description="Actors specifically pinned to this rollout"
    )
    num_actors_eligible_or_already_pinned: int = Field(
        description="Actors eligible for pinning or already pinned"
    )


class RolloutActorSyncStats(BaseModel):
    """Per-actor sync stats for a rollout (only syncs using the RC version)."""

    actor_id: str = Field(description="Actor UUID")
    num_connections: int = Field(description="Number of connections using this actor")
    num_succeeded: int = Field(
        description="Number of successful syncs using the RC version"
    )
    num_failed: int = Field(description="Number of failed syncs using the RC version")


class RolloutMonitoringResult(BaseModel):
    """Complete monitoring result for a rollout from the platform API.

    This uses the platform API's /get_actor_sync_info endpoint which filters
    sync stats to only include syncs that actually used the RC version
    associated with the rollout.
    """

    rollout_id: str = Field(description="Rollout UUID")
    actor_selection_info: RolloutActorSelectionInfo = Field(
        description="Actor selection info for the rollout"
    )
    actor_sync_stats: list[RolloutActorSyncStats] = Field(
        description="Per-actor sync stats for actors pinned to the rollout"
    )


@mcp_tool(
    read_only=True,
    idempotent=True,
)
def query_prod_rollout_monitoring_stats(
    rollout_id: Annotated[
        str,
        Field(description="Rollout UUID to get monitoring stats for"),
    ],
    *,
    ctx: Context,
) -> RolloutMonitoringResult:
    """Get monitoring stats for a connector rollout.

    Returns actor selection info and per-actor sync stats for actors
    participating in the rollout. This uses the platform API's
    /get_actor_sync_info endpoint which filters sync stats to only include
    syncs that actually used the RC version associated with the rollout.

    This is more accurate than SQL-based approaches which count all syncs
    regardless of which connector version was used.
    """
    auth = _resolve_cloud_auth(ctx)

    response = api_client.get_actor_sync_info(
        rollout_id=rollout_id,
        config_api_root=constants.CLOUD_CONFIG_API_ROOT,
        client_id=auth.client_id,
        client_secret=auth.client_secret,
        bearer_token=auth.bearer_token,
    )

    data = response.get("data", {})
    actor_selection_info_data = data.get("actor_selection_info", {})
    syncs_data = data.get("syncs", {})

    actor_selection_info = RolloutActorSelectionInfo(
        num_actors=actor_selection_info_data.get("num_actors", 0),
        num_pinned_to_connector_rollout=actor_selection_info_data.get(
            "num_pinned_to_connector_rollout", 0
        ),
        num_actors_eligible_or_already_pinned=actor_selection_info_data.get(
            "num_actors_eligible_or_already_pinned", 0
        ),
    )

    actor_sync_stats = [
        RolloutActorSyncStats(
            actor_id=actor_id,
            num_connections=sync_info.get("num_connections", 0),
            num_succeeded=sync_info.get("num_succeeded", 0),
            num_failed=sync_info.get("num_failed", 0),
        )
        for actor_id, sync_info in syncs_data.items()
    ]

    return RolloutMonitoringResult(
        rollout_id=rollout_id,
        actor_selection_info=actor_selection_info,
        actor_sync_stats=actor_sync_stats,
    )


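The per-actor stats returned by `query_prod_rollout_monitoring_stats` are typically most useful once aggregated. As an illustration, the sketch below computes an overall failure rate across pinned actors. It uses a plain dataclass, `ActorSyncStats`, as a stand-in with the same field names as `RolloutActorSyncStats` so that the example runs without Pydantic. `summarize_sync_stats` is a hypothetical helper, and any threshold for acting on the failure rate is left to the reader.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class ActorSyncStats:
    """Stand-in for RolloutActorSyncStats with the same field names."""

    actor_id: str
    num_connections: int
    num_succeeded: int
    num_failed: int


def summarize_sync_stats(stats: list[ActorSyncStats]) -> dict[str, float | int | None]:
    """Aggregate per-actor RC sync counts into rollout-level numbers."""
    succeeded = sum(s.num_succeeded for s in stats)
    failed = sum(s.num_failed for s in stats)
    total = succeeded + failed
    return {
        # Actors that have run at least one sync on the RC version.
        "num_actors_with_syncs": sum(
            1 for s in stats if s.num_succeeded + s.num_failed > 0
        ),
        "num_syncs": total,
        # None (rather than 0.0) when there is no data to judge the RC by.
        "failure_rate": failed / total if total else None,
    }


example = [
    ActorSyncStats(actor_id="actor-1", num_connections=2, num_succeeded=9, num_failed=1),
    ActorSyncStats(actor_id="actor-2", num_connections=1, num_succeeded=0, num_failed=0),
]
print(summarize_sync_stats(example))
```

Reporting `None` when no syncs have run keeps "no data yet" distinct from "no failures", which matters when deciding whether a rollout is ready to progress or finalize.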
def register_connector_rollout_tools(app: FastMCP) -> None:
    """Register connector rollout tools with the FastMCP app.

    Args:
        app: FastMCP application instance
    """
    register_mcp_tools(app)