airbyte_ops_mcp.mcp.cloud_connector_versions
MCP tools for cloud connector version management.
This module provides MCP tools for viewing and managing connector version overrides (pins) in Airbyte Cloud. These tools enable admins to pin connectors to specific versions for troubleshooting or stability purposes.
Uses direct API client calls with either bearer token or client credentials auth.
MCP reference
MCP primitives registered by the cloud_connector_versions module of the airbyte-internal-ops server: 4 tool(s), 0 prompt(s), 0 resource(s).
Tools (4)
get_cloud_connector_version
Hints: read-only · idempotent · open-world
Get the current version information for a deployed connector.
Returns version details including the current version string and whether an override (pin) is applied.
Authentication credentials are resolved in priority order:
- Bearer token (Authorization header or AIRBYTE_CLOUD_BEARER_TOKEN env var)
- HTTP headers: X-Airbyte-Cloud-Client-Id, X-Airbyte-Cloud-Client-Secret
- Environment variables: AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
workspace_id |
string | enum("266ebdfe-0d7b-4540-9817-de7e4505ba61") |
yes | — | The Airbyte Cloud workspace ID (UUID) or alias. Accepts '@devin-ai-sandbox' as an alias for the Devin AI sandbox workspace. |
actor_id |
string |
yes | — | The ID of the deployed connector (source or destination) |
actor_type |
enum("source", "destination") |
yes | — | The type of connector (source or destination) |
config_api_root |
string | null |
no | null |
Optional API root URL override for the Config API. Defaults to Airbyte Cloud (https://cloud.airbyte.com/api/v1). Use this to target local or self-hosted deployments. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"description": "Workspace ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@devin-ai-sandbox\") and its value\nis the actual workspace UUID. Use `WorkspaceAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
"enum": [
"266ebdfe-0d7b-4540-9817-de7e4505ba61"
],
"type": "string"
}
],
"description": "The Airbyte Cloud workspace ID (UUID) or alias. Accepts '@devin-ai-sandbox' as an alias for the Devin AI sandbox workspace."
},
"actor_id": {
"description": "The ID of the deployed connector (source or destination)",
"type": "string"
},
"actor_type": {
"description": "The type of connector (source or destination)",
"enum": [
"source",
"destination"
],
"type": "string"
},
"config_api_root": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional API root URL override for the Config API. Defaults to Airbyte Cloud (https://cloud.airbyte.com/api/v1). Use this to target local or self-hosted deployments."
}
},
"required": [
"workspace_id",
"actor_id",
"actor_type"
],
"type": "object"
}
Show output JSON schema
{
"description": "Information about a cloud connector's version.\n\nThis model represents the current version state of a deployed connector,\nincluding whether a version override (pin) is active.",
"properties": {
"connector_id": {
"description": "The ID of the deployed connector",
"type": "string"
},
"connector_type": {
"description": "The type of connector (source or destination)",
"enum": [
"source",
"destination"
],
"type": "string"
},
"version": {
"description": "The current version string (e.g., '0.1.0')",
"type": "string"
},
"is_version_pinned": {
"description": "Whether a version override is active for this connector",
"type": "boolean"
}
},
"required": [
"connector_id",
"connector_type",
"version",
"is_version_pinned"
],
"type": "object"
}
set_cloud_connector_version_override
Hints: destructive · open-world
Set or clear a version override for a deployed connector.
Admin-only operation - Requires:
- AIRBYTE_INTERNAL_ADMIN_FLAG=airbyte.io environment variable
- issue_url parameter (GitHub issue URL for context)
- approval_comment_url (Slack approval record URL from
escalate_to_human)
The admin user email is automatically derived from the Slack approval record, resolving the approver's @airbyte.io email via the team roster.
You must specify EXACTLY ONE of version OR unset=True, but not both.
When setting a version, override_reason is required.
The customer_tier_filter parameter gates the operation: the call fails if
the actual tier of the workspace's organization does not match. Use ALL
to bypass the check (a warning is still emitted for sensitive tiers).
Business rules enforced:
- Dev versions (-dev): Only creator can unpin their own dev version override
- Production versions: Require strong justification mentioning customer/support/investigation
- Release candidates (-rc): Any admin can pin/unpin RC versions
Authentication credentials are resolved in priority order:
- Bearer token (Authorization header or AIRBYTE_CLOUD_BEARER_TOKEN env var)
- HTTP headers: X-Airbyte-Cloud-Client-Id, X-Airbyte-Cloud-Client-Secret
- Environment variables: AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
workspace_id |
string | enum("266ebdfe-0d7b-4540-9817-de7e4505ba61") |
yes | — | The Airbyte Cloud workspace ID (UUID) or alias. Accepts '@devin-ai-sandbox' as an alias for the Devin AI sandbox workspace. |
actor_id |
string |
yes | — | The ID of the deployed connector (source or destination) |
actor_type |
enum("source", "destination") |
yes | — | The type of connector (source or destination) |
approval_comment_url |
string | null |
no | null |
URL to the Slack approval record. Obtain this by calling the escalate_to_human tool with approval_requested=True; the backend delivers the approval record URL when a human clicks Approve. Format: https:// |
version |
string | null |
no | null |
The semver version string to pin to (e.g., '0.1.0'). Must be None if unset is True. |
unset |
boolean |
no | false |
If True, removes any existing version override. Cannot be True if version is provided. |
override_reason |
string | null |
no | null |
Required when setting a version. Explanation for the override (min 10 characters). |
override_reason_reference_url |
string | null |
no | null |
Optional URL with more context (e.g., issue link). |
issue_url |
string | null |
no | null |
URL to the GitHub issue providing context for this operation. Must be a valid GitHub URL (https://github.com/...). Required for authorization. |
ai_agent_session_url |
string | null |
no | null |
URL to the AI agent session driving this operation, if applicable. Provides additional auditability for AI-driven operations. |
force |
boolean |
no | false |
If True, allow overwriting an existing version pin. Existing pins may have been set by rollouts, breaking-change migrations, or other operators. Defaults to False. NOTE: force=True only bypasses the existing-pin check — major-version crossings are always blocked and cannot be overridden. |
config_api_root |
string | null |
no | null |
Optional API root URL override for the Config API. Defaults to Airbyte Cloud (https://cloud.airbyte.com/api/v1). Use this to target local or self-hosted deployments. |
customer_tier_filter |
enum("TIER_0", "TIER_1", "TIER_2", "ALL") |
no | "TIER_2" |
Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. The operation will be rejected if the actual customer tier does not match. Use 'ALL' to proceed regardless of tier (a warning is shown for sensitive tiers). |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"description": "Workspace ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@devin-ai-sandbox\") and its value\nis the actual workspace UUID. Use `WorkspaceAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
"enum": [
"266ebdfe-0d7b-4540-9817-de7e4505ba61"
],
"type": "string"
}
],
"description": "The Airbyte Cloud workspace ID (UUID) or alias. Accepts '@devin-ai-sandbox' as an alias for the Devin AI sandbox workspace."
},
"actor_id": {
"description": "The ID of the deployed connector (source or destination)",
"type": "string"
},
"actor_type": {
"description": "The type of connector (source or destination)",
"enum": [
"source",
"destination"
],
"type": "string"
},
"approval_comment_url": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "URL to the Slack approval record. Obtain this by calling the `escalate_to_human` tool with `approval_requested=True`; the backend delivers the approval record URL when a human clicks Approve. Format: https://<workspace>.slack.com/archives/... The admin email is automatically resolved from the approver's identity via the team roster."
},
"version": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The semver version string to pin to (e.g., '0.1.0'). Must be None if unset is True."
},
"unset": {
"default": false,
"description": "If True, removes any existing version override. Cannot be True if version is provided.",
"type": "boolean"
},
"override_reason": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Required when setting a version. Explanation for the override (min 10 characters)."
},
"override_reason_reference_url": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional URL with more context (e.g., issue link)."
},
"issue_url": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "URL to the GitHub issue providing context for this operation. Must be a valid GitHub URL (https://github.com/...). Required for authorization."
},
"ai_agent_session_url": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "URL to the AI agent session driving this operation, if applicable. Provides additional auditability for AI-driven operations."
},
"force": {
"default": false,
"description": "If `True`, allow overwriting an existing version pin. Existing pins may have been set by rollouts, breaking-change migrations, or other operators. Defaults to `False`. NOTE: `force=True` only bypasses the existing-pin check \u2014 major-version crossings are always blocked and cannot be overridden.",
"type": "boolean"
},
"config_api_root": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional API root URL override for the Config API. Defaults to Airbyte Cloud (https://cloud.airbyte.com/api/v1). Use this to target local or self-hosted deployments."
},
"customer_tier_filter": {
"default": "TIER_2",
"description": "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. The operation will be rejected if the actual customer tier does not match. Use 'ALL' to proceed regardless of tier (a warning is shown for sensitive tiers).",
"enum": [
"TIER_0",
"TIER_1",
"TIER_2",
"ALL"
],
"type": "string"
}
},
"required": [
"workspace_id",
"actor_id",
"actor_type"
],
"type": "object"
}
Show output JSON schema
{
"description": "Result of a version override operation (set or clear).\n\nThis model provides detailed information about the outcome of a version\npinning or unpinning operation.",
"properties": {
"success": {
"description": "Whether the operation succeeded",
"type": "boolean"
},
"message": {
"description": "Human-readable message describing the result",
"type": "string"
},
"connector_id": {
"description": "The ID of the connector that was modified",
"type": "string"
},
"connector_type": {
"description": "The type of connector (source or destination)",
"enum": [
"source",
"destination"
],
"type": "string"
},
"previous_version": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The version before the operation (None if not available)"
},
"new_version": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The version after the operation (None if cleared or failed)"
},
"was_pinned_before": {
"anyOf": [
{
"type": "boolean"
},
{
"type": "null"
}
],
"default": null,
"description": "Whether a pin was active before the operation"
},
"is_pinned_after": {
"anyOf": [
{
"type": "boolean"
},
{
"type": "null"
}
],
"default": null,
"description": "Whether a pin is active after the operation"
},
"customer_tier": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Customer tier of the affected entity (TIER_0, TIER_1, TIER_2). Included as a guardrail annotation."
},
"is_eu": {
"anyOf": [
{
"type": "boolean"
},
{
"type": "null"
}
],
"default": null,
"description": "Whether the affected entity is in the EU region."
},
"tier_warning": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Warning message if the operation targets a sensitive customer tier."
}
},
"required": [
"success",
"message",
"connector_id",
"connector_type"
],
"type": "object"
}
set_organization_connector_version_override
Hints: destructive · open-world
Set or clear an organization-level version override for a connector type.
This pins ALL instances of a connector type across an entire organization to a specific version. For example, pinning 'source-github' at organization level means all GitHub sources in all workspaces within that organization will use the pinned version.
Admin-only operation - Requires:
- AIRBYTE_INTERNAL_ADMIN_FLAG=airbyte.io environment variable
- issue_url parameter (GitHub issue URL for context)
- approval_comment_url (Slack approval record URL from
escalate_to_human)
You must specify EXACTLY ONE of version OR unset=True, but not both.
When setting a version, override_reason is required.
The customer_tier_filter parameter gates the operation: the call fails if
the actual tier of the organization does not match. Use ALL to bypass
the check (a warning is still emitted for sensitive tiers).
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
organization_id |
string |
yes | — | The Airbyte Cloud organization ID. |
connector_name |
string |
yes | — | The connector name (e.g., 'source-github', 'destination-bigquery'). |
connector_type |
enum("source", "destination") |
yes | — | The type of connector (source or destination) |
approval_comment_url |
string | null |
no | null |
URL to the Slack approval record. Obtain this by calling the escalate_to_human tool with approval_requested=True; the backend delivers the approval record URL when a human clicks Approve. Format: https:// |
version |
string | null |
no | null |
The semver version string to pin to (e.g., '0.1.0'). Must be None if unset is True. |
unset |
boolean |
no | false |
If True, removes any existing version override. Cannot be True if version is provided. |
override_reason |
string | null |
no | null |
Required when setting a version. Explanation for the override (min 10 characters). |
override_reason_reference_url |
string | null |
no | null |
Optional URL with more context (e.g., issue link). |
issue_url |
string | null |
no | null |
URL to the GitHub issue providing context for this operation. Must be a valid GitHub URL (https://github.com/...). Required for authorization. |
ai_agent_session_url |
string | null |
no | null |
URL to the AI agent session driving this operation, if applicable. Provides additional auditability for AI-driven operations. |
force |
boolean |
no | false |
If True, allow overwriting an existing version pin. Existing pins may have been set by rollouts, breaking-change migrations, or other operators. Defaults to False. NOTE: force=True only bypasses the existing-pin check — major-version crossings are always blocked and cannot be overridden. |
config_api_root |
string | null |
no | null |
Optional API root URL override for the Config API. Defaults to Airbyte Cloud (https://cloud.airbyte.com/api/v1). Use this to target local or self-hosted deployments. |
customer_tier_filter |
enum("TIER_0", "TIER_1", "TIER_2", "ALL") |
no | "TIER_2" |
Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. The operation will be rejected if the actual customer tier does not match. Use 'ALL' to proceed regardless of tier (a warning is shown for sensitive tiers). |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"organization_id": {
"description": "The Airbyte Cloud organization ID.",
"type": "string"
},
"connector_name": {
"description": "The connector name (e.g., 'source-github', 'destination-bigquery').",
"type": "string"
},
"connector_type": {
"description": "The type of connector (source or destination)",
"enum": [
"source",
"destination"
],
"type": "string"
},
"approval_comment_url": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "URL to the Slack approval record. Obtain this by calling the `escalate_to_human` tool with `approval_requested=True`; the backend delivers the approval record URL when a human clicks Approve. Format: https://<workspace>.slack.com/archives/... The admin email is automatically resolved from the approver's identity via the team roster."
},
"version": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The semver version string to pin to (e.g., '0.1.0'). Must be None if unset is True."
},
"unset": {
"default": false,
"description": "If True, removes any existing version override. Cannot be True if version is provided.",
"type": "boolean"
},
"override_reason": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Required when setting a version. Explanation for the override (min 10 characters)."
},
"override_reason_reference_url": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional URL with more context (e.g., issue link)."
},
"issue_url": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "URL to the GitHub issue providing context for this operation. Must be a valid GitHub URL (https://github.com/...). Required for authorization."
},
"ai_agent_session_url": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "URL to the AI agent session driving this operation, if applicable. Provides additional auditability for AI-driven operations."
},
"force": {
"default": false,
"description": "If `True`, allow overwriting an existing version pin. Existing pins may have been set by rollouts, breaking-change migrations, or other operators. Defaults to `False`. NOTE: `force=True` only bypasses the existing-pin check \u2014 major-version crossings are always blocked and cannot be overridden.",
"type": "boolean"
},
"config_api_root": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional API root URL override for the Config API. Defaults to Airbyte Cloud (https://cloud.airbyte.com/api/v1). Use this to target local or self-hosted deployments."
},
"customer_tier_filter": {
"default": "TIER_2",
"description": "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. The operation will be rejected if the actual customer tier does not match. Use 'ALL' to proceed regardless of tier (a warning is shown for sensitive tiers).",
"enum": [
"TIER_0",
"TIER_1",
"TIER_2",
"ALL"
],
"type": "string"
}
},
"required": [
"organization_id",
"connector_name",
"connector_type"
],
"type": "object"
}
Show output JSON schema
{
"description": "Result of an organization-level version override operation.\n\nThis model provides detailed information about the outcome of an organization-level\nversion pinning or unpinning operation.",
"properties": {
"success": {
"description": "Whether the operation succeeded",
"type": "boolean"
},
"message": {
"description": "Human-readable message describing the result",
"type": "string"
},
"organization_id": {
"description": "The organization ID",
"type": "string"
},
"connector_name": {
"description": "The connector name (e.g., 'source-github')",
"type": "string"
},
"connector_type": {
"description": "The type of connector (source or destination)",
"enum": [
"source",
"destination"
],
"type": "string"
},
"version": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The version that was pinned (None if cleared or failed)"
},
"customer_tier": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Customer tier of the organization (TIER_0, TIER_1, TIER_2). Included as a guardrail annotation."
},
"tier_warning": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Warning message if the operation targets a sensitive customer tier."
}
},
"required": [
"success",
"message",
"organization_id",
"connector_name",
"connector_type"
],
"type": "object"
}
set_workspace_connector_version_override
Hints: destructive · open-world
Set or clear a workspace-level version override for a connector type.
This pins ALL instances of a connector type within a workspace to a specific version. For example, pinning 'source-github' at workspace level means all GitHub sources in that workspace will use the pinned version.
Admin-only operation - Requires:
- AIRBYTE_INTERNAL_ADMIN_FLAG=airbyte.io environment variable
- issue_url parameter (GitHub issue URL for context)
- approval_comment_url (Slack approval record URL from
escalate_to_human)
You must specify EXACTLY ONE of version OR unset=True, but not both.
When setting a version, override_reason is required.
The customer_tier_filter parameter gates the operation: the call fails if
the actual tier of the workspace's organization does not match. Use ALL
to bypass the check (a warning is still emitted for sensitive tiers).
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
workspace_id |
string | enum("266ebdfe-0d7b-4540-9817-de7e4505ba61") |
yes | — | The Airbyte Cloud workspace ID (UUID) or alias. Accepts '@devin-ai-sandbox' as an alias for the Devin AI sandbox workspace. |
connector_name |
string |
yes | — | The connector name (e.g., 'source-github', 'destination-bigquery'). |
connector_type |
enum("source", "destination") |
yes | — | The type of connector (source or destination) |
approval_comment_url |
string | null |
no | null |
URL to the Slack approval record. Obtain this by calling the escalate_to_human tool with approval_requested=True; the backend delivers the approval record URL when a human clicks Approve. Format: https:// |
version |
string | null |
no | null |
The semver version string to pin to (e.g., '0.1.0'). Must be None if unset is True. |
unset |
boolean |
no | false |
If True, removes any existing version override. Cannot be True if version is provided. |
override_reason |
string | null |
no | null |
Required when setting a version. Explanation for the override (min 10 characters). |
override_reason_reference_url |
string | null |
no | null |
Optional URL with more context (e.g., issue link). |
issue_url |
string | null |
no | null |
URL to the GitHub issue providing context for this operation. Must be a valid GitHub URL (https://github.com/...). Required for authorization. |
ai_agent_session_url |
string | null |
no | null |
URL to the AI agent session driving this operation, if applicable. Provides additional auditability for AI-driven operations. |
force |
boolean |
no | false |
If True, allow overwriting an existing version pin. Existing pins may have been set by rollouts, breaking-change migrations, or other operators. Defaults to False. NOTE: force=True only bypasses the existing-pin check — major-version crossings are always blocked and cannot be overridden. |
config_api_root |
string | null |
no | null |
Optional API root URL override for the Config API. Defaults to Airbyte Cloud (https://cloud.airbyte.com/api/v1). Use this to target local or self-hosted deployments. |
customer_tier_filter |
enum("TIER_0", "TIER_1", "TIER_2", "ALL") |
no | "TIER_2" |
Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. The operation will be rejected if the actual customer tier does not match. Use 'ALL' to proceed regardless of tier (a warning is shown for sensitive tiers). |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"description": "Workspace ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@devin-ai-sandbox\") and its value\nis the actual workspace UUID. Use `WorkspaceAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
"enum": [
"266ebdfe-0d7b-4540-9817-de7e4505ba61"
],
"type": "string"
}
],
"description": "The Airbyte Cloud workspace ID (UUID) or alias. Accepts '@devin-ai-sandbox' as an alias for the Devin AI sandbox workspace."
},
"connector_name": {
"description": "The connector name (e.g., 'source-github', 'destination-bigquery').",
"type": "string"
},
"connector_type": {
"description": "The type of connector (source or destination)",
"enum": [
"source",
"destination"
],
"type": "string"
},
"approval_comment_url": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "URL to the Slack approval record. Obtain this by calling the `escalate_to_human` tool with `approval_requested=True`; the backend delivers the approval record URL when a human clicks Approve. Format: https://<workspace>.slack.com/archives/... The admin email is automatically resolved from the approver's identity via the team roster."
},
"version": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The semver version string to pin to (e.g., '0.1.0'). Must be None if unset is True."
},
"unset": {
"default": false,
"description": "If True, removes any existing version override. Cannot be True if version is provided.",
"type": "boolean"
},
"override_reason": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Required when setting a version. Explanation for the override (min 10 characters)."
},
"override_reason_reference_url": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional URL with more context (e.g., issue link)."
},
"issue_url": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "URL to the GitHub issue providing context for this operation. Must be a valid GitHub URL (https://github.com/...). Required for authorization."
},
"ai_agent_session_url": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "URL to the AI agent session driving this operation, if applicable. Provides additional auditability for AI-driven operations."
},
"force": {
"default": false,
"description": "If `True`, allow overwriting an existing version pin. Existing pins may have been set by rollouts, breaking-change migrations, or other operators. Defaults to `False`. NOTE: `force=True` only bypasses the existing-pin check \u2014 major-version crossings are always blocked and cannot be overridden.",
"type": "boolean"
},
"config_api_root": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional API root URL override for the Config API. Defaults to Airbyte Cloud (https://cloud.airbyte.com/api/v1). Use this to target local or self-hosted deployments."
},
"customer_tier_filter": {
"default": "TIER_2",
"description": "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. The operation will be rejected if the actual customer tier does not match. Use 'ALL' to proceed regardless of tier (a warning is shown for sensitive tiers).",
"enum": [
"TIER_0",
"TIER_1",
"TIER_2",
"ALL"
],
"type": "string"
}
},
"required": [
"workspace_id",
"connector_name",
"connector_type"
],
"type": "object"
}
Show output JSON schema
{
"description": "Result of a workspace-level version override operation.\n\nThis model provides detailed information about the outcome of a workspace-level\nversion pinning or unpinning operation.",
"properties": {
"success": {
"description": "Whether the operation succeeded",
"type": "boolean"
},
"message": {
"description": "Human-readable message describing the result",
"type": "string"
},
"workspace_id": {
"description": "The workspace ID",
"type": "string"
},
"connector_name": {
"description": "The connector name (e.g., 'source-github')",
"type": "string"
},
"connector_type": {
"description": "The type of connector (source or destination)",
"enum": [
"source",
"destination"
],
"type": "string"
},
"version": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The version that was pinned (None if cleared or failed)"
},
"customer_tier": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Customer tier of the workspace's organization (TIER_0, TIER_1, TIER_2). Included as a guardrail annotation."
},
"is_eu": {
"anyOf": [
{
"type": "boolean"
},
{
"type": "null"
}
],
"default": null,
"description": "Whether the workspace is in the EU region."
},
"tier_warning": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Warning message if the operation targets a sensitive customer tier."
}
},
"required": [
"success",
"message",
"workspace_id",
"connector_name",
"connector_type"
],
"type": "object"
}
1# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 2"""MCP tools for cloud connector version management. 3 4This module provides MCP tools for viewing and managing connector version 5overrides (pins) in Airbyte Cloud. These tools enable admins to pin connectors 6to specific versions for troubleshooting or stability purposes. 7 8Uses direct API client calls with either bearer token or client credentials auth. 9 10## MCP reference 11 12.. include:: ../../../docs/mcp-generated/cloud_connector_versions.md 13 :start-line: 2 14""" 15 16# NOTE: We intentionally do NOT use `from __future__ import annotations` here. 17# FastMCP has issues resolving forward references when PEP 563 deferred annotations 18# are used. See: https://github.com/jlowin/fastmcp/issues/905 19# Python 3.12+ supports modern type hint syntax natively, so this is not needed. 20 21__all__: list[str] = [] 22 23import logging 24from dataclasses import dataclass 25from typing import Annotated, Literal 26 27import requests as _requests 28from airbyte import constants 29from airbyte.exceptions import PyAirbyteInputError 30from fastmcp import Context, FastMCP 31from fastmcp_extensions import get_mcp_config, mcp_tool, register_mcp_tools 32from pydantic import Field 33 34from airbyte_ops_mcp.approval_resolution import ( 35 ApprovalResolutionError, 36 resolve_admin_email_from_approval, 37) 38from airbyte_ops_mcp.cloud_admin import api_client 39from airbyte_ops_mcp.cloud_admin.api_client import ( 40 _get_access_token, 41 _ScopeType, 42) 43from airbyte_ops_mcp.cloud_admin.auth import ( 44 CloudAuthError, 45 require_internal_admin_flag_only, 46) 47from airbyte_ops_mcp.cloud_admin.models import ( 48 ConnectorVersionInfo, 49 OrganizationVersionOverrideResult, 50 VersionOverrideOperationResult, 51 WorkspaceVersionOverrideResult, 52) 53from airbyte_ops_mcp.cloud_admin.version_guard import ( 54 check_existing_pins, 55) 56from airbyte_ops_mcp.constants import USER_AGENT, ServerConfigKey, WorkspaceAliasEnum 57from airbyte_ops_mcp.tier_cache import TierFilter, get_org_tier, resolve_workspace 58 59 60def _build_tier_warning(customer_tier: str) -> str | None: 61 """Build a warning message for sensitive customer tiers.""" 62 if customer_tier == "TIER_0": 63 return ( 64 "WARNING: This is a TIER_0 (highest-value) customer. " 65 "Proceed with extreme caution." 66 ) 67 if customer_tier == "TIER_1": 68 return "WARNING: This is a TIER_1 (high-value) customer. Proceed with caution." 69 return None 70 71 72def _validate_tier_filter( 73 actual_tier: str, 74 requested_filter: TierFilter, 75) -> tuple[bool, str | None]: 76 """Check if actual tier matches the requested filter. 77 78 Returns `(ok, error_message)`. When `ok` is `False` the caller should 79 reject the operation with `error_message`. 80 """ 81 if requested_filter == "ALL": 82 return True, None 83 if actual_tier != requested_filter: 84 return False, ( 85 f"Tier mismatch: the target entity is {actual_tier} but the requested " 86 f"tier filter is {requested_filter}. Either specify the correct tier " 87 f"or use 'ALL' to proceed with a warning." 88 ) 89 return True, None 90 91 92logger = logging.getLogger(__name__) 93 94 95@dataclass(frozen=True) 96class _ResolvedCloudAuth: 97 """Resolved authentication for Airbyte Cloud API calls. 98 99 Either bearer_token OR (client_id AND client_secret) will be set, not both. 100 """ 101 102 bearer_token: str | None = None 103 client_id: str | None = None 104 client_secret: str | None = None 105 106 107def _resolve_cloud_auth(ctx: Context) -> _ResolvedCloudAuth: 108 """Resolve authentication credentials for Airbyte Cloud API. 109 110 Credentials are resolved in priority order: 111 1. Bearer token (Authorization header or AIRBYTE_CLOUD_BEARER_TOKEN env var) 112 2. Client credentials (X-Airbyte-Cloud-Client-Id/Secret headers or env vars) 113 114 Args: 115 ctx: FastMCP Context object from the current tool invocation. 116 117 Returns: 118 _ResolvedCloudAuth with either bearer_token or client credentials set. 119 120 Raises: 121 CloudAuthError: If credentials cannot be resolved from headers or env vars. 122 """ 123 # Try bearer token first (preferred, but not required) 124 bearer_token = get_mcp_config(ctx, ServerConfigKey.BEARER_TOKEN) 125 if bearer_token: 126 return _ResolvedCloudAuth(bearer_token=bearer_token) 127 128 # Fall back to client credentials 129 try: 130 client_id = get_mcp_config(ctx, ServerConfigKey.CLIENT_ID) 131 client_secret = get_mcp_config(ctx, ServerConfigKey.CLIENT_SECRET) 132 return _ResolvedCloudAuth( 133 client_id=client_id, 134 client_secret=client_secret, 135 ) 136 except ValueError as e: 137 raise CloudAuthError( 138 f"Failed to resolve credentials. Ensure credentials are provided " 139 f"via Authorization header (Bearer token), " 140 f"HTTP headers (X-Airbyte-Cloud-Client-Id, X-Airbyte-Cloud-Client-Secret), " 141 f"or environment variables. Error: {e}" 142 ) from e 143 144 145@mcp_tool( 146 read_only=True, 147 idempotent=True, 148 open_world=True, 149) 150def get_cloud_connector_version( 151 workspace_id: Annotated[ 152 str | WorkspaceAliasEnum, 153 Field( 154 description="The Airbyte Cloud workspace ID (UUID) or alias. " 155 "Accepts '@devin-ai-sandbox' as an alias for the Devin AI sandbox workspace." 156 ), 157 ], 158 actor_id: Annotated[ 159 str, "The ID of the deployed connector (source or destination)" 160 ], 161 actor_type: Annotated[ 162 Literal["source", "destination"], 163 "The type of connector (source or destination)", 164 ], 165 config_api_root: Annotated[ 166 str | None, 167 Field( 168 description="Optional API root URL override for the Config API. " 169 "Defaults to Airbyte Cloud (https://cloud.airbyte.com/api/v1). " 170 "Use this to target local or self-hosted deployments.", 171 default=None, 172 ), 173 ] = None, 174 *, 175 ctx: Context, 176) -> ConnectorVersionInfo: 177 """Get the current version information for a deployed connector. 178 179 Returns version details including the current version string and whether 180 an override (pin) is applied. 181 182 Authentication credentials are resolved in priority order: 183 1. Bearer token (Authorization header or AIRBYTE_CLOUD_BEARER_TOKEN env var) 184 2. HTTP headers: X-Airbyte-Cloud-Client-Id, X-Airbyte-Cloud-Client-Secret 185 3. Environment variables: AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET 186 """ 187 # Resolve workspace ID alias 188 resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id) 189 190 auth = _resolve_cloud_auth(ctx) 191 192 # Use vendored API client instead of connector.get_connector_version() 193 # Use Config API root for version management operations 194 # Pass workspace_id to get detailed scoped configuration context 195 version_data = api_client.get_connector_version( 196 connector_id=actor_id, 197 connector_type=actor_type, 198 config_api_root=config_api_root or constants.CLOUD_CONFIG_API_ROOT, 199 client_id=auth.client_id, 200 client_secret=auth.client_secret, 201 bearer_token=auth.bearer_token, 202 workspace_id=resolved_workspace_id, 203 ) 204 205 # Determine if version is pinned from scoped config context (more reliable) 206 # The API's isVersionOverrideApplied only returns true for USER-created pins, 207 # not system-generated pins (e.g., breaking_change origin). Check scopedConfigs 208 # for a more accurate picture of whether ANY pin exists. 209 scoped_configs = version_data.get("scopedConfigs", {}) 210 has_any_pin = ( 211 any(config is not None for config in scoped_configs.values()) 212 if scoped_configs 213 else False 214 ) 215 216 # Use scoped config existence as the source of truth for "is pinned" 217 # Fall back to API's isVersionOverrideApplied if no scoped config data 218 is_pinned = ( 219 has_any_pin if scoped_configs else version_data["isVersionOverrideApplied"] 220 ) 221 222 return ConnectorVersionInfo( 223 connector_id=actor_id, 224 connector_type=actor_type, 225 version=version_data["dockerImageTag"], 226 is_version_pinned=is_pinned, 227 ) 228 229 230@mcp_tool( 231 destructive=True, 232 idempotent=False, 233 open_world=True, 234) 235def set_cloud_connector_version_override( 236 workspace_id: Annotated[ 237 str | WorkspaceAliasEnum, 238 Field( 239 description="The Airbyte Cloud workspace ID (UUID) or alias. " 240 "Accepts '@devin-ai-sandbox' as an alias for the Devin AI sandbox workspace." 241 ), 242 ], 243 actor_id: Annotated[ 244 str, "The ID of the deployed connector (source or destination)" 245 ], 246 actor_type: Annotated[ 247 Literal["source", "destination"], 248 "The type of connector (source or destination)", 249 ], 250 approval_comment_url: Annotated[ 251 str | None, 252 Field( 253 description="URL to the Slack approval record. Obtain this by calling the " 254 "`escalate_to_human` tool with `approval_requested=True`; the backend delivers " 255 "the approval record URL when a human clicks Approve. " 256 "Format: https://<workspace>.slack.com/archives/... " 257 "The admin email is automatically resolved from the approver's identity " 258 "via the team roster.", 259 default=None, 260 ), 261 ], 262 version: Annotated[ 263 str | None, 264 Field( 265 description="The semver version string to pin to (e.g., '0.1.0'). " 266 "Must be None if unset is True.", 267 default=None, 268 ), 269 ], 270 unset: Annotated[ 271 bool, 272 Field( 273 description="If True, removes any existing version override. " 274 "Cannot be True if version is provided.", 275 default=False, 276 ), 277 ], 278 override_reason: Annotated[ 279 str | None, 280 Field( 281 description="Required when setting a version. " 282 "Explanation for the override (min 10 characters).", 283 default=None, 284 ), 285 ], 286 override_reason_reference_url: Annotated[ 287 str | None, 288 Field( 289 description="Optional URL with more context (e.g., issue link).", 290 default=None, 291 ), 292 ], 293 issue_url: Annotated[ 294 str | None, 295 Field( 296 description="URL to the GitHub issue providing context for this operation. " 297 "Must be a valid GitHub URL (https://github.com/...). Required for authorization.", 298 default=None, 299 ), 300 ], 301 ai_agent_session_url: Annotated[ 302 str | None, 303 Field( 304 description="URL to the AI agent session driving this operation, if applicable. " 305 "Provides additional auditability for AI-driven operations.", 306 default=None, 307 ), 308 ] = None, 309 force: Annotated[ 310 bool, 311 Field( 312 description="If `True`, allow overwriting an existing version pin. " 313 "Existing pins may have been set by rollouts, breaking-change migrations, " 314 "or other operators. Defaults to `False`. NOTE: `force=True` only " 315 "bypasses the existing-pin check — major-version crossings are always " 316 "blocked and cannot be overridden.", 317 default=False, 318 ), 319 ] = False, 320 config_api_root: Annotated[ 321 str | None, 322 Field( 323 description="Optional API root URL override for the Config API. " 324 "Defaults to Airbyte Cloud (https://cloud.airbyte.com/api/v1). " 325 "Use this to target local or self-hosted deployments.", 326 default=None, 327 ), 328 ] = None, 329 customer_tier_filter: Annotated[ 330 TierFilter, 331 Field( 332 description=( 333 "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. " 334 "The operation will be rejected if the actual customer tier does not match. " 335 "Use 'ALL' to proceed regardless of tier (a warning is shown for sensitive tiers)." 336 ), 337 ), 338 ] = "TIER_2", 339 *, 340 ctx: Context, 341) -> VersionOverrideOperationResult: 342 """Set or clear a version override for a deployed connector. 343 344 **Admin-only operation** - Requires: 345 - AIRBYTE_INTERNAL_ADMIN_FLAG=airbyte.io environment variable 346 - issue_url parameter (GitHub issue URL for context) 347 - approval_comment_url (Slack approval record URL from `escalate_to_human`) 348 349 The admin user email is automatically derived from the Slack approval record, 350 resolving the approver's @airbyte.io email via the team roster. 351 352 You must specify EXACTLY ONE of `version` OR `unset=True`, but not both. 353 When setting a version, `override_reason` is required. 354 355 The `customer_tier_filter` parameter gates the operation: the call fails if 356 the actual tier of the workspace's organization does not match. Use `ALL` 357 to bypass the check (a warning is still emitted for sensitive tiers). 358 359 Business rules enforced: 360 - Dev versions (-dev): Only creator can unpin their own dev version override 361 - Production versions: Require strong justification mentioning customer/support/investigation 362 - Release candidates (-rc): Any admin can pin/unpin RC versions 363 364 Authentication credentials are resolved in priority order: 365 1. Bearer token (Authorization header or AIRBYTE_CLOUD_BEARER_TOKEN env var) 366 2. HTTP headers: X-Airbyte-Cloud-Client-Id, X-Airbyte-Cloud-Client-Secret 367 3. Environment variables: AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET 368 """ 369 # Resolve workspace ID alias (workspace_id is required, so resolved value is never None) 370 resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id) 371 assert resolved_workspace_id is not None # Type narrowing: workspace_id is required 372 373 # Validate admin access (check env var flag) 374 try: 375 require_internal_admin_flag_only() 376 except CloudAuthError as e: 377 return VersionOverrideOperationResult( 378 success=False, 379 message=f"Admin authentication failed: {e}", 380 connector_id=actor_id, 381 connector_type=actor_type, 382 ) 383 384 # Validate authorization parameters 385 validation_errors: list[str] = [] 386 387 if not issue_url: 388 validation_errors.append( 389 "issue_url is required for authorization (GitHub issue URL)" 390 ) 391 elif not issue_url.startswith("https://github.com/"): 392 validation_errors.append( 393 f"issue_url must be a valid GitHub URL (https://github.com/...), got: {issue_url}" 394 ) 395 396 if not approval_comment_url: 397 validation_errors.append( 398 "'approval_comment_url' is required. Use `escalate_to_human` with " 399 "`approval_requested=True` to obtain a Slack approval record URL." 400 ) 401 402 if validation_errors: 403 return VersionOverrideOperationResult( 404 success=False, 405 message="Authorization validation failed: " + "; ".join(validation_errors), 406 connector_id=actor_id, 407 connector_type=actor_type, 408 ) 409 410 # Derive admin email from the approval URL (domain-based dispatch) 411 try: 412 admin_user_email = resolve_admin_email_from_approval( 413 approval_comment_url=approval_comment_url, 414 ) 415 except ApprovalResolutionError as e: 416 return VersionOverrideOperationResult( 417 success=False, 418 message=str(e), 419 connector_id=actor_id, 420 connector_type=actor_type, 421 ) 422 423 # Tier guardrail: resolve workspace tier BEFORE the destructive operation 424 ws_resolution = resolve_workspace(resolved_workspace_id) 425 if not ws_resolution.organization_id: 426 return VersionOverrideOperationResult( 427 success=False, 428 message=f"Could not resolve organization for workspace {resolved_workspace_id}", 429 connector_id=actor_id, 430 connector_type=actor_type, 431 ) 432 433 customer_tier = ws_resolution.customer_tier 434 is_eu = ( 435 ws_resolution.dataplane_name == "EU" if ws_resolution.dataplane_name else None 436 ) 437 tier_warning = _build_tier_warning(customer_tier) 438 439 tier_ok, tier_error = _validate_tier_filter(customer_tier, customer_tier_filter) 440 if not tier_ok: 441 return VersionOverrideOperationResult( 442 success=False, 443 message=tier_error or "Tier filter mismatch", 444 connector_id=actor_id, 445 connector_type=actor_type, 446 customer_tier=customer_tier, 447 is_eu=is_eu, 448 tier_warning=tier_warning, 449 ) 450 451 # Build enhanced override reason with audit fields (only for 'set' operations) 452 enhanced_override_reason = override_reason 453 if not unset and override_reason: 454 audit_parts = [override_reason] 455 audit_parts.append(f"Issue: {issue_url}") 456 audit_parts.append(f"Approval: {approval_comment_url}") 457 if ai_agent_session_url: 458 audit_parts.append(f"AI Session: {ai_agent_session_url}") 459 enhanced_override_reason = " | ".join(audit_parts) 460 461 # Resolve auth and get current version info 462 resolved_config_api_root = config_api_root or constants.CLOUD_CONFIG_API_ROOT 463 try: 464 auth = _resolve_cloud_auth(ctx) 465 466 # Get current version info before the operation 467 current_version_data = api_client.get_connector_version( 468 connector_id=actor_id, 469 connector_type=actor_type, 470 config_api_root=resolved_config_api_root, 471 client_id=auth.client_id, 472 client_secret=auth.client_secret, 473 bearer_token=auth.bearer_token, 474 ) 475 current_version = current_version_data["dockerImageTag"] 476 was_pinned_before = current_version_data["isVersionOverrideApplied"] 477 478 except CloudAuthError as e: 479 return VersionOverrideOperationResult( 480 success=False, 481 message=f"Failed to resolve credentials or get connector: {e}", 482 connector_id=actor_id, 483 connector_type=actor_type, 484 ) 485 486 # Existing-pin guard: check actor, workspace, and org scopes (skip when unsetting) 487 if not unset and version: 488 try: 489 access_token = _get_access_token( 490 auth.client_id, auth.client_secret, auth.bearer_token 491 ) 492 # Get actor_definition_id from the connector info 493 get_endpoint = f"{resolved_config_api_root}/{actor_type}s/get" 494 get_resp = _requests.post( 495 get_endpoint, 496 json={f"{actor_type}Id": actor_id}, 497 headers={ 498 "Authorization": f"Bearer {access_token}", 499 "User-Agent": USER_AGENT, 500 "Content-Type": "application/json", 501 }, 502 timeout=30, 503 ) 504 get_resp.raise_for_status() 505 actor_definition_id = get_resp.json().get(f"{actor_type}DefinitionId") 506 if not actor_definition_id: 507 return VersionOverrideOperationResult( 508 success=False, 509 message=f"Could not find {actor_type}DefinitionId in connector info for actor {actor_id}", 510 connector_id=actor_id, 511 connector_type=actor_type, 512 previous_version=current_version, 513 was_pinned_before=was_pinned_before, 514 ) 515 516 guard = check_existing_pins( 517 scopes=[ 518 (_ScopeType.ACTOR, actor_id, "actor"), 519 (_ScopeType.WORKSPACE, resolved_workspace_id, "workspace"), 520 ( 521 _ScopeType.ORGANIZATION, 522 ws_resolution.organization_id, 523 "organization", 524 ), 525 ], 526 actor_definition_id=actor_definition_id, 527 config_api_root=resolved_config_api_root, 528 access_token=access_token, 529 target_version=version, 530 force=force, 531 ) 532 if guard.blocked: 533 assert guard.error_msg is not None # narrowing for type checker 534 return VersionOverrideOperationResult( 535 success=False, 536 message=guard.error_msg, 537 connector_id=actor_id, 538 connector_type=actor_type, 539 previous_version=current_version, 540 was_pinned_before=was_pinned_before, 541 ) 542 except ( 543 PyAirbyteInputError, 544 CloudAuthError, 545 _requests.exceptions.HTTPError, 546 ) as e: 547 return VersionOverrideOperationResult( 548 success=False, 549 message=f"Pin guard check failed: {e}", 550 connector_id=actor_id, 551 connector_type=actor_type, 552 previous_version=current_version, 553 was_pinned_before=was_pinned_before, 554 ) 555 556 # Call vendored API client's set_connector_version_override method 557 try: 558 result = api_client.set_connector_version_override( 559 connector_id=actor_id, 560 connector_type=actor_type, 561 config_api_root=resolved_config_api_root, 562 client_id=auth.client_id, 563 client_secret=auth.client_secret, 564 workspace_id=resolved_workspace_id, 565 version=version, 566 unset=unset, 567 override_reason=enhanced_override_reason, 568 override_reason_reference_url=override_reason_reference_url, 569 user_email=admin_user_email, 570 bearer_token=auth.bearer_token, 571 ) 572 573 # Get updated version info after the operation 574 updated_version_data = api_client.get_connector_version( 575 connector_id=actor_id, 576 connector_type=actor_type, 577 config_api_root=resolved_config_api_root, 578 client_id=auth.client_id, 579 client_secret=auth.client_secret, 580 bearer_token=auth.bearer_token, 581 ) 582 new_version = updated_version_data["dockerImageTag"] if not unset else None 583 is_pinned_after = updated_version_data["isVersionOverrideApplied"] 584 585 if unset: 586 if result: 587 message = "Successfully cleared version override. Connector will now use default version." 588 else: 589 message = "No version override was active (nothing to clear)" 590 else: 591 message = f"Successfully pinned connector to version {version}" 592 593 if tier_warning: 594 message = f"{tier_warning} {message}" 595 596 return VersionOverrideOperationResult( 597 success=True, 598 message=message, 599 connector_id=actor_id, 600 connector_type=actor_type, 601 previous_version=current_version, 602 new_version=new_version, 603 was_pinned_before=was_pinned_before, 604 is_pinned_after=is_pinned_after, 605 customer_tier=customer_tier, 606 is_eu=is_eu, 607 tier_warning=tier_warning, 608 ) 609 610 except PyAirbyteInputError as e: 611 # PyAirbyte raises this for validation errors and permission denials 612 return VersionOverrideOperationResult( 613 success=False, 614 message=str(e), 615 connector_id=actor_id, 616 connector_type=actor_type, 617 previous_version=current_version, 618 was_pinned_before=was_pinned_before, 619 ) 620 621 622@mcp_tool( 623 destructive=True, 624 idempotent=False, 625 open_world=True, 626) 627def set_workspace_connector_version_override( 628 workspace_id: Annotated[ 629 str | WorkspaceAliasEnum, 630 Field( 631 description="The Airbyte Cloud workspace ID (UUID) or alias. " 632 "Accepts '@devin-ai-sandbox' as an alias for the Devin AI sandbox workspace." 633 ), 634 ], 635 connector_name: Annotated[ 636 str, 637 Field( 638 description="The connector name (e.g., 'source-github', 'destination-bigquery')." 639 ), 640 ], 641 connector_type: Annotated[ 642 Literal["source", "destination"], 643 "The type of connector (source or destination)", 644 ], 645 approval_comment_url: Annotated[ 646 str | None, 647 Field( 648 description="URL to the Slack approval record. Obtain this by calling the " 649 "`escalate_to_human` tool with `approval_requested=True`; the backend delivers " 650 "the approval record URL when a human clicks Approve. " 651 "Format: https://<workspace>.slack.com/archives/... " 652 "The admin email is automatically resolved from the approver's identity " 653 "via the team roster.", 654 default=None, 655 ), 656 ], 657 version: Annotated[ 658 str | None, 659 Field( 660 description="The semver version string to pin to (e.g., '0.1.0'). " 661 "Must be None if unset is True.", 662 default=None, 663 ), 664 ], 665 unset: Annotated[ 666 bool, 667 Field( 668 description="If True, removes any existing version override. " 669 "Cannot be True if version is provided.", 670 default=False, 671 ), 672 ], 673 override_reason: Annotated[ 674 str | None, 675 Field( 676 description="Required when setting a version. " 677 "Explanation for the override (min 10 characters).", 678 default=None, 679 ), 680 ], 681 override_reason_reference_url: Annotated[ 682 str | None, 683 Field( 684 description="Optional URL with more context (e.g., issue link).", 685 default=None, 686 ), 687 ], 688 issue_url: Annotated[ 689 str | None, 690 Field( 691 description="URL to the GitHub issue providing context for this operation. " 692 "Must be a valid GitHub URL (https://github.com/...). Required for authorization.", 693 default=None, 694 ), 695 ], 696 ai_agent_session_url: Annotated[ 697 str | None, 698 Field( 699 description="URL to the AI agent session driving this operation, if applicable. " 700 "Provides additional auditability for AI-driven operations.", 701 default=None, 702 ), 703 ] = None, 704 force: Annotated[ 705 bool, 706 Field( 707 description="If `True`, allow overwriting an existing version pin. " 708 "Existing pins may have been set by rollouts, breaking-change migrations, " 709 "or other operators. Defaults to `False`. NOTE: `force=True` only " 710 "bypasses the existing-pin check — major-version crossings are always " 711 "blocked and cannot be overridden.", 712 default=False, 713 ), 714 ] = False, 715 config_api_root: Annotated[ 716 str | None, 717 Field( 718 description="Optional API root URL override for the Config API. " 719 "Defaults to Airbyte Cloud (https://cloud.airbyte.com/api/v1). " 720 "Use this to target local or self-hosted deployments.", 721 default=None, 722 ), 723 ] = None, 724 customer_tier_filter: Annotated[ 725 TierFilter, 726 Field( 727 description=( 728 "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. " 729 "The operation will be rejected if the actual customer tier does not match. " 730 "Use 'ALL' to proceed regardless of tier (a warning is shown for sensitive tiers)." 731 ), 732 ), 733 ] = "TIER_2", 734 *, 735 ctx: Context, 736) -> WorkspaceVersionOverrideResult: 737 """Set or clear a workspace-level version override for a connector type. 738 739 This pins ALL instances of a connector type within a workspace to a specific version. 740 For example, pinning 'source-github' at workspace level means all GitHub sources 741 in that workspace will use the pinned version. 742 743 **Admin-only operation** - Requires: 744 - AIRBYTE_INTERNAL_ADMIN_FLAG=airbyte.io environment variable 745 - issue_url parameter (GitHub issue URL for context) 746 - approval_comment_url (Slack approval record URL from `escalate_to_human`) 747 748 You must specify EXACTLY ONE of `version` OR `unset=True`, but not both. 749 When setting a version, `override_reason` is required. 750 751 The `customer_tier_filter` parameter gates the operation: the call fails if 752 the actual tier of the workspace's organization does not match. Use `ALL` 753 to bypass the check (a warning is still emitted for sensitive tiers). 754 """ 755 # Resolve workspace ID alias (workspace_id is required, so resolved value is never None) 756 resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id) 757 assert resolved_workspace_id is not None # Type narrowing: workspace_id is required 758 759 # Validate admin access (check env var flag) 760 try: 761 require_internal_admin_flag_only() 762 except CloudAuthError as e: 763 return WorkspaceVersionOverrideResult( 764 success=False, 765 message=f"Admin authentication failed: {e}", 766 workspace_id=resolved_workspace_id, 767 connector_name=connector_name, 768 connector_type=connector_type, 769 ) 770 771 # Validate authorization parameters 772 validation_errors: list[str] = [] 773 774 if not issue_url: 775 validation_errors.append( 776 "issue_url is required for authorization (GitHub issue URL)" 777 ) 778 elif not issue_url.startswith("https://github.com/"): 779 validation_errors.append( 780 f"issue_url must be a valid GitHub URL (https://github.com/...), got: {issue_url}" 781 ) 782 783 if not approval_comment_url: 784 validation_errors.append( 785 "'approval_comment_url' is required. Use `escalate_to_human` with " 786 "`approval_requested=True` to obtain a Slack approval record URL." 787 ) 788 789 if validation_errors: 790 return WorkspaceVersionOverrideResult( 791 success=False, 792 message="Authorization validation failed: " + "; ".join(validation_errors), 793 workspace_id=resolved_workspace_id, 794 connector_name=connector_name, 795 connector_type=connector_type, 796 ) 797 798 # Derive admin email from the approval URL (domain-based dispatch) 799 try: 800 admin_user_email = resolve_admin_email_from_approval( 801 approval_comment_url=approval_comment_url, 802 ) 803 except ApprovalResolutionError as e: 804 return WorkspaceVersionOverrideResult( 805 success=False, 806 message=str(e), 807 workspace_id=resolved_workspace_id, 808 connector_name=connector_name, 809 connector_type=connector_type, 810 ) 811 812 # Tier guardrail: resolve workspace tier BEFORE the destructive operation 813 ws_resolution = resolve_workspace(resolved_workspace_id) 814 if not ws_resolution.organization_id: 815 return WorkspaceVersionOverrideResult( 816 success=False, 817 message=f"Could not resolve organization for workspace {resolved_workspace_id}", 818 workspace_id=resolved_workspace_id, 819 connector_name=connector_name, 820 connector_type=connector_type, 821 ) 822 823 customer_tier = ws_resolution.customer_tier 824 is_eu = ( 825 ws_resolution.dataplane_name == "EU" if ws_resolution.dataplane_name else None 826 ) 827 tier_warning = _build_tier_warning(customer_tier) 828 829 tier_ok, tier_error = _validate_tier_filter(customer_tier, customer_tier_filter) 830 if not tier_ok: 831 return WorkspaceVersionOverrideResult( 832 success=False, 833 message=tier_error or "Tier filter mismatch", 834 workspace_id=resolved_workspace_id, 835 connector_name=connector_name, 836 connector_type=connector_type, 837 customer_tier=customer_tier, 838 is_eu=is_eu, 839 tier_warning=tier_warning, 840 ) 841 842 # Build enhanced override reason with audit fields (only for 'set' operations) 843 enhanced_override_reason = override_reason 844 if not unset and override_reason: 845 audit_parts = [override_reason] 846 audit_parts.append(f"Issue: {issue_url}") 847 audit_parts.append(f"Approval: {approval_comment_url}") 848 if ai_agent_session_url: 849 audit_parts.append(f"AI Session: {ai_agent_session_url}") 850 enhanced_override_reason = " | ".join(audit_parts) 851 852 # Existing-pin guard: check workspace and org scopes (skip when unsetting) 853 if not unset and version: 854 try: 855 auth = _resolve_cloud_auth(ctx) 856 resolved_config_api_root = ( 857 config_api_root or constants.CLOUD_CONFIG_API_ROOT 858 ) 859 access_token = _get_access_token( 860 auth.client_id, auth.client_secret, auth.bearer_token 861 ) 862 from airbyte_ops_mcp.mcp.prod_db_queries import ( 863 _resolve_canonical_name_to_definition_id, 864 ) 865 866 actor_definition_id = _resolve_canonical_name_to_definition_id( 867 connector_name 868 ) 869 870 guard = check_existing_pins( 871 scopes=[ 872 (_ScopeType.WORKSPACE, resolved_workspace_id, "workspace"), 873 ( 874 _ScopeType.ORGANIZATION, 875 ws_resolution.organization_id, 876 "organization", 877 ), 878 ], 879 actor_definition_id=actor_definition_id, 880 config_api_root=resolved_config_api_root, 881 access_token=access_token, 882 target_version=version, 883 force=force, 884 ) 885 if guard.blocked: 886 assert guard.error_msg is not None # narrowing for type checker 887 return WorkspaceVersionOverrideResult( 888 success=False, 889 message=guard.error_msg, 890 workspace_id=resolved_workspace_id, 891 connector_name=connector_name, 892 connector_type=connector_type, 893 ) 894 except (PyAirbyteInputError, CloudAuthError) as e: 895 return WorkspaceVersionOverrideResult( 896 success=False, 897 message=f"Pin guard check failed: {e}", 898 workspace_id=resolved_workspace_id, 899 connector_name=connector_name, 900 connector_type=connector_type, 901 ) 902 903 # Resolve auth and call API client 904 try: 905 auth = _resolve_cloud_auth(ctx) 906 907 result = api_client.set_workspace_connector_version_override( 908 workspace_id=resolved_workspace_id, 909 connector_name=connector_name, 910 connector_type=connector_type, 911 config_api_root=config_api_root or constants.CLOUD_CONFIG_API_ROOT, 912 client_id=auth.client_id, 913 client_secret=auth.client_secret, 914 bearer_token=auth.bearer_token, 915 version=version, 916 unset=unset, 917 override_reason=enhanced_override_reason, 918 override_reason_reference_url=override_reason_reference_url, 919 user_email=admin_user_email, 920 ) 921 922 if unset: 923 if result: 924 message = f"Successfully cleared workspace-level version override for {connector_name}." 925 else: 926 message = f"No workspace-level version override was active for {connector_name} (nothing to clear)" 927 else: 928 message = f"Successfully pinned {connector_name} to version {version} at workspace level." 929 930 if tier_warning: 931 message = f"{tier_warning} {message}" 932 933 return WorkspaceVersionOverrideResult( 934 success=True, 935 message=message, 936 workspace_id=resolved_workspace_id, 937 connector_name=connector_name, 938 connector_type=connector_type, 939 version=version if not unset else None, 940 customer_tier=customer_tier, 941 is_eu=is_eu, 942 tier_warning=tier_warning, 943 ) 944 945 except PyAirbyteInputError as e: 946 return WorkspaceVersionOverrideResult( 947 success=False, 948 message=str(e), 949 workspace_id=resolved_workspace_id, 950 connector_name=connector_name, 951 connector_type=connector_type, 952 ) 953 except CloudAuthError as e: 954 return WorkspaceVersionOverrideResult( 955 success=False, 956 message=f"Authentication failed: {e}", 957 workspace_id=resolved_workspace_id, 958 connector_name=connector_name, 959 connector_type=connector_type, 960 ) 961 962 963@mcp_tool( 964 destructive=True, 965 idempotent=False, 966 open_world=True, 967) 968def set_organization_connector_version_override( 969 organization_id: Annotated[ 970 str, 971 Field(description="The Airbyte Cloud organization ID."), 972 ], 973 connector_name: Annotated[ 974 str, 975 Field( 976 description="The connector name (e.g., 'source-github', 'destination-bigquery')." 977 ), 978 ], 979 connector_type: Annotated[ 980 Literal["source", "destination"], 981 "The type of connector (source or destination)", 982 ], 983 approval_comment_url: Annotated[ 984 str | None, 985 Field( 986 description="URL to the Slack approval record. Obtain this by calling the " 987 "`escalate_to_human` tool with `approval_requested=True`; the backend delivers " 988 "the approval record URL when a human clicks Approve. " 989 "Format: https://<workspace>.slack.com/archives/... " 990 "The admin email is automatically resolved from the approver's identity " 991 "via the team roster.", 992 default=None, 993 ), 994 ], 995 version: Annotated[ 996 str | None, 997 Field( 998 description="The semver version string to pin to (e.g., '0.1.0'). " 999 "Must be None if unset is True.", 1000 default=None, 1001 ), 1002 ], 1003 unset: Annotated[ 1004 bool, 1005 Field( 1006 description="If True, removes any existing version override. " 1007 "Cannot be True if version is provided.", 1008 default=False, 1009 ), 1010 ], 1011 override_reason: Annotated[ 1012 str | None, 1013 Field( 1014 description="Required when setting a version. " 1015 "Explanation for the override (min 10 characters).", 1016 default=None, 1017 ), 1018 ], 1019 override_reason_reference_url: Annotated[ 1020 str | None, 1021 Field( 1022 description="Optional URL with more context (e.g., issue link).", 1023 default=None, 1024 ), 1025 ], 1026 issue_url: Annotated[ 1027 str | None, 1028 Field( 1029 description="URL to the GitHub issue providing context for this operation. " 1030 "Must be a valid GitHub URL (https://github.com/...). Required for authorization.", 1031 default=None, 1032 ), 1033 ], 1034 ai_agent_session_url: Annotated[ 1035 str | None, 1036 Field( 1037 description="URL to the AI agent session driving this operation, if applicable. " 1038 "Provides additional auditability for AI-driven operations.", 1039 default=None, 1040 ), 1041 ] = None, 1042 force: Annotated[ 1043 bool, 1044 Field( 1045 description="If `True`, allow overwriting an existing version pin. " 1046 "Existing pins may have been set by rollouts, breaking-change migrations, " 1047 "or other operators. Defaults to `False`. NOTE: `force=True` only " 1048 "bypasses the existing-pin check — major-version crossings are always " 1049 "blocked and cannot be overridden.", 1050 default=False, 1051 ), 1052 ] = False, 1053 config_api_root: Annotated[ 1054 str | None, 1055 Field( 1056 description="Optional API root URL override for the Config API. " 1057 "Defaults to Airbyte Cloud (https://cloud.airbyte.com/api/v1). " 1058 "Use this to target local or self-hosted deployments.", 1059 default=None, 1060 ), 1061 ] = None, 1062 customer_tier_filter: Annotated[ 1063 TierFilter, 1064 Field( 1065 description=( 1066 "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. " 1067 "The operation will be rejected if the actual customer tier does not match. " 1068 "Use 'ALL' to proceed regardless of tier (a warning is shown for sensitive tiers)." 1069 ), 1070 ), 1071 ] = "TIER_2", 1072 *, 1073 ctx: Context, 1074) -> OrganizationVersionOverrideResult: 1075 """Set or clear an organization-level version override for a connector type. 1076 1077 This pins ALL instances of a connector type across an entire organization to a 1078 specific version. For example, pinning 'source-github' at organization level means 1079 all GitHub sources in all workspaces within that organization will use the pinned version. 1080 1081 **Admin-only operation** - Requires: 1082 - AIRBYTE_INTERNAL_ADMIN_FLAG=airbyte.io environment variable 1083 - issue_url parameter (GitHub issue URL for context) 1084 - approval_comment_url (Slack approval record URL from `escalate_to_human`) 1085 1086 You must specify EXACTLY ONE of `version` OR `unset=True`, but not both. 1087 When setting a version, `override_reason` is required. 1088 1089 The `customer_tier_filter` parameter gates the operation: the call fails if 1090 the actual tier of the organization does not match. Use `ALL` to bypass 1091 the check (a warning is still emitted for sensitive tiers). 1092 """ 1093 # Validate admin access (check env var flag) 1094 try: 1095 require_internal_admin_flag_only() 1096 except CloudAuthError as e: 1097 return OrganizationVersionOverrideResult( 1098 success=False, 1099 message=f"Admin authentication failed: {e}", 1100 organization_id=organization_id, 1101 connector_name=connector_name, 1102 connector_type=connector_type, 1103 ) 1104 1105 # Validate authorization parameters 1106 validation_errors: list[str] = [] 1107 1108 if not issue_url: 1109 validation_errors.append( 1110 "issue_url is required for authorization (GitHub issue URL)" 1111 ) 1112 elif not issue_url.startswith("https://github.com/"): 1113 validation_errors.append( 1114 f"issue_url must be a valid GitHub URL (https://github.com/...), got: {issue_url}" 1115 ) 1116 1117 if not approval_comment_url: 1118 validation_errors.append( 1119 "'approval_comment_url' is required. Use `escalate_to_human` with " 1120 "`approval_requested=True` to obtain a Slack approval record URL." 1121 ) 1122 1123 if validation_errors: 1124 return OrganizationVersionOverrideResult( 1125 success=False, 1126 message="Authorization validation failed: " + "; ".join(validation_errors), 1127 organization_id=organization_id, 1128 connector_name=connector_name, 1129 connector_type=connector_type, 1130 ) 1131 1132 # Derive admin email from the approval URL (domain-based dispatch) 1133 try: 1134 admin_user_email = resolve_admin_email_from_approval( 1135 approval_comment_url=approval_comment_url, 1136 ) 1137 except ApprovalResolutionError as e: 1138 return OrganizationVersionOverrideResult( 1139 success=False, 1140 message=str(e), 1141 organization_id=organization_id, 1142 connector_name=connector_name, 1143 connector_type=connector_type, 1144 ) 1145 1146 # Tier guardrail: resolve org tier BEFORE the destructive operation 1147 # (placed after auth checks to avoid exposing tier info to unauthenticated callers) 1148 tier_result = get_org_tier(organization_id) 1149 customer_tier = tier_result.customer_tier 1150 tier_warning = _build_tier_warning(customer_tier) 1151 1152 tier_ok, tier_error = _validate_tier_filter(customer_tier, customer_tier_filter) 1153 if not tier_ok: 1154 return OrganizationVersionOverrideResult( 1155 success=False, 1156 message=tier_error or "Tier filter mismatch", 1157 organization_id=organization_id, 1158 connector_name=connector_name, 1159 connector_type=connector_type, 1160 customer_tier=customer_tier, 1161 tier_warning=tier_warning, 1162 ) 1163 1164 # Build enhanced override reason with audit fields (only for 'set' operations) 1165 enhanced_override_reason = override_reason 1166 if not unset and override_reason: 1167 audit_parts = [override_reason] 1168 audit_parts.append(f"Issue: {issue_url}") 1169 audit_parts.append(f"Approval: {approval_comment_url}") 1170 if ai_agent_session_url: 1171 audit_parts.append(f"AI Session: {ai_agent_session_url}") 1172 enhanced_override_reason = " | ".join(audit_parts) 1173 1174 # Existing-pin guard: check org scope (skip when unsetting) 1175 if not unset and version: 1176 try: 1177 auth = _resolve_cloud_auth(ctx) 1178 resolved_config_api_root = ( 1179 config_api_root or constants.CLOUD_CONFIG_API_ROOT 1180 ) 1181 access_token = _get_access_token( 1182 auth.client_id, auth.client_secret, auth.bearer_token 1183 ) 1184 from airbyte_ops_mcp.mcp.prod_db_queries import ( 1185 _resolve_canonical_name_to_definition_id, 1186 ) 1187 1188 actor_definition_id = _resolve_canonical_name_to_definition_id( 1189 connector_name 1190 ) 1191 1192 guard = check_existing_pins( 1193 scopes=[ 1194 (_ScopeType.ORGANIZATION, organization_id, "organization"), 1195 ], 1196 actor_definition_id=actor_definition_id, 1197 config_api_root=resolved_config_api_root, 1198 access_token=access_token, 1199 target_version=version, 1200 force=force, 1201 ) 1202 if guard.blocked: 1203 assert guard.error_msg is not None # narrowing for type checker 1204 return OrganizationVersionOverrideResult( 1205 success=False, 1206 message=guard.error_msg, 1207 organization_id=organization_id, 1208 connector_name=connector_name, 1209 connector_type=connector_type, 1210 ) 1211 except (PyAirbyteInputError, CloudAuthError) as e: 1212 return OrganizationVersionOverrideResult( 1213 success=False, 1214 message=f"Pin guard check failed: {e}", 1215 organization_id=organization_id, 1216 connector_name=connector_name, 1217 connector_type=connector_type, 1218 ) 1219 1220 # Resolve auth and call API client 1221 try: 1222 auth = _resolve_cloud_auth(ctx) 1223 1224 result = api_client.set_organization_connector_version_override( 1225 organization_id=organization_id, 1226 connector_name=connector_name, 1227 connector_type=connector_type, 1228 config_api_root=config_api_root or constants.CLOUD_CONFIG_API_ROOT, 1229 client_id=auth.client_id, 1230 client_secret=auth.client_secret, 1231 bearer_token=auth.bearer_token, 1232 version=version, 1233 unset=unset, 1234 override_reason=enhanced_override_reason, 1235 override_reason_reference_url=override_reason_reference_url, 1236 user_email=admin_user_email, 1237 ) 1238 1239 if unset: 1240 if result: 1241 message = f"Successfully cleared organization-level version override for {connector_name}." 1242 else: 1243 message = f"No organization-level version override was active for {connector_name} (nothing to clear)" 1244 else: 1245 message = f"Successfully pinned {connector_name} to version {version} at organization level." 1246 1247 if tier_warning: 1248 message = f"{tier_warning} {message}" 1249 1250 return OrganizationVersionOverrideResult( 1251 success=True, 1252 message=message, 1253 organization_id=organization_id, 1254 connector_name=connector_name, 1255 connector_type=connector_type, 1256 version=version if not unset else None, 1257 customer_tier=customer_tier, 1258 tier_warning=tier_warning, 1259 ) 1260 1261 except PyAirbyteInputError as e: 1262 return OrganizationVersionOverrideResult( 1263 success=False, 1264 message=str(e), 1265 organization_id=organization_id, 1266 connector_name=connector_name, 1267 connector_type=connector_type, 1268 ) 1269 except CloudAuthError as e: 1270 return OrganizationVersionOverrideResult( 1271 success=False, 1272 message=f"Authentication failed: {e}", 1273 organization_id=organization_id, 1274 connector_name=connector_name, 1275 connector_type=connector_type, 1276 ) 1277 1278 1279def register_cloud_connector_version_tools(app: FastMCP) -> None: 1280 """Register cloud connector version management tools with the FastMCP app. 1281 1282 Args: 1283 app: FastMCP application instance 1284 """ 1285 register_mcp_tools(app, mcp_module=__name__)