airbyte_ops_mcp.mcp.cloud_connector_versions

MCP tools for cloud connector version management.

This module provides MCP tools for viewing and managing connector version overrides (pins) in Airbyte Cloud. These tools enable admins to pin connectors to specific versions for troubleshooting or stability purposes.

The tools call the API client directly and authenticate with either a bearer token or client credentials.

MCP reference

MCP primitives registered by the cloud_connector_versions module of the airbyte-internal-ops server: 4 tools, 0 prompts, 0 resources.

Tools (4)

get_cloud_connector_version

Hints: read-only · idempotent · open-world

Get the current version information for a deployed connector.

Returns version details including the current version string and whether an override (pin) is applied.

Authentication credentials are resolved in priority order:

  1. Bearer token (Authorization header or AIRBYTE_CLOUD_BEARER_TOKEN env var)
  2. HTTP headers: X-Airbyte-Cloud-Client-Id, X-Airbyte-Cloud-Client-Secret
  3. Environment variables: AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET
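The priority order above can be sketched as follows. This is a minimal illustration, not the module's actual implementation: the function name `resolve_cloud_credentials`, the returned tuple shape, the `Bearer <token>` header format, and the choice to check the header before the env var within step 1 are all assumptions.

```python
from typing import Mapping, Optional, Tuple


def resolve_cloud_credentials(
    headers: Mapping[str, str],
    env: Mapping[str, str],
) -> Optional[Tuple[str, ...]]:
    """Return ("bearer", token) or ("client_credentials", id, secret), else None.

    Hypothetical helper; header lookup here is case-sensitive for simplicity.
    """
    # 1. Bearer token: Authorization header, then env var (order within this step is assumed).
    auth = headers.get("Authorization", "")
    if auth.startswith("Bearer ") and auth[len("Bearer "):].strip():
        return ("bearer", auth[len("Bearer "):].strip())
    if env.get("AIRBYTE_CLOUD_BEARER_TOKEN"):
        return ("bearer", env["AIRBYTE_CLOUD_BEARER_TOKEN"])

    # 2. Client credentials from HTTP headers.
    client_id = headers.get("X-Airbyte-Cloud-Client-Id")
    client_secret = headers.get("X-Airbyte-Cloud-Client-Secret")
    if client_id and client_secret:
        return ("client_credentials", client_id, client_secret)

    # 3. Client credentials from environment variables.
    client_id = env.get("AIRBYTE_CLOUD_CLIENT_ID")
    client_secret = env.get("AIRBYTE_CLOUD_CLIENT_SECRET")
    if client_id and client_secret:
        return ("client_credentials", client_id, client_secret)

    return None
```

In a real process, `env` would typically be `os.environ` and `headers` the incoming request headers.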

Parameters:

Name Type Required Default Description
workspace_id string | enum("266ebdfe-0d7b-4540-9817-de7e4505ba61") yes The Airbyte Cloud workspace ID (UUID) or alias. Accepts '@devin-ai-sandbox' as an alias for the Devin AI sandbox workspace.
actor_id string yes The ID of the deployed connector (source or destination)
actor_type enum("source", "destination") yes The type of connector (source or destination)
config_api_root string | null no null Optional API root URL override for the Config API. Defaults to Airbyte Cloud (https://cloud.airbyte.com/api/v1). Use this to target local or self-hosted deployments.
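To illustrate how a `workspace_id` alias could map to its UUID, here is a small stand-in. The schema mentions `WorkspaceAliasEnum.resolve()`, but its signature is not shown in this reference, so the function `resolve_workspace_id` and the dictionary below are hypothetical. Rejecting values that are not well-formed UUIDs is also an assumption.

```python
import uuid

# Alias-to-UUID mapping taken from the schema in this reference.
WORKSPACE_ALIASES = {
    "@devin-ai-sandbox": "266ebdfe-0d7b-4540-9817-de7e4505ba61",
}


def resolve_workspace_id(workspace_id: str) -> str:
    """Map a known alias to its UUID; otherwise require a well-formed UUID."""
    if workspace_id in WORKSPACE_ALIASES:
        return WORKSPACE_ALIASES[workspace_id]
    # uuid.UUID raises ValueError for strings that are not valid UUIDs.
    return str(uuid.UUID(workspace_id))
```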

Input JSON schema:

{
  "additionalProperties": false,
  "properties": {
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "description": "Workspace ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@devin-ai-sandbox\") and its value\nis the actual workspace UUID. Use `WorkspaceAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
          "enum": [
            "266ebdfe-0d7b-4540-9817-de7e4505ba61"
          ],
          "type": "string"
        }
      ],
      "description": "The Airbyte Cloud workspace ID (UUID) or alias. Accepts '@devin-ai-sandbox' as an alias for the Devin AI sandbox workspace."
    },
    "actor_id": {
      "description": "The ID of the deployed connector (source or destination)",
      "type": "string"
    },
    "actor_type": {
      "description": "The type of connector (source or destination)",
      "enum": [
        "source",
        "destination"
      ],
      "type": "string"
    },
    "config_api_root": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional API root URL override for the Config API. Defaults to Airbyte Cloud (https://cloud.airbyte.com/api/v1). Use this to target local or self-hosted deployments."
    }
  },
  "required": [
    "workspace_id",
    "actor_id",
    "actor_type"
  ],
  "type": "object"
}
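As a rough client-side pre-check against the input schema above, the sketch below verifies required keys, rejects unknown keys (the schema sets `additionalProperties` to false), and checks the `actor_type` enum. The function name `check_get_version_arguments` is hypothetical, and the check covers only these three rules, not full JSON Schema validation.

```python
REQUIRED = {"workspace_id", "actor_id", "actor_type"}
ALLOWED = REQUIRED | {"config_api_root"}


def check_get_version_arguments(arguments: dict) -> list:
    """Return a list of problems; an empty list means the arguments look valid."""
    problems = []
    missing = REQUIRED - set(arguments)
    if missing:
        problems.append(f"missing required: {sorted(missing)}")
    unknown = set(arguments) - ALLOWED
    if unknown:
        problems.append(f"unknown properties: {sorted(unknown)}")
    actor_type = arguments.get("actor_type")
    if actor_type is not None and actor_type not in ("source", "destination"):
        problems.append("actor_type must be 'source' or 'destination'")
    return problems
```

For example, a payload with `workspace_id`, `actor_id`, and `actor_type="source"` passes, while one that adds an unrecognized key is flagged.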

Output JSON schema:

{
  "description": "Information about a cloud connector's version.\n\nThis model represents the current version state of a deployed connector,\nincluding whether a version override (pin) is active.",
  "properties": {
    "connector_id": {
      "description": "The ID of the deployed connector",
      "type": "string"
    },
    "connector_type": {
      "description": "The type of connector (source or destination)",
      "enum": [
        "source",
        "destination"
      ],
      "type": "string"
    },
    "version": {
      "description": "The current version string (e.g., '0.1.0')",
      "type": "string"
    },
    "is_version_pinned": {
      "description": "Whether a version override is active for this connector",
      "type": "boolean"
    }
  },
  "required": [
    "connector_id",
    "connector_type",
    "version",
    "is_version_pinned"
  ],
  "type": "object"
}
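A caller might read the result into a small typed record, as sketched here. The class name `VersionInfo` and the helper `summarize` are hypothetical; only the four field names come from the output schema above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class VersionInfo:  # hypothetical name; fields mirror the output schema
    connector_id: str
    connector_type: str
    version: str
    is_version_pinned: bool


def summarize(payload: dict) -> str:
    """Build a one-line summary from a get_cloud_connector_version result."""
    info = VersionInfo(**payload)  # raises TypeError on missing or unexpected keys
    state = "pinned" if info.is_version_pinned else "not pinned"
    return f"{info.connector_type} {info.connector_id} is on {info.version} ({state})"
```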

set_cloud_connector_version_override

Hints: destructive · open-world

Set or clear a version override for a deployed connector.

Admin-only operation - Requires:

  • AIRBYTE_INTERNAL_ADMIN_FLAG=airbyte.io environment variable
  • issue_url parameter (GitHub issue URL for context)
  • approval_comment_url (Slack approval record URL from escalate_to_human)

The admin user email is automatically derived from the Slack approval record, resolving the approver's @airbyte.io email via the team roster.
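The three prerequisites can be pre-checked before calling the tool, roughly as below. This is a sketch under assumptions: the function name `check_admin_prerequisites` is hypothetical, and the URL checks only test the scheme, host, and path prefix implied by the documented formats (`https://github.com/...` and `https://<workspace>.slack.com/archives/...`). The server's own validation may be stricter.

```python
from typing import Mapping, Optional
from urllib.parse import urlparse


def check_admin_prerequisites(
    env: Mapping[str, str],
    issue_url: Optional[str],
    approval_comment_url: Optional[str],
) -> list:
    """Return a list of unmet prerequisites; empty means all three look satisfied."""
    problems = []
    if env.get("AIRBYTE_INTERNAL_ADMIN_FLAG") != "airbyte.io":
        problems.append("AIRBYTE_INTERNAL_ADMIN_FLAG must be set to airbyte.io")

    issue = urlparse(issue_url or "")
    if issue.scheme != "https" or issue.hostname != "github.com":
        problems.append("issue_url must be an https://github.com/... URL")

    approval = urlparse(approval_comment_url or "")
    host = approval.hostname or ""
    if (
        approval.scheme != "https"
        or not host.endswith(".slack.com")
        or not approval.path.startswith("/archives/")
    ):
        problems.append("approval_comment_url must be a Slack /archives/ URL")
    return problems
```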

You must specify EXACTLY ONE of version OR unset=True, but not both. When setting a version, override_reason is required.
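The exactly-one rule and the reason requirement can be expressed as a short validation step. The function name `validate_override_request` is hypothetical, and measuring the 10-character minimum on the raw string (without stripping whitespace) is an assumption.

```python
from typing import Optional


def validate_override_request(
    version: Optional[str],
    unset: bool,
    override_reason: Optional[str],
) -> None:
    """Raise ValueError unless exactly one of version / unset=True is given."""
    # Both given, or neither given: (version is not None) equals unset in both cases.
    if (version is not None) == unset:
        raise ValueError("specify exactly one of version or unset=True")
    # A reason of at least 10 characters is required only when pinning.
    if version is not None and (override_reason is None or len(override_reason) < 10):
        raise ValueError("override_reason (min 10 characters) is required when setting a version")
```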

The customer_tier_filter parameter gates the operation: the call fails if the actual tier of the workspace's organization does not match. Use ALL to bypass the check (a warning is still emitted for sensitive tiers).
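The gating behaviour can be pictured as below. This is a sketch only: the function name `check_customer_tier` is hypothetical, and this reference does not say which tiers count as sensitive, so treating `TIER_0` and `TIER_1` as sensitive is an assumption.

```python
from typing import Optional

# Assumption: this reference does not list the sensitive tiers.
SENSITIVE_TIERS = frozenset({"TIER_0", "TIER_1"})


def check_customer_tier(actual_tier: str, customer_tier_filter: str = "TIER_2") -> Optional[str]:
    """Raise on a tier mismatch; otherwise return a warning string or None."""
    if customer_tier_filter != "ALL" and customer_tier_filter != actual_tier:
        raise PermissionError(
            f"customer tier {actual_tier} does not match filter {customer_tier_filter}"
        )
    if actual_tier in SENSITIVE_TIERS:
        return f"operation targets sensitive customer tier {actual_tier}"
    return None
```

With the default filter `TIER_2`, a call against a `TIER_0` organization is rejected; passing `ALL` lets it proceed but still returns the warning.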

Business rules enforced:

  • Dev versions (-dev): Only the creator can unpin their own dev version override
  • Production versions: Require strong justification mentioning customer/support/investigation
  • Release candidates (-rc): Any admin can pin/unpin RC versions
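Each rule above maps to a small predicate, sketched here. All three function names are hypothetical, and the matching logic is an assumption: version kinds are detected by the `-dev` and `-rc` substrings, and "strong justification" is approximated by a case-insensitive keyword search. The server's real checks may differ.

```python
JUSTIFICATION_KEYWORDS = ("customer", "support", "investigation")


def classify_version(version: str) -> str:
    """Classify a version string as 'dev', 'rc', or 'production'."""
    if "-dev" in version:
        return "dev"
    if "-rc" in version:
        return "rc"
    return "production"


def has_strong_justification(override_reason: str) -> bool:
    """Production rule: the reason must mention customer, support, or investigation."""
    reason = override_reason.lower()
    return any(keyword in reason for keyword in JUSTIFICATION_KEYWORDS)


def may_unpin_dev(requester_email: str, creator_email: str) -> bool:
    """Dev rule: only the creator may unpin their own dev version override."""
    return requester_email == creator_email
```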

Authentication credentials are resolved in priority order:

  1. Bearer token (Authorization header or AIRBYTE_CLOUD_BEARER_TOKEN env var)
  2. HTTP headers: X-Airbyte-Cloud-Client-Id, X-Airbyte-Cloud-Client-Secret
  3. Environment variables: AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET

Parameters:

Name Type Required Default Description
workspace_id string | enum("266ebdfe-0d7b-4540-9817-de7e4505ba61") yes The Airbyte Cloud workspace ID (UUID) or alias. Accepts '@devin-ai-sandbox' as an alias for the Devin AI sandbox workspace.
actor_id string yes The ID of the deployed connector (source or destination)
actor_type enum("source", "destination") yes The type of connector (source or destination)
approval_comment_url string | null no null URL to the Slack approval record. Obtain this by calling the escalate_to_human tool with approval_requested=True; the backend delivers the approval record URL when a human clicks Approve. Format: https://<workspace>.slack.com/archives/... The admin email is automatically resolved from the approver's identity via the team roster.
version string | null no null The semver version string to pin to (e.g., '0.1.0'). Must be None if unset is True.
unset boolean no false If True, removes any existing version override. Cannot be True if version is provided.
override_reason string | null no null Required when setting a version. Explanation for the override (min 10 characters).
override_reason_reference_url string | null no null Optional URL with more context (e.g., issue link).
issue_url string | null no null URL to the GitHub issue providing context for this operation. Must be a valid GitHub URL (https://github.com/...). Required for authorization.
ai_agent_session_url string | null no null URL to the AI agent session driving this operation, if applicable. Provides additional auditability for AI-driven operations.
force boolean no false If True, allow overwriting an existing version pin. Existing pins may have been set by rollouts, breaking-change migrations, or other operators. Defaults to False. NOTE: force=True only bypasses the existing-pin check — major-version crossings are always blocked and cannot be overridden.
config_api_root string | null no null Optional API root URL override for the Config API. Defaults to Airbyte Cloud (https://cloud.airbyte.com/api/v1). Use this to target local or self-hosted deployments.
customer_tier_filter enum("TIER_0", "TIER_1", "TIER_2", "ALL") no "TIER_2" Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. The operation will be rejected if the actual customer tier does not match. Use 'ALL' to proceed regardless of tier (a warning is shown for sensitive tiers).
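The interaction between `force` and the major-version guard described in the table can be sketched as follows. The helper names are hypothetical, and defining a "major-version crossing" as a difference between the leading numeric component of the current and target versions is an assumption.

```python
def major_of(version: str) -> int:
    """Return the leading numeric component of a semver string."""
    return int(version.split(".", 1)[0])


def check_pin_change(
    current_version: str,
    target_version: str,
    already_pinned: bool,
    force: bool = False,
) -> None:
    """Raise ValueError if the pin change would be blocked."""
    # Checked first: force cannot bypass a major-version crossing.
    if major_of(current_version) != major_of(target_version):
        raise ValueError("major-version crossings are always blocked")
    # force only bypasses the existing-pin check.
    if already_pinned and not force:
        raise ValueError("an existing pin is present; pass force=True to overwrite it")
```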

Input JSON schema:

{
  "additionalProperties": false,
  "properties": {
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "description": "Workspace ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@devin-ai-sandbox\") and its value\nis the actual workspace UUID. Use `WorkspaceAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
          "enum": [
            "266ebdfe-0d7b-4540-9817-de7e4505ba61"
          ],
          "type": "string"
        }
      ],
      "description": "The Airbyte Cloud workspace ID (UUID) or alias. Accepts '@devin-ai-sandbox' as an alias for the Devin AI sandbox workspace."
    },
    "actor_id": {
      "description": "The ID of the deployed connector (source or destination)",
      "type": "string"
    },
    "actor_type": {
      "description": "The type of connector (source or destination)",
      "enum": [
        "source",
        "destination"
      ],
      "type": "string"
    },
    "approval_comment_url": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "URL to the Slack approval record. Obtain this by calling the `escalate_to_human` tool with `approval_requested=True`; the backend delivers the approval record URL when a human clicks Approve. Format: https://<workspace>.slack.com/archives/... The admin email is automatically resolved from the approver's identity via the team roster."
    },
    "version": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The semver version string to pin to (e.g., '0.1.0'). Must be None if unset is True."
    },
    "unset": {
      "default": false,
      "description": "If True, removes any existing version override. Cannot be True if version is provided.",
      "type": "boolean"
    },
    "override_reason": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Required when setting a version. Explanation for the override (min 10 characters)."
    },
    "override_reason_reference_url": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional URL with more context (e.g., issue link)."
    },
    "issue_url": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "URL to the GitHub issue providing context for this operation. Must be a valid GitHub URL (https://github.com/...). Required for authorization."
    },
    "ai_agent_session_url": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "URL to the AI agent session driving this operation, if applicable. Provides additional auditability for AI-driven operations."
    },
    "force": {
      "default": false,
      "description": "If `True`, allow overwriting an existing version pin. Existing pins may have been set by rollouts, breaking-change migrations, or other operators. Defaults to `False`. NOTE: `force=True` only bypasses the existing-pin check \u2014 major-version crossings are always blocked and cannot be overridden.",
      "type": "boolean"
    },
    "config_api_root": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional API root URL override for the Config API. Defaults to Airbyte Cloud (https://cloud.airbyte.com/api/v1). Use this to target local or self-hosted deployments."
    },
    "customer_tier_filter": {
      "default": "TIER_2",
      "description": "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. The operation will be rejected if the actual customer tier does not match. Use 'ALL' to proceed regardless of tier (a warning is shown for sensitive tiers).",
      "enum": [
        "TIER_0",
        "TIER_1",
        "TIER_2",
        "ALL"
      ],
      "type": "string"
    }
  },
  "required": [
    "workspace_id",
    "actor_id",
    "actor_type"
  ],
  "type": "object"
}

Output JSON schema:

{
  "description": "Result of a version override operation (set or clear).\n\nThis model provides detailed information about the outcome of a version\npinning or unpinning operation.",
  "properties": {
    "success": {
      "description": "Whether the operation succeeded",
      "type": "boolean"
    },
    "message": {
      "description": "Human-readable message describing the result",
      "type": "string"
    },
    "connector_id": {
      "description": "The ID of the connector that was modified",
      "type": "string"
    },
    "connector_type": {
      "description": "The type of connector (source or destination)",
      "enum": [
        "source",
        "destination"
      ],
      "type": "string"
    },
    "previous_version": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The version before the operation (None if not available)"
    },
    "new_version": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The version after the operation (None if cleared or failed)"
    },
    "was_pinned_before": {
      "anyOf": [
        {
          "type": "boolean"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Whether a pin was active before the operation"
    },
    "is_pinned_after": {
      "anyOf": [
        {
          "type": "boolean"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Whether a pin is active after the operation"
    },
    "customer_tier": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Customer tier of the affected entity (TIER_0, TIER_1, TIER_2). Included as a guardrail annotation."
    },
    "is_eu": {
      "anyOf": [
        {
          "type": "boolean"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Whether the affected entity is in the EU region."
    },
    "tier_warning": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Warning message if the operation targets a sensitive customer tier."
    }
  },
  "required": [
    "success",
    "message",
    "connector_id",
    "connector_type"
  ],
  "type": "object"
}
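One way a caller might turn this result into a log line is shown below. The function name `describe_override_result` and the output wording are hypothetical; the field names and the meaning of a null `new_version` (override cleared or operation failed) come from the output schema above.

```python
def describe_override_result(result: dict) -> str:
    """Summarize a set_cloud_connector_version_override result in one line."""
    if not result["success"]:
        return f"failed: {result['message']}"
    previous = result.get("previous_version") or "unknown"
    new = result.get("new_version")
    # On success, a null new_version means the override was cleared.
    change = f"{previous} -> {new}" if new else f"{previous} -> (override cleared)"
    line = f"{result['connector_type']} {result['connector_id']}: {change}"
    if result.get("tier_warning"):
        line += f" [warning: {result['tier_warning']}]"
    return line
```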

set_organization_connector_version_override

Hints: destructive · open-world

Set or clear an organization-level version override for a connector type.

This pins ALL instances of a connector type across an entire organization to a specific version. For example, pinning 'source-github' at organization level means all GitHub sources in all workspaces within that organization will use the pinned version.

Admin-only operation - Requires:

  • AIRBYTE_INTERNAL_ADMIN_FLAG=airbyte.io environment variable
  • issue_url parameter (GitHub issue URL for context)
  • approval_comment_url (Slack approval record URL from escalate_to_human)

You must specify EXACTLY ONE of version OR unset=True, but not both. When setting a version, override_reason is required.

The customer_tier_filter parameter gates the operation: the call fails if the actual tier of the organization does not match. Use ALL to bypass the check (a warning is still emitted for sensitive tiers).

Parameters:

Name Type Required Default Description
organization_id string yes The Airbyte Cloud organization ID.
connector_name string yes The connector name (e.g., 'source-github', 'destination-bigquery').
connector_type enum("source", "destination") yes The type of connector (source or destination)
approval_comment_url string | null no null URL to the Slack approval record. Obtain this by calling the escalate_to_human tool with approval_requested=True; the backend delivers the approval record URL when a human clicks Approve. Format: https://<workspace>.slack.com/archives/... The admin email is automatically resolved from the approver's identity via the team roster.
version string | null no null The semver version string to pin to (e.g., '0.1.0'). Must be None if unset is True.
unset boolean no false If True, removes any existing version override. Cannot be True if version is provided.
override_reason string | null no null Required when setting a version. Explanation for the override (min 10 characters).
override_reason_reference_url string | null no null Optional URL with more context (e.g., issue link).
issue_url string | null no null URL to the GitHub issue providing context for this operation. Must be a valid GitHub URL (https://github.com/...). Required for authorization.
ai_agent_session_url string | null no null URL to the AI agent session driving this operation, if applicable. Provides additional auditability for AI-driven operations.
force boolean no false If True, allow overwriting an existing version pin. Existing pins may have been set by rollouts, breaking-change migrations, or other operators. Defaults to False. NOTE: force=True only bypasses the existing-pin check — major-version crossings are always blocked and cannot be overridden.
config_api_root string | null no null Optional API root URL override for the Config API. Defaults to Airbyte Cloud (https://cloud.airbyte.com/api/v1). Use this to target local or self-hosted deployments.
customer_tier_filter enum("TIER_0", "TIER_1", "TIER_2", "ALL") no "TIER_2" Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. The operation will be rejected if the actual customer tier does not match. Use 'ALL' to proceed regardless of tier (a warning is shown for sensitive tiers).
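As an illustration of the two payload shapes (pin versus unset) for the organization-level tool, a helper might assemble the arguments like this. The function name `org_override_arguments` is hypothetical; the keys are the parameter names from the table above, and optional parameters left out fall back to their documented defaults.

```python
from typing import Optional


def org_override_arguments(
    organization_id: str,
    connector_name: str,
    connector_type: str,
    *,
    issue_url: str,
    approval_comment_url: str,
    version: Optional[str] = None,
    override_reason: Optional[str] = None,
    customer_tier_filter: str = "TIER_2",
) -> dict:
    """Build a pin payload when version is given, otherwise an unset payload."""
    arguments = {
        "organization_id": organization_id,
        "connector_name": connector_name,
        "connector_type": connector_type,
        "issue_url": issue_url,
        "approval_comment_url": approval_comment_url,
        "customer_tier_filter": customer_tier_filter,
    }
    if version is None:
        arguments["unset"] = True
    else:
        arguments["version"] = version
        arguments["override_reason"] = override_reason
    return arguments
```

Because the helper never sets `version` and `unset` together, the payload always satisfies the exactly-one rule.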

Input JSON schema:

{
  "additionalProperties": false,
  "properties": {
    "organization_id": {
      "description": "The Airbyte Cloud organization ID.",
      "type": "string"
    },
    "connector_name": {
      "description": "The connector name (e.g., 'source-github', 'destination-bigquery').",
      "type": "string"
    },
    "connector_type": {
      "description": "The type of connector (source or destination)",
      "enum": [
        "source",
        "destination"
      ],
      "type": "string"
    },
    "approval_comment_url": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "URL to the Slack approval record. Obtain this by calling the `escalate_to_human` tool with `approval_requested=True`; the backend delivers the approval record URL when a human clicks Approve. Format: https://<workspace>.slack.com/archives/... The admin email is automatically resolved from the approver's identity via the team roster."
    },
    "version": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The semver version string to pin to (e.g., '0.1.0'). Must be None if unset is True."
    },
    "unset": {
      "default": false,
      "description": "If True, removes any existing version override. Cannot be True if version is provided.",
      "type": "boolean"
    },
    "override_reason": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Required when setting a version. Explanation for the override (min 10 characters)."
    },
    "override_reason_reference_url": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional URL with more context (e.g., issue link)."
    },
    "issue_url": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "URL to the GitHub issue providing context for this operation. Must be a valid GitHub URL (https://github.com/...). Required for authorization."
    },
    "ai_agent_session_url": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "URL to the AI agent session driving this operation, if applicable. Provides additional auditability for AI-driven operations."
    },
    "force": {
      "default": false,
      "description": "If `True`, allow overwriting an existing version pin. Existing pins may have been set by rollouts, breaking-change migrations, or other operators. Defaults to `False`. NOTE: `force=True` only bypasses the existing-pin check \u2014 major-version crossings are always blocked and cannot be overridden.",
      "type": "boolean"
    },
    "config_api_root": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional API root URL override for the Config API. Defaults to Airbyte Cloud (https://cloud.airbyte.com/api/v1). Use this to target local or self-hosted deployments."
    },
    "customer_tier_filter": {
      "default": "TIER_2",
      "description": "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. The operation will be rejected if the actual customer tier does not match. Use 'ALL' to proceed regardless of tier (a warning is shown for sensitive tiers).",
      "enum": [
        "TIER_0",
        "TIER_1",
        "TIER_2",
        "ALL"
      ],
      "type": "string"
    }
  },
  "required": [
    "organization_id",
    "connector_name",
    "connector_type"
  ],
  "type": "object"
}

Output JSON schema:

{
  "description": "Result of an organization-level version override operation.\n\nThis model provides detailed information about the outcome of an organization-level\nversion pinning or unpinning operation.",
  "properties": {
    "success": {
      "description": "Whether the operation succeeded",
      "type": "boolean"
    },
    "message": {
      "description": "Human-readable message describing the result",
      "type": "string"
    },
    "organization_id": {
      "description": "The organization ID",
      "type": "string"
    },
    "connector_name": {
      "description": "The connector name (e.g., 'source-github')",
      "type": "string"
    },
    "connector_type": {
      "description": "The type of connector (source or destination)",
      "enum": [
        "source",
        "destination"
      ],
      "type": "string"
    },
    "version": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The version that was pinned (None if cleared or failed)"
    },
    "customer_tier": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Customer tier of the organization (TIER_0, TIER_1, TIER_2). Included as a guardrail annotation."
    },
    "tier_warning": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Warning message if the operation targets a sensitive customer tier."
    }
  },
  "required": [
    "success",
    "message",
    "organization_id",
    "connector_name",
    "connector_type"
  ],
  "type": "object"
}

set_workspace_connector_version_override

Hints: destructive · open-world

Set or clear a workspace-level version override for a connector type.

This pins ALL instances of a connector type within a workspace to a specific version. For example, pinning 'source-github' at workspace level means all GitHub sources in that workspace will use the pinned version.
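The three override tools in this module differ in scope and in the identifiers they require. The lookup below summarizes that. The scope labels ("actor", "workspace", "organization") and the function `select_override_tool` are hypothetical; the tool names and required parameters are taken from the schemas in this reference.

```python
# scope label -> (tool name, required identifier parameters)
OVERRIDE_TOOLS = {
    "actor": (
        "set_cloud_connector_version_override",
        ("workspace_id", "actor_id", "actor_type"),
    ),
    "workspace": (
        "set_workspace_connector_version_override",
        ("workspace_id", "connector_name", "connector_type"),
    ),
    "organization": (
        "set_organization_connector_version_override",
        ("organization_id", "connector_name", "connector_type"),
    ),
}


def select_override_tool(scope: str, identifiers: dict) -> str:
    """Return the tool name for a scope after checking its required identifiers."""
    tool_name, required = OVERRIDE_TOOLS[scope]
    missing = [name for name in required if name not in identifiers]
    if missing:
        raise ValueError(f"{tool_name} requires: {', '.join(missing)}")
    return tool_name
```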

Admin-only operation - Requires:

  • AIRBYTE_INTERNAL_ADMIN_FLAG=airbyte.io environment variable
  • issue_url parameter (GitHub issue URL for context)
  • approval_comment_url (Slack approval record URL from escalate_to_human)

You must specify EXACTLY ONE of version OR unset=True, but not both. When setting a version, override_reason is required.

The customer_tier_filter parameter gates the operation: the call fails if the actual tier of the workspace's organization does not match. Use ALL to bypass the check (a warning is still emitted for sensitive tiers).

Parameters:

Name Type Required Default Description
workspace_id string | enum("266ebdfe-0d7b-4540-9817-de7e4505ba61") yes The Airbyte Cloud workspace ID (UUID) or alias. Accepts '@devin-ai-sandbox' as an alias for the Devin AI sandbox workspace.
connector_name string yes The connector name (e.g., 'source-github', 'destination-bigquery').
connector_type enum("source", "destination") yes The type of connector (source or destination)
approval_comment_url string | null no null URL to the Slack approval record. Obtain this by calling the escalate_to_human tool with approval_requested=True; the backend delivers the approval record URL when a human clicks Approve. Format: https://<workspace>.slack.com/archives/... The admin email is automatically resolved from the approver's identity via the team roster.
version string | null no null The semver version string to pin to (e.g., '0.1.0'). Must be None if unset is True.
unset boolean no false If True, removes any existing version override. Cannot be True if version is provided.
override_reason string | null no null Required when setting a version. Explanation for the override (min 10 characters).
override_reason_reference_url string | null no null Optional URL with more context (e.g., issue link).
issue_url string | null no null URL to the GitHub issue providing context for this operation. Must be a valid GitHub URL (https://github.com/...). Required for authorization.
ai_agent_session_url string | null no null URL to the AI agent session driving this operation, if applicable. Provides additional auditability for AI-driven operations.
force boolean no false If True, allow overwriting an existing version pin. Existing pins may have been set by rollouts, breaking-change migrations, or other operators. Defaults to False. NOTE: force=True only bypasses the existing-pin check — major-version crossings are always blocked and cannot be overridden.
config_api_root string | null no null Optional API root URL override for the Config API. Defaults to Airbyte Cloud (https://cloud.airbyte.com/api/v1). Use this to target local or self-hosted deployments.
customer_tier_filter enum("TIER_0", "TIER_1", "TIER_2", "ALL") no "TIER_2" Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. The operation will be rejected if the actual customer tier does not match. Use 'ALL' to proceed regardless of tier (a warning is shown for sensitive tiers).

Input JSON schema:

{
  "additionalProperties": false,
  "properties": {
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "description": "Workspace ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@devin-ai-sandbox\") and its value\nis the actual workspace UUID. Use `WorkspaceAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
          "enum": [
            "266ebdfe-0d7b-4540-9817-de7e4505ba61"
          ],
          "type": "string"
        }
      ],
      "description": "The Airbyte Cloud workspace ID (UUID) or alias. Accepts '@devin-ai-sandbox' as an alias for the Devin AI sandbox workspace."
    },
    "connector_name": {
      "description": "The connector name (e.g., 'source-github', 'destination-bigquery').",
      "type": "string"
    },
    "connector_type": {
      "description": "The type of connector (source or destination)",
      "enum": [
        "source",
        "destination"
      ],
      "type": "string"
    },
    "approval_comment_url": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "URL to the Slack approval record. Obtain this by calling the `escalate_to_human` tool with `approval_requested=True`; the backend delivers the approval record URL when a human clicks Approve. Format: https://<workspace>.slack.com/archives/... The admin email is automatically resolved from the approver's identity via the team roster."
    },
    "version": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The semver version string to pin to (e.g., '0.1.0'). Must be None if unset is True."
    },
    "unset": {
      "default": false,
      "description": "If True, removes any existing version override. Cannot be True if version is provided.",
      "type": "boolean"
    },
    "override_reason": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Required when setting a version. Explanation for the override (min 10 characters)."
    },
    "override_reason_reference_url": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional URL with more context (e.g., issue link)."
    },
    "issue_url": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "URL to the GitHub issue providing context for this operation. Must be a valid GitHub URL (https://github.com/...). Required for authorization."
    },
    "ai_agent_session_url": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "URL to the AI agent session driving this operation, if applicable. Provides additional auditability for AI-driven operations."
    },
    "force": {
      "default": false,
      "description": "If `True`, allow overwriting an existing version pin. Existing pins may have been set by rollouts, breaking-change migrations, or other operators. Defaults to `False`. NOTE: `force=True` only bypasses the existing-pin check \u2014 major-version crossings are always blocked and cannot be overridden.",
      "type": "boolean"
    },
    "config_api_root": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional API root URL override for the Config API. Defaults to Airbyte Cloud (https://cloud.airbyte.com/api/v1). Use this to target local or self-hosted deployments."
    },
    "customer_tier_filter": {
      "default": "TIER_2",
      "description": "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. The operation will be rejected if the actual customer tier does not match. Use 'ALL' to proceed regardless of tier (a warning is shown for sensitive tiers).",
      "enum": [
        "TIER_0",
        "TIER_1",
        "TIER_2",
        "ALL"
      ],
      "type": "string"
    }
  },
  "required": [
    "workspace_id",
    "connector_name",
    "connector_type"
  ],
  "type": "object"
}
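
The required fields and parameter descriptions in the input schema can be checked on the client before the tool is called. The sketch below is a hypothetical pre-flight helper, not part of this module: `preflight_errors`, `REQUIRED_FIELDS`, and `TIER_FILTERS` are names invented for illustration, and the checks mirror only the rules stated in the schema and the tool description (required fields, the GitHub issue URL prefix, the allowed tier filter values, and the exactly-one-of `version` / `unset` rule). The server-side validation remains the source of truth.

```python
from typing import Any

# Names below are illustrative assumptions, not part of airbyte_ops_mcp.
REQUIRED_FIELDS = ("workspace_id", "connector_name", "connector_type")
TIER_FILTERS = {"TIER_0", "TIER_1", "TIER_2", "ALL"}


def preflight_errors(args: dict[str, Any]) -> list[str]:
    """Return human-readable problems found in a tool-call argument dict."""
    errors: list[str] = []

    for name in REQUIRED_FIELDS:
        if not args.get(name):
            errors.append(f"missing required field: {name}")

    # The schema default is "TIER_2" when the caller omits the filter.
    tier = args.get("customer_tier_filter", "TIER_2")
    if tier not in TIER_FILTERS:
        errors.append(f"invalid customer_tier_filter: {tier}")

    issue_url = args.get("issue_url")
    if not issue_url:
        errors.append("issue_url is required for authorization")
    elif not issue_url.startswith("https://github.com/"):
        errors.append("issue_url must start with https://github.com/")

    # Exactly one of `version` or `unset=True` may be given.
    has_version = args.get("version") is not None
    wants_unset = bool(args.get("unset", False))
    if has_version == wants_unset:
        errors.append("specify exactly one of version or unset=True")

    return errors


example_args = {
    "workspace_id": "@devin-ai-sandbox",
    "connector_name": "source-github",
    "connector_type": "source",
    "version": "0.1.0",
    "override_reason": "Pinning while investigating a customer sync failure",
    "issue_url": "https://github.com/...",
    "customer_tier_filter": "TIER_2",
}
print(preflight_errors(example_args))
```

An empty list means the arguments pass these local checks only; the tool can still reject the call, for example on a tier mismatch or a missing approval record.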

Show output JSON schema

{
  "description": "Result of a workspace-level version override operation.\n\nThis model provides detailed information about the outcome of a workspace-level\nversion pinning or unpinning operation.",
  "properties": {
    "success": {
      "description": "Whether the operation succeeded",
      "type": "boolean"
    },
    "message": {
      "description": "Human-readable message describing the result",
      "type": "string"
    },
    "workspace_id": {
      "description": "The workspace ID",
      "type": "string"
    },
    "connector_name": {
      "description": "The connector name (e.g., 'source-github')",
      "type": "string"
    },
    "connector_type": {
      "description": "The type of connector (source or destination)",
      "enum": [
        "source",
        "destination"
      ],
      "type": "string"
    },
    "version": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The version that was pinned (None if cleared or failed)"
    },
    "customer_tier": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Customer tier of the workspace's organization (TIER_0, TIER_1, TIER_2). Included as a guardrail annotation."
    },
    "is_eu": {
      "anyOf": [
        {
          "type": "boolean"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Whether the workspace is in the EU region."
    },
    "tier_warning": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Warning message if the operation targets a sensitive customer tier."
    }
  },
  "required": [
    "success",
    "message",
    "workspace_id",
    "connector_name",
    "connector_type"
  ],
  "type": "object"
}
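
A caller will usually branch on `success` and treat the nullable fields as optional annotations. The following is a minimal sketch of that handling, assuming the result arrives as a plain dict shaped like the output schema; `summarize_result` and the sample payload (including its message text and workspace ID) are invented for illustration.

```python
from typing import Any


def summarize_result(result: dict[str, Any]) -> str:
    """Render a one-line summary of a workspace-level override result."""
    status = "OK" if result["success"] else "FAILED"
    target = f"{result['connector_name']} in workspace {result['workspace_id']}"
    # `version` is null when the pin was cleared or the operation failed.
    version = result.get("version") or "no pinned version"
    parts = [f"[{status}] {target}: {version}", result["message"]]
    # Guardrail annotations default to null, so only report them when set.
    if result.get("customer_tier"):
        parts.append(f"tier={result['customer_tier']}")
    if result.get("is_eu") is not None:
        parts.append(f"eu={result['is_eu']}")
    return " | ".join(parts)


# Hypothetical payload; field names follow the output schema above.
sample = {
    "success": True,
    "message": "Successfully pinned",
    "workspace_id": "ws-123",
    "connector_name": "source-github",
    "connector_type": "source",
    "version": "0.1.0",
    "customer_tier": "TIER_2",
    "is_eu": False,
}
print(summarize_result(sample))
```

Only the five required fields are read unconditionally; everything else goes through `dict.get`, so a response that omits the optional annotations is still summarized.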

# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
"""MCP tools for cloud connector version management.

This module provides MCP tools for viewing and managing connector version
overrides (pins) in Airbyte Cloud. These tools enable admins to pin connectors
to specific versions for troubleshooting or stability purposes.

Uses direct API client calls with either bearer token or client credentials auth.

## MCP reference

.. include:: ../../../docs/mcp-generated/cloud_connector_versions.md
    :start-line: 2
"""

# NOTE: We intentionally do NOT use `from __future__ import annotations` here.
# FastMCP has issues resolving forward references when PEP 563 deferred annotations
# are used. See: https://github.com/jlowin/fastmcp/issues/905
# Python 3.12+ supports modern type hint syntax natively, so this is not needed.

__all__: list[str] = []

import logging
from dataclasses import dataclass
from typing import Annotated, Literal

import requests as _requests
from airbyte import constants
from airbyte.exceptions import PyAirbyteInputError
from fastmcp import Context, FastMCP
from fastmcp_extensions import get_mcp_config, mcp_tool, register_mcp_tools
from pydantic import Field

from airbyte_ops_mcp.approval_resolution import (
    ApprovalResolutionError,
    resolve_admin_email_from_approval,
)
from airbyte_ops_mcp.cloud_admin import api_client
from airbyte_ops_mcp.cloud_admin.api_client import (
    _get_access_token,
    _ScopeType,
)
from airbyte_ops_mcp.cloud_admin.auth import (
    CloudAuthError,
    require_internal_admin_flag_only,
)
from airbyte_ops_mcp.cloud_admin.models import (
    ConnectorVersionInfo,
    OrganizationVersionOverrideResult,
    VersionOverrideOperationResult,
    WorkspaceVersionOverrideResult,
)
from airbyte_ops_mcp.cloud_admin.version_guard import (
    check_existing_pins,
)
from airbyte_ops_mcp.constants import USER_AGENT, ServerConfigKey, WorkspaceAliasEnum
from airbyte_ops_mcp.tier_cache import TierFilter, get_org_tier, resolve_workspace


def _build_tier_warning(customer_tier: str) -> str | None:
    """Build a warning message for sensitive customer tiers."""
    if customer_tier == "TIER_0":
        return (
            "WARNING: This is a TIER_0 (highest-value) customer. "
            "Proceed with extreme caution."
        )
    if customer_tier == "TIER_1":
        return "WARNING: This is a TIER_1 (high-value) customer. Proceed with caution."
    return None


def _validate_tier_filter(
    actual_tier: str,
    requested_filter: TierFilter,
) -> tuple[bool, str | None]:
    """Check if actual tier matches the requested filter.

    Returns `(ok, error_message)`.  When `ok` is `False` the caller should
    reject the operation with `error_message`.
    """
    if requested_filter == "ALL":
        return True, None
    if actual_tier != requested_filter:
        return False, (
            f"Tier mismatch: the target entity is {actual_tier} but the requested "
            f"tier filter is {requested_filter}. Either specify the correct tier "
            f"or use 'ALL' to proceed with a warning."
        )
    return True, None


logger = logging.getLogger(__name__)


@dataclass(frozen=True)
class _ResolvedCloudAuth:
    """Resolved authentication for Airbyte Cloud API calls.

    Either bearer_token OR (client_id AND client_secret) will be set, not both.
    """

    bearer_token: str | None = None
    client_id: str | None = None
    client_secret: str | None = None


def _resolve_cloud_auth(ctx: Context) -> _ResolvedCloudAuth:
    """Resolve authentication credentials for Airbyte Cloud API.

    Credentials are resolved in priority order:
    1. Bearer token (Authorization header or AIRBYTE_CLOUD_BEARER_TOKEN env var)
    2. Client credentials (X-Airbyte-Cloud-Client-Id/Secret headers or env vars)

    Args:
        ctx: FastMCP Context object from the current tool invocation.

    Returns:
        _ResolvedCloudAuth with either bearer_token or client credentials set.

    Raises:
        CloudAuthError: If credentials cannot be resolved from headers or env vars.
    """
    # Try bearer token first (preferred, but not required)
    bearer_token = get_mcp_config(ctx, ServerConfigKey.BEARER_TOKEN)
    if bearer_token:
        return _ResolvedCloudAuth(bearer_token=bearer_token)

    # Fall back to client credentials
    try:
        client_id = get_mcp_config(ctx, ServerConfigKey.CLIENT_ID)
        client_secret = get_mcp_config(ctx, ServerConfigKey.CLIENT_SECRET)
        return _ResolvedCloudAuth(
            client_id=client_id,
            client_secret=client_secret,
        )
    except ValueError as e:
        raise CloudAuthError(
            f"Failed to resolve credentials. Ensure credentials are provided "
            f"via Authorization header (Bearer token), "
            f"HTTP headers (X-Airbyte-Cloud-Client-Id, X-Airbyte-Cloud-Client-Secret), "
            f"or environment variables. Error: {e}"
        ) from e


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def get_cloud_connector_version(
    workspace_id: Annotated[
        str | WorkspaceAliasEnum,
        Field(
            description="The Airbyte Cloud workspace ID (UUID) or alias. "
            "Accepts '@devin-ai-sandbox' as an alias for the Devin AI sandbox workspace."
        ),
    ],
    actor_id: Annotated[
        str, "The ID of the deployed connector (source or destination)"
    ],
    actor_type: Annotated[
        Literal["source", "destination"],
        "The type of connector (source or destination)",
    ],
    config_api_root: Annotated[
        str | None,
        Field(
            description="Optional API root URL override for the Config API. "
            "Defaults to Airbyte Cloud (https://cloud.airbyte.com/api/v1). "
            "Use this to target local or self-hosted deployments.",
            default=None,
        ),
    ] = None,
    *,
    ctx: Context,
) -> ConnectorVersionInfo:
    """Get the current version information for a deployed connector.

    Returns version details including the current version string and whether
    an override (pin) is applied.

    Authentication credentials are resolved in priority order:
    1. Bearer token (Authorization header or AIRBYTE_CLOUD_BEARER_TOKEN env var)
    2. HTTP headers: X-Airbyte-Cloud-Client-Id, X-Airbyte-Cloud-Client-Secret
    3. Environment variables: AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET
    """
    # Resolve workspace ID alias
    resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id)

    auth = _resolve_cloud_auth(ctx)

    # Use vendored API client instead of connector.get_connector_version()
    # Use Config API root for version management operations
    # Pass workspace_id to get detailed scoped configuration context
    version_data = api_client.get_connector_version(
        connector_id=actor_id,
        connector_type=actor_type,
        config_api_root=config_api_root or constants.CLOUD_CONFIG_API_ROOT,
        client_id=auth.client_id,
        client_secret=auth.client_secret,
        bearer_token=auth.bearer_token,
        workspace_id=resolved_workspace_id,
    )

    # Determine if version is pinned from scoped config context (more reliable)
    # The API's isVersionOverrideApplied only returns true for USER-created pins,
    # not system-generated pins (e.g., breaking_change origin). Check scopedConfigs
    # for a more accurate picture of whether ANY pin exists.
    scoped_configs = version_data.get("scopedConfigs", {})
    has_any_pin = (
        any(config is not None for config in scoped_configs.values())
        if scoped_configs
        else False
    )

    # Use scoped config existence as the source of truth for "is pinned"
    # Fall back to API's isVersionOverrideApplied if no scoped config data
    is_pinned = (
        has_any_pin if scoped_configs else version_data["isVersionOverrideApplied"]
    )

    return ConnectorVersionInfo(
        connector_id=actor_id,
        connector_type=actor_type,
        version=version_data["dockerImageTag"],
        is_version_pinned=is_pinned,
    )


@mcp_tool(
    destructive=True,
    idempotent=False,
    open_world=True,
)
def set_cloud_connector_version_override(
    workspace_id: Annotated[
        str | WorkspaceAliasEnum,
        Field(
            description="The Airbyte Cloud workspace ID (UUID) or alias. "
            "Accepts '@devin-ai-sandbox' as an alias for the Devin AI sandbox workspace."
        ),
    ],
    actor_id: Annotated[
        str, "The ID of the deployed connector (source or destination)"
    ],
    actor_type: Annotated[
        Literal["source", "destination"],
        "The type of connector (source or destination)",
    ],
    approval_comment_url: Annotated[
        str | None,
        Field(
            description="URL to the Slack approval record. Obtain this by calling the "
            "`escalate_to_human` tool with `approval_requested=True`; the backend delivers "
            "the approval record URL when a human clicks Approve. "
            "Format: https://<workspace>.slack.com/archives/... "
            "The admin email is automatically resolved from the approver's identity "
            "via the team roster.",
            default=None,
        ),
    ],
    version: Annotated[
        str | None,
        Field(
            description="The semver version string to pin to (e.g., '0.1.0'). "
            "Must be None if unset is True.",
            default=None,
        ),
    ],
    unset: Annotated[
        bool,
        Field(
            description="If True, removes any existing version override. "
            "Cannot be True if version is provided.",
            default=False,
        ),
    ],
    override_reason: Annotated[
        str | None,
        Field(
            description="Required when setting a version. "
            "Explanation for the override (min 10 characters).",
            default=None,
        ),
    ],
    override_reason_reference_url: Annotated[
        str | None,
        Field(
            description="Optional URL with more context (e.g., issue link).",
            default=None,
        ),
    ],
    issue_url: Annotated[
        str | None,
        Field(
            description="URL to the GitHub issue providing context for this operation. "
            "Must be a valid GitHub URL (https://github.com/...). Required for authorization.",
            default=None,
        ),
    ],
    ai_agent_session_url: Annotated[
        str | None,
        Field(
            description="URL to the AI agent session driving this operation, if applicable. "
            "Provides additional auditability for AI-driven operations.",
            default=None,
        ),
    ] = None,
    force: Annotated[
        bool,
        Field(
            description="If `True`, allow overwriting an existing version pin. "
            "Existing pins may have been set by rollouts, breaking-change migrations, "
            "or other operators. Defaults to `False`. NOTE: `force=True` only "
            "bypasses the existing-pin check — major-version crossings are always "
            "blocked and cannot be overridden.",
            default=False,
        ),
    ] = False,
    config_api_root: Annotated[
        str | None,
        Field(
            description="Optional API root URL override for the Config API. "
            "Defaults to Airbyte Cloud (https://cloud.airbyte.com/api/v1). "
            "Use this to target local or self-hosted deployments.",
            default=None,
        ),
    ] = None,
    customer_tier_filter: Annotated[
        TierFilter,
        Field(
            description=(
                "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. "
                "The operation will be rejected if the actual customer tier does not match. "
                "Use 'ALL' to proceed regardless of tier (a warning is shown for sensitive tiers)."
            ),
        ),
    ] = "TIER_2",
    *,
    ctx: Context,
) -> VersionOverrideOperationResult:
    """Set or clear a version override for a deployed connector.

    **Admin-only operation** - Requires:
    - AIRBYTE_INTERNAL_ADMIN_FLAG=airbyte.io environment variable
    - issue_url parameter (GitHub issue URL for context)
    - approval_comment_url (Slack approval record URL from `escalate_to_human`)

    The admin user email is automatically derived from the Slack approval record,
    resolving the approver's @airbyte.io email via the team roster.

    You must specify EXACTLY ONE of `version` OR `unset=True`, but not both.
    When setting a version, `override_reason` is required.

    The `customer_tier_filter` parameter gates the operation: the call fails if
    the actual tier of the workspace's organization does not match.  Use `ALL`
    to bypass the check (a warning is still emitted for sensitive tiers).

    Business rules enforced:
    - Dev versions (-dev): Only creator can unpin their own dev version override
    - Production versions: Require strong justification mentioning customer/support/investigation
    - Release candidates (-rc): Any admin can pin/unpin RC versions

    Authentication credentials are resolved in priority order:
    1. Bearer token (Authorization header or AIRBYTE_CLOUD_BEARER_TOKEN env var)
    2. HTTP headers: X-Airbyte-Cloud-Client-Id, X-Airbyte-Cloud-Client-Secret
    3. Environment variables: AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET
    """
    # Resolve workspace ID alias (workspace_id is required, so resolved value is never None)
    resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id)
    assert resolved_workspace_id is not None  # Type narrowing: workspace_id is required

    # Validate admin access (check env var flag)
    try:
        require_internal_admin_flag_only()
    except CloudAuthError as e:
        return VersionOverrideOperationResult(
            success=False,
            message=f"Admin authentication failed: {e}",
            connector_id=actor_id,
            connector_type=actor_type,
        )

    # Validate authorization parameters
    validation_errors: list[str] = []

    if not issue_url:
        validation_errors.append(
            "issue_url is required for authorization (GitHub issue URL)"
        )
    elif not issue_url.startswith("https://github.com/"):
        validation_errors.append(
            f"issue_url must be a valid GitHub URL (https://github.com/...), got: {issue_url}"
        )

    if not approval_comment_url:
        validation_errors.append(
            "'approval_comment_url' is required. Use `escalate_to_human` with "
            "`approval_requested=True` to obtain a Slack approval record URL."
        )

    if validation_errors:
        return VersionOverrideOperationResult(
            success=False,
            message="Authorization validation failed: " + "; ".join(validation_errors),
            connector_id=actor_id,
            connector_type=actor_type,
        )

    # Derive admin email from the approval URL (domain-based dispatch)
    try:
        admin_user_email = resolve_admin_email_from_approval(
            approval_comment_url=approval_comment_url,
        )
    except ApprovalResolutionError as e:
        return VersionOverrideOperationResult(
            success=False,
            message=str(e),
            connector_id=actor_id,
            connector_type=actor_type,
        )

    # Tier guardrail: resolve workspace tier BEFORE the destructive operation
    ws_resolution = resolve_workspace(resolved_workspace_id)
    if not ws_resolution.organization_id:
        return VersionOverrideOperationResult(
            success=False,
            message=f"Could not resolve organization for workspace {resolved_workspace_id}",
            connector_id=actor_id,
            connector_type=actor_type,
        )

    customer_tier = ws_resolution.customer_tier
    is_eu = (
        ws_resolution.dataplane_name == "EU" if ws_resolution.dataplane_name else None
    )
    tier_warning = _build_tier_warning(customer_tier)

    tier_ok, tier_error = _validate_tier_filter(customer_tier, customer_tier_filter)
    if not tier_ok:
        return VersionOverrideOperationResult(
            success=False,
            message=tier_error or "Tier filter mismatch",
            connector_id=actor_id,
            connector_type=actor_type,
            customer_tier=customer_tier,
            is_eu=is_eu,
            tier_warning=tier_warning,
        )

    # Build enhanced override reason with audit fields (only for 'set' operations)
    enhanced_override_reason = override_reason
    if not unset and override_reason:
        audit_parts = [override_reason]
        audit_parts.append(f"Issue: {issue_url}")
        audit_parts.append(f"Approval: {approval_comment_url}")
        if ai_agent_session_url:
            audit_parts.append(f"AI Session: {ai_agent_session_url}")
        enhanced_override_reason = " | ".join(audit_parts)

    # Resolve auth and get current version info
    resolved_config_api_root = config_api_root or constants.CLOUD_CONFIG_API_ROOT
    try:
        auth = _resolve_cloud_auth(ctx)

        # Get current version info before the operation
        current_version_data = api_client.get_connector_version(
            connector_id=actor_id,
            connector_type=actor_type,
            config_api_root=resolved_config_api_root,
            client_id=auth.client_id,
            client_secret=auth.client_secret,
            bearer_token=auth.bearer_token,
        )
        current_version = current_version_data["dockerImageTag"]
        was_pinned_before = current_version_data["isVersionOverrideApplied"]

    except CloudAuthError as e:
        return VersionOverrideOperationResult(
            success=False,
            message=f"Failed to resolve credentials or get connector: {e}",
            connector_id=actor_id,
            connector_type=actor_type,
        )

    # Existing-pin guard: check actor, workspace, and org scopes (skip when unsetting)
    if not unset and version:
        try:
            access_token = _get_access_token(
                auth.client_id, auth.client_secret, auth.bearer_token
            )
            # Get actor_definition_id from the connector info
            get_endpoint = f"{resolved_config_api_root}/{actor_type}s/get"
            get_resp = _requests.post(
                get_endpoint,
                json={f"{actor_type}Id": actor_id},
                headers={
                    "Authorization": f"Bearer {access_token}",
                    "User-Agent": USER_AGENT,
                    "Content-Type": "application/json",
                },
                timeout=30,
            )
            get_resp.raise_for_status()
            actor_definition_id = get_resp.json().get(f"{actor_type}DefinitionId")
            if not actor_definition_id:
                return VersionOverrideOperationResult(
                    success=False,
                    message=f"Could not find {actor_type}DefinitionId in connector info for actor {actor_id}",
                    connector_id=actor_id,
                    connector_type=actor_type,
                    previous_version=current_version,
                    was_pinned_before=was_pinned_before,
                )

            guard = check_existing_pins(
                scopes=[
                    (_ScopeType.ACTOR, actor_id, "actor"),
                    (_ScopeType.WORKSPACE, resolved_workspace_id, "workspace"),
                    (
                        _ScopeType.ORGANIZATION,
                        ws_resolution.organization_id,
                        "organization",
                    ),
                ],
                actor_definition_id=actor_definition_id,
                config_api_root=resolved_config_api_root,
                access_token=access_token,
                target_version=version,
                force=force,
            )
            if guard.blocked:
                assert guard.error_msg is not None  # narrowing for type checker
                return VersionOverrideOperationResult(
                    success=False,
                    message=guard.error_msg,
                    connector_id=actor_id,
                    connector_type=actor_type,
                    previous_version=current_version,
                    was_pinned_before=was_pinned_before,
                )
        except (
            PyAirbyteInputError,
            CloudAuthError,
            _requests.exceptions.HTTPError,
        ) as e:
            return VersionOverrideOperationResult(
                success=False,
                message=f"Pin guard check failed: {e}",
                connector_id=actor_id,
                connector_type=actor_type,
                previous_version=current_version,
                was_pinned_before=was_pinned_before,
            )

    # Call vendored API client's set_connector_version_override method
    try:
        result = api_client.set_connector_version_override(
            connector_id=actor_id,
            connector_type=actor_type,
            config_api_root=resolved_config_api_root,
            client_id=auth.client_id,
            client_secret=auth.client_secret,
            workspace_id=resolved_workspace_id,
            version=version,
            unset=unset,
            override_reason=enhanced_override_reason,
            override_reason_reference_url=override_reason_reference_url,
            user_email=admin_user_email,
            bearer_token=auth.bearer_token,
        )

        # Get updated version info after the operation
        updated_version_data = api_client.get_connector_version(
            connector_id=actor_id,
            connector_type=actor_type,
            config_api_root=resolved_config_api_root,
            client_id=auth.client_id,
            client_secret=auth.client_secret,
            bearer_token=auth.bearer_token,
        )
        new_version = updated_version_data["dockerImageTag"] if not unset else None
        is_pinned_after = updated_version_data["isVersionOverrideApplied"]

        if unset:
            if result:
                message = "Successfully cleared version override. Connector will now use default version."
            else:
                message = "No version override was active (nothing to clear)"
        else:
            message = f"Successfully pinned connector to version {version}"

        if tier_warning:
            message = f"{tier_warning} {message}"

        return VersionOverrideOperationResult(
            success=True,
            message=message,
            connector_id=actor_id,
            connector_type=actor_type,
            previous_version=current_version,
            new_version=new_version,
            was_pinned_before=was_pinned_before,
            is_pinned_after=is_pinned_after,
            customer_tier=customer_tier,
            is_eu=is_eu,
            tier_warning=tier_warning,
        )

    except PyAirbyteInputError as e:
        # PyAirbyte raises this for validation errors and permission denials
        return VersionOverrideOperationResult(
            success=False,
            message=str(e),
            connector_id=actor_id,
            connector_type=actor_type,
            previous_version=current_version,
            was_pinned_before=was_pinned_before,
        )


 622@mcp_tool(
 623    destructive=True,
 624    idempotent=False,
 625    open_world=True,
 626)
 627def set_workspace_connector_version_override(
 628    workspace_id: Annotated[
 629        str | WorkspaceAliasEnum,
 630        Field(
 631            description="The Airbyte Cloud workspace ID (UUID) or alias. "
 632            "Accepts '@devin-ai-sandbox' as an alias for the Devin AI sandbox workspace."
 633        ),
 634    ],
 635    connector_name: Annotated[
 636        str,
 637        Field(
 638            description="The connector name (e.g., 'source-github', 'destination-bigquery')."
 639        ),
 640    ],
 641    connector_type: Annotated[
 642        Literal["source", "destination"],
 643        "The type of connector (source or destination)",
 644    ],
 645    approval_comment_url: Annotated[
 646        str | None,
 647        Field(
 648            description="URL to the Slack approval record. Obtain this by calling the "
 649            "`escalate_to_human` tool with `approval_requested=True`; the backend delivers "
 650            "the approval record URL when a human clicks Approve. "
 651            "Format: https://<workspace>.slack.com/archives/... "
 652            "The admin email is automatically resolved from the approver's identity "
 653            "via the team roster.",
 654            default=None,
 655        ),
 656    ],
 657    version: Annotated[
 658        str | None,
 659        Field(
 660            description="The semver version string to pin to (e.g., '0.1.0'). "
 661            "Must be None if unset is True.",
 662            default=None,
 663        ),
 664    ],
 665    unset: Annotated[
 666        bool,
 667        Field(
 668            description="If True, removes any existing version override. "
 669            "Cannot be True if version is provided.",
 670            default=False,
 671        ),
 672    ],
 673    override_reason: Annotated[
 674        str | None,
 675        Field(
            description="Required when setting a version. "
            "Explanation for the override (min 10 characters).",
            default=None,
        ),
    ],
    override_reason_reference_url: Annotated[
        str | None,
        Field(
            description="Optional URL with more context (e.g., issue link).",
            default=None,
        ),
    ],
    issue_url: Annotated[
        str | None,
        Field(
            description="URL to the GitHub issue providing context for this operation. "
            "Must be a valid GitHub URL (https://github.com/...). Required for authorization.",
            default=None,
        ),
    ],
    ai_agent_session_url: Annotated[
        str | None,
        Field(
            description="URL to the AI agent session driving this operation, if applicable. "
            "Provides additional auditability for AI-driven operations.",
            default=None,
        ),
    ] = None,
    force: Annotated[
        bool,
        Field(
            description="If `True`, allow overwriting an existing version pin. "
            "Existing pins may have been set by rollouts, breaking-change migrations, "
            "or other operators. Defaults to `False`. NOTE: `force=True` only "
            "bypasses the existing-pin check — major-version crossings are always "
            "blocked and cannot be overridden.",
            default=False,
        ),
    ] = False,
    config_api_root: Annotated[
        str | None,
        Field(
            description="Optional API root URL override for the Config API. "
            "Defaults to Airbyte Cloud (https://cloud.airbyte.com/api/v1). "
            "Use this to target local or self-hosted deployments.",
            default=None,
        ),
    ] = None,
    customer_tier_filter: Annotated[
        TierFilter,
        Field(
            description=(
                "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. "
                "The operation will be rejected if the actual customer tier does not match. "
                "Use 'ALL' to proceed regardless of tier (a warning is shown for sensitive tiers)."
            ),
        ),
    ] = "TIER_2",
    *,
    ctx: Context,
) -> WorkspaceVersionOverrideResult:
    """Set or clear a workspace-level version override for a connector type.

    This pins ALL instances of a connector type within a workspace to a specific version.
    For example, pinning 'source-github' at workspace level means all GitHub sources
    in that workspace will use the pinned version.

    **Admin-only operation** - Requires:
    - AIRBYTE_INTERNAL_ADMIN_FLAG=airbyte.io environment variable
    - issue_url parameter (GitHub issue URL for context)
    - approval_comment_url (Slack approval record URL from `escalate_to_human`)

    You must specify EXACTLY ONE of `version` OR `unset=True`, but not both.
    When setting a version, `override_reason` is required.

    The `customer_tier_filter` parameter gates the operation: the call fails if
    the actual tier of the workspace's organization does not match.  Use `ALL`
    to bypass the check (a warning is still emitted for sensitive tiers).
    """
    # Resolve workspace ID alias (workspace_id is required, so resolved value is never None)
    resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id)
    assert resolved_workspace_id is not None  # Type narrowing: workspace_id is required

    # Validate admin access (check env var flag)
    try:
        require_internal_admin_flag_only()
    except CloudAuthError as e:
        return WorkspaceVersionOverrideResult(
            success=False,
            message=f"Admin authentication failed: {e}",
            workspace_id=resolved_workspace_id,
            connector_name=connector_name,
            connector_type=connector_type,
        )

    # Validate authorization parameters
    validation_errors: list[str] = []

    if not issue_url:
        validation_errors.append(
            "issue_url is required for authorization (GitHub issue URL)"
        )
    elif not issue_url.startswith("https://github.com/"):
        validation_errors.append(
            f"issue_url must be a valid GitHub URL (https://github.com/...), got: {issue_url}"
        )

    if not approval_comment_url:
        validation_errors.append(
            "'approval_comment_url' is required. Use `escalate_to_human` with "
            "`approval_requested=True` to obtain a Slack approval record URL."
        )

    if validation_errors:
        return WorkspaceVersionOverrideResult(
            success=False,
            message="Authorization validation failed: " + "; ".join(validation_errors),
            workspace_id=resolved_workspace_id,
            connector_name=connector_name,
            connector_type=connector_type,
        )

    # Derive admin email from the approval URL (domain-based dispatch)
    try:
        admin_user_email = resolve_admin_email_from_approval(
            approval_comment_url=approval_comment_url,
        )
    except ApprovalResolutionError as e:
        return WorkspaceVersionOverrideResult(
            success=False,
            message=str(e),
            workspace_id=resolved_workspace_id,
            connector_name=connector_name,
            connector_type=connector_type,
        )

    # Tier guardrail: resolve workspace tier BEFORE the destructive operation
    ws_resolution = resolve_workspace(resolved_workspace_id)
    if not ws_resolution.organization_id:
        return WorkspaceVersionOverrideResult(
            success=False,
            message=f"Could not resolve organization for workspace {resolved_workspace_id}",
            workspace_id=resolved_workspace_id,
            connector_name=connector_name,
            connector_type=connector_type,
        )

    customer_tier = ws_resolution.customer_tier
    is_eu = (
        ws_resolution.dataplane_name == "EU" if ws_resolution.dataplane_name else None
    )
    tier_warning = _build_tier_warning(customer_tier)

    tier_ok, tier_error = _validate_tier_filter(customer_tier, customer_tier_filter)
    if not tier_ok:
        return WorkspaceVersionOverrideResult(
            success=False,
            message=tier_error or "Tier filter mismatch",
            workspace_id=resolved_workspace_id,
            connector_name=connector_name,
            connector_type=connector_type,
            customer_tier=customer_tier,
            is_eu=is_eu,
            tier_warning=tier_warning,
        )

    # Build enhanced override reason with audit fields (only for 'set' operations)
    enhanced_override_reason = override_reason
    if not unset and override_reason:
        audit_parts = [override_reason]
        audit_parts.append(f"Issue: {issue_url}")
        audit_parts.append(f"Approval: {approval_comment_url}")
        if ai_agent_session_url:
            audit_parts.append(f"AI Session: {ai_agent_session_url}")
        enhanced_override_reason = " | ".join(audit_parts)

    # Existing-pin guard: check workspace and org scopes (skip when unsetting)
    if not unset and version:
        try:
            auth = _resolve_cloud_auth(ctx)
            resolved_config_api_root = (
                config_api_root or constants.CLOUD_CONFIG_API_ROOT
            )
            access_token = _get_access_token(
                auth.client_id, auth.client_secret, auth.bearer_token
            )
            from airbyte_ops_mcp.mcp.prod_db_queries import (
                _resolve_canonical_name_to_definition_id,
            )

            actor_definition_id = _resolve_canonical_name_to_definition_id(
                connector_name
            )

            guard = check_existing_pins(
                scopes=[
                    (_ScopeType.WORKSPACE, resolved_workspace_id, "workspace"),
                    (
                        _ScopeType.ORGANIZATION,
                        ws_resolution.organization_id,
                        "organization",
                    ),
                ],
                actor_definition_id=actor_definition_id,
                config_api_root=resolved_config_api_root,
                access_token=access_token,
                target_version=version,
                force=force,
            )
            if guard.blocked:
                assert guard.error_msg is not None  # narrowing for type checker
                return WorkspaceVersionOverrideResult(
                    success=False,
                    message=guard.error_msg,
                    workspace_id=resolved_workspace_id,
                    connector_name=connector_name,
                    connector_type=connector_type,
                )
        except (PyAirbyteInputError, CloudAuthError) as e:
            return WorkspaceVersionOverrideResult(
                success=False,
                message=f"Pin guard check failed: {e}",
                workspace_id=resolved_workspace_id,
                connector_name=connector_name,
                connector_type=connector_type,
            )

    # Resolve auth and call API client
    try:
        auth = _resolve_cloud_auth(ctx)

        result = api_client.set_workspace_connector_version_override(
            workspace_id=resolved_workspace_id,
            connector_name=connector_name,
            connector_type=connector_type,
            config_api_root=config_api_root or constants.CLOUD_CONFIG_API_ROOT,
            client_id=auth.client_id,
            client_secret=auth.client_secret,
            bearer_token=auth.bearer_token,
            version=version,
            unset=unset,
            override_reason=enhanced_override_reason,
            override_reason_reference_url=override_reason_reference_url,
            user_email=admin_user_email,
        )

        if unset:
            if result:
                message = f"Successfully cleared workspace-level version override for {connector_name}."
            else:
                message = f"No workspace-level version override was active for {connector_name} (nothing to clear)"
        else:
            message = f"Successfully pinned {connector_name} to version {version} at workspace level."

        if tier_warning:
            message = f"{tier_warning} {message}"

        return WorkspaceVersionOverrideResult(
            success=True,
            message=message,
            workspace_id=resolved_workspace_id,
            connector_name=connector_name,
            connector_type=connector_type,
            version=version if not unset else None,
            customer_tier=customer_tier,
            is_eu=is_eu,
            tier_warning=tier_warning,
        )

    except PyAirbyteInputError as e:
        return WorkspaceVersionOverrideResult(
            success=False,
            message=str(e),
            workspace_id=resolved_workspace_id,
            connector_name=connector_name,
            connector_type=connector_type,
        )
    except CloudAuthError as e:
        return WorkspaceVersionOverrideResult(
            success=False,
            message=f"Authentication failed: {e}",
            workspace_id=resolved_workspace_id,
            connector_name=connector_name,
            connector_type=connector_type,
        )


@mcp_tool(
    destructive=True,
    idempotent=False,
    open_world=True,
)
def set_organization_connector_version_override(
    organization_id: Annotated[
        str,
        Field(description="The Airbyte Cloud organization ID."),
    ],
    connector_name: Annotated[
        str,
        Field(
            description="The connector name (e.g., 'source-github', 'destination-bigquery')."
        ),
    ],
    connector_type: Annotated[
        Literal["source", "destination"],
        "The type of connector (source or destination)",
    ],
    approval_comment_url: Annotated[
        str | None,
        Field(
            description="URL to the Slack approval record. Obtain this by calling the "
            "`escalate_to_human` tool with `approval_requested=True`; the backend delivers "
            "the approval record URL when a human clicks Approve. "
            "Format: https://<workspace>.slack.com/archives/... "
            "The admin email is automatically resolved from the approver's identity "
            "via the team roster.",
            default=None,
        ),
    ],
    version: Annotated[
        str | None,
        Field(
            description="The semver version string to pin to (e.g., '0.1.0'). "
            "Must be None if unset is True.",
            default=None,
        ),
    ],
    unset: Annotated[
        bool,
        Field(
            description="If True, removes any existing version override. "
            "Cannot be True if version is provided.",
            default=False,
        ),
    ],
    override_reason: Annotated[
        str | None,
        Field(
            description="Required when setting a version. "
            "Explanation for the override (min 10 characters).",
            default=None,
        ),
    ],
    override_reason_reference_url: Annotated[
        str | None,
        Field(
            description="Optional URL with more context (e.g., issue link).",
            default=None,
        ),
    ],
    issue_url: Annotated[
        str | None,
        Field(
            description="URL to the GitHub issue providing context for this operation. "
            "Must be a valid GitHub URL (https://github.com/...). Required for authorization.",
            default=None,
        ),
    ],
    ai_agent_session_url: Annotated[
        str | None,
        Field(
            description="URL to the AI agent session driving this operation, if applicable. "
            "Provides additional auditability for AI-driven operations.",
            default=None,
        ),
    ] = None,
    force: Annotated[
        bool,
        Field(
            description="If `True`, allow overwriting an existing version pin. "
            "Existing pins may have been set by rollouts, breaking-change migrations, "
            "or other operators. Defaults to `False`. NOTE: `force=True` only "
            "bypasses the existing-pin check — major-version crossings are always "
            "blocked and cannot be overridden.",
            default=False,
        ),
    ] = False,
    config_api_root: Annotated[
        str | None,
        Field(
            description="Optional API root URL override for the Config API. "
            "Defaults to Airbyte Cloud (https://cloud.airbyte.com/api/v1). "
            "Use this to target local or self-hosted deployments.",
            default=None,
        ),
    ] = None,
    customer_tier_filter: Annotated[
        TierFilter,
        Field(
            description=(
                "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. "
                "The operation will be rejected if the actual customer tier does not match. "
                "Use 'ALL' to proceed regardless of tier (a warning is shown for sensitive tiers)."
            ),
        ),
    ] = "TIER_2",
    *,
    ctx: Context,
) -> OrganizationVersionOverrideResult:
    """Set or clear an organization-level version override for a connector type.

    This pins ALL instances of a connector type across an entire organization to a
    specific version. For example, pinning 'source-github' at organization level means
    all GitHub sources in all workspaces within that organization will use the pinned version.

    **Admin-only operation** - Requires:
    - AIRBYTE_INTERNAL_ADMIN_FLAG=airbyte.io environment variable
    - issue_url parameter (GitHub issue URL for context)
    - approval_comment_url (Slack approval record URL from `escalate_to_human`)

    You must specify EXACTLY ONE of `version` OR `unset=True`, but not both.
    When setting a version, `override_reason` is required.

    The `customer_tier_filter` parameter gates the operation: the call fails if
    the actual tier of the organization does not match.  Use `ALL` to bypass
    the check (a warning is still emitted for sensitive tiers).
    """
    # Validate admin access (check env var flag)
    try:
        require_internal_admin_flag_only()
    except CloudAuthError as e:
        return OrganizationVersionOverrideResult(
            success=False,
            message=f"Admin authentication failed: {e}",
            organization_id=organization_id,
            connector_name=connector_name,
            connector_type=connector_type,
        )

    # Validate authorization parameters
    validation_errors: list[str] = []

    if not issue_url:
        validation_errors.append(
            "issue_url is required for authorization (GitHub issue URL)"
        )
    elif not issue_url.startswith("https://github.com/"):
        validation_errors.append(
            f"issue_url must be a valid GitHub URL (https://github.com/...), got: {issue_url}"
        )

    if not approval_comment_url:
        validation_errors.append(
            "'approval_comment_url' is required. Use `escalate_to_human` with "
            "`approval_requested=True` to obtain a Slack approval record URL."
        )

    if validation_errors:
        return OrganizationVersionOverrideResult(
            success=False,
            message="Authorization validation failed: " + "; ".join(validation_errors),
            organization_id=organization_id,
            connector_name=connector_name,
            connector_type=connector_type,
        )

    # Derive admin email from the approval URL (domain-based dispatch)
    try:
        admin_user_email = resolve_admin_email_from_approval(
            approval_comment_url=approval_comment_url,
        )
    except ApprovalResolutionError as e:
        return OrganizationVersionOverrideResult(
            success=False,
            message=str(e),
            organization_id=organization_id,
            connector_name=connector_name,
            connector_type=connector_type,
        )

    # Tier guardrail: resolve org tier BEFORE the destructive operation
    # (placed after auth checks to avoid exposing tier info to unauthenticated callers)
    tier_result = get_org_tier(organization_id)
    customer_tier = tier_result.customer_tier
    tier_warning = _build_tier_warning(customer_tier)

    tier_ok, tier_error = _validate_tier_filter(customer_tier, customer_tier_filter)
    if not tier_ok:
        return OrganizationVersionOverrideResult(
            success=False,
            message=tier_error or "Tier filter mismatch",
            organization_id=organization_id,
            connector_name=connector_name,
            connector_type=connector_type,
            customer_tier=customer_tier,
            tier_warning=tier_warning,
        )

    # Build enhanced override reason with audit fields (only for 'set' operations)
    enhanced_override_reason = override_reason
    if not unset and override_reason:
        audit_parts = [override_reason]
        audit_parts.append(f"Issue: {issue_url}")
        audit_parts.append(f"Approval: {approval_comment_url}")
        if ai_agent_session_url:
            audit_parts.append(f"AI Session: {ai_agent_session_url}")
        enhanced_override_reason = " | ".join(audit_parts)

    # Existing-pin guard: check org scope (skip when unsetting)
    if not unset and version:
        try:
            auth = _resolve_cloud_auth(ctx)
            resolved_config_api_root = (
                config_api_root or constants.CLOUD_CONFIG_API_ROOT
            )
            access_token = _get_access_token(
                auth.client_id, auth.client_secret, auth.bearer_token
            )
            from airbyte_ops_mcp.mcp.prod_db_queries import (
                _resolve_canonical_name_to_definition_id,
            )

            actor_definition_id = _resolve_canonical_name_to_definition_id(
                connector_name
            )

            guard = check_existing_pins(
                scopes=[
                    (_ScopeType.ORGANIZATION, organization_id, "organization"),
                ],
                actor_definition_id=actor_definition_id,
                config_api_root=resolved_config_api_root,
                access_token=access_token,
                target_version=version,
                force=force,
            )
            if guard.blocked:
                assert guard.error_msg is not None  # narrowing for type checker
                return OrganizationVersionOverrideResult(
                    success=False,
                    message=guard.error_msg,
                    organization_id=organization_id,
                    connector_name=connector_name,
                    connector_type=connector_type,
                )
        except (PyAirbyteInputError, CloudAuthError) as e:
            return OrganizationVersionOverrideResult(
                success=False,
                message=f"Pin guard check failed: {e}",
                organization_id=organization_id,
                connector_name=connector_name,
                connector_type=connector_type,
            )

    # Resolve auth and call API client
    try:
        auth = _resolve_cloud_auth(ctx)

        result = api_client.set_organization_connector_version_override(
            organization_id=organization_id,
            connector_name=connector_name,
            connector_type=connector_type,
            config_api_root=config_api_root or constants.CLOUD_CONFIG_API_ROOT,
            client_id=auth.client_id,
            client_secret=auth.client_secret,
            bearer_token=auth.bearer_token,
            version=version,
            unset=unset,
            override_reason=enhanced_override_reason,
            override_reason_reference_url=override_reason_reference_url,
            user_email=admin_user_email,
        )

        if unset:
            if result:
                message = f"Successfully cleared organization-level version override for {connector_name}."
            else:
                message = f"No organization-level version override was active for {connector_name} (nothing to clear)"
        else:
            message = f"Successfully pinned {connector_name} to version {version} at organization level."

        if tier_warning:
            message = f"{tier_warning} {message}"

        return OrganizationVersionOverrideResult(
            success=True,
            message=message,
            organization_id=organization_id,
            connector_name=connector_name,
            connector_type=connector_type,
            version=version if not unset else None,
            customer_tier=customer_tier,
            tier_warning=tier_warning,
        )

    except PyAirbyteInputError as e:
        return OrganizationVersionOverrideResult(
            success=False,
            message=str(e),
            organization_id=organization_id,
            connector_name=connector_name,
            connector_type=connector_type,
        )
    except CloudAuthError as e:
        return OrganizationVersionOverrideResult(
            success=False,
            message=f"Authentication failed: {e}",
            organization_id=organization_id,
            connector_name=connector_name,
            connector_type=connector_type,
        )


def register_cloud_connector_version_tools(app: FastMCP) -> None:
    """Register cloud connector version management tools with the FastMCP app.

    Args:
        app: FastMCP application instance
    """
    register_mcp_tools(app, mcp_module=__name__)
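
The workspace-level and organization-level override tools in the listing above repeat the same two pre-flight steps before any API call: they collect every authorization problem with `issue_url` and `approval_comment_url` into one error list, and they fold the audit fields into the override reason for 'set' operations. The following is a minimal standalone sketch of those two steps, assuming they were factored into helpers. The function names `collect_authorization_errors` and `build_audit_reason` are hypothetical and do not appear in the module, and the URLs in the usage lines are placeholders.

```python
from __future__ import annotations


def collect_authorization_errors(
    issue_url: str | None,
    approval_comment_url: str | None,
) -> list[str]:
    """Collect all authorization problems rather than stopping at the first.

    Hypothetical helper: mirrors the inline checks in the two override tools.
    """
    errors: list[str] = []
    if not issue_url:
        errors.append("issue_url is required for authorization (GitHub issue URL)")
    elif not issue_url.startswith("https://github.com/"):
        errors.append(
            "issue_url must be a valid GitHub URL (https://github.com/...), "
            f"got: {issue_url}"
        )
    if not approval_comment_url:
        errors.append(
            "'approval_comment_url' is required. Use `escalate_to_human` with "
            "`approval_requested=True` to obtain a Slack approval record URL."
        )
    return errors


def build_audit_reason(
    override_reason: str | None,
    issue_url: str,
    approval_comment_url: str,
    ai_agent_session_url: str | None = None,
    unset: bool = False,
) -> str | None:
    """Append audit fields to the reason, only when setting a version.

    Hypothetical helper: when `unset` is True or no reason was given, the
    reason is returned unchanged, matching the inline logic in the listing.
    """
    if unset or not override_reason:
        return override_reason
    parts = [
        override_reason,
        f"Issue: {issue_url}",
        f"Approval: {approval_comment_url}",
    ]
    if ai_agent_session_url:
        parts.append(f"AI Session: {ai_agent_session_url}")
    return " | ".join(parts)


if __name__ == "__main__":
    # Placeholder URLs for illustration only.
    issue = "https://github.com/example-org/example-repo/issues/1"
    approval = "https://example.slack.com/archives/C000/p000"

    # A non-GitHub issue URL and a missing approval URL are reported together.
    print("; ".join(collect_authorization_errors("https://example.com/x", None)))

    # A valid pair produces no errors, so the audit string can be built.
    if not collect_authorization_errors(issue, approval):
        print(build_audit_reason("Pin while upstream fix lands", issue, approval))
```

Reporting all validation errors in one message, as the tools do, lets a caller fix a missing issue URL and a missing approval URL in a single retry instead of discovering them one at a time.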