airbyte_ops_mcp.mcp.connection_state
MCP tools for connection state and catalog management (read-only).
This module provides MCP tools for reading connection state and catalog in Airbyte Cloud. Connection state tracks the sync progress for each stream and is used for incremental syncs. The configured catalog controls which streams are synced and their sync configuration.
MCP reference
MCP primitives registered by the connection_state module of the airbyte-internal-ops server: 2 tools, 0 prompts, 0 resources.
Tools (2)
get_connection_catalog
Hints: read-only · idempotent · open-world
Get the configured catalog for an Airbyte connection.
Returns the connection's configured catalog, which defines which streams are synced, their sync modes, primary keys, and cursor fields.
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| `workspace_id` | `string` \| `enum("266ebdfe-0d7b-4540-9817-de7e4505ba61")` | yes | — | The Airbyte Cloud workspace ID (UUID) or alias. Aliases: '@devin-ai-sandbox'. |
| `connection_id` | `string` | yes | — | The connection ID (UUID) to fetch catalog for. |
| `config_api_root` | `string` \| `null` | no | `null` | Optional API root URL override. Defaults to Airbyte Cloud. Use this to target local or self-hosted deployments. |
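As a rough illustration of how a client might invoke this tool, the sketch below builds an MCP `tools/call` JSON-RPC request whose `arguments` follow the parameters listed above. The connection ID is a made-up placeholder and the request ID is arbitrary; how the request is actually sent depends on the MCP client and transport in use.

```python
import json

# Hypothetical connection ID, used only for illustration.
EXAMPLE_CONNECTION_ID = "00000000-0000-0000-0000-000000000000"


def build_catalog_request(workspace_id: str, connection_id: str, request_id: int = 1) -> dict:
    """Build a JSON-RPC 2.0 `tools/call` request for get_connection_catalog."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {
            "name": "get_connection_catalog",
            # config_api_root is left out, so its documented default (null) applies.
            "arguments": {
                "workspace_id": workspace_id,
                "connection_id": connection_id,
            },
        },
    }


request = build_catalog_request("@devin-ai-sandbox", EXAMPLE_CONNECTION_ID)
print(json.dumps(request, indent=2))
```

Only the two required parameters are passed here; `config_api_root` would be added to `arguments` when targeting a local or self-hosted deployment.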
Input JSON schema:
{
  "additionalProperties": false,
  "properties": {
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "description": "Workspace ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@devin-ai-sandbox\") and its value\nis the actual workspace UUID. Use `WorkspaceAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
          "enum": [
            "266ebdfe-0d7b-4540-9817-de7e4505ba61"
          ],
          "type": "string"
        }
      ],
      "description": "The Airbyte Cloud workspace ID (UUID) or alias. Aliases: '@devin-ai-sandbox'."
    },
    "connection_id": {
      "description": "The connection ID (UUID) to fetch catalog for.",
      "type": "string"
    },
    "config_api_root": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional API root URL override. Defaults to Airbyte Cloud. Use this to target local or self-hosted deployments."
    }
  },
  "required": [
    "workspace_id",
    "connection_id"
  ],
  "type": "object"
}
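The schema describes workspace aliases as enum members whose name is the alias and whose value is the workspace UUID. A minimal sketch of how such an enum could resolve aliases follows. The class `WorkspaceAlias`, the `@name` to member-name mapping, and the body of `resolve` are assumptions for illustration; they are not the actual `WorkspaceAliasEnum` implementation.

```python
from enum import Enum


class WorkspaceAlias(str, Enum):
    """Hypothetical stand-in for WorkspaceAliasEnum: name is the alias, value is the UUID."""

    DEVIN_AI_SANDBOX = "266ebdfe-0d7b-4540-9817-de7e4505ba61"

    @classmethod
    def resolve(cls, workspace_id):
        """Return a workspace UUID for an alias; pass other strings through unchanged."""
        if workspace_id is None:
            return None
        if isinstance(workspace_id, cls):
            return workspace_id.value
        if workspace_id.startswith("@"):
            # Assumed mapping: "@devin-ai-sandbox" -> member DEVIN_AI_SANDBOX.
            member_name = workspace_id[1:].upper().replace("-", "_")
            member = cls.__members__.get(member_name)
            return member.value if member else None
        return workspace_id


print(WorkspaceAlias.resolve("@devin-ai-sandbox"))
print(WorkspaceAlias.resolve("@no-such-alias"))
```

In this sketch an unknown alias resolves to `None`, which lets the caller raise a descriptive error instead of sending an invalid ID to the API.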
Output JSON schema:
{
  "additionalProperties": true,
  "type": "object"
}
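Because the output schema is an open object, the exact shape of the returned catalog is not specified here. The sketch below shows one way a caller might summarize streams, sync modes, cursor fields, and primary keys. It assumes a `streams` list whose entries carry a `stream` object and a `config` object with camelCase keys; that layout, and the sample data, are assumptions rather than documented tool output.

```python
from typing import Any


def summarize_catalog(catalog: dict[str, Any]) -> list[dict[str, Any]]:
    """Summarize each stream entry of a catalog dict (assumed shape, see lead-in)."""
    summary = []
    for entry in catalog.get("streams", []):
        stream = entry.get("stream", {})
        config = entry.get("config", {})
        summary.append(
            {
                "name": stream.get("name"),
                "namespace": stream.get("namespace"),
                "sync_mode": config.get("syncMode"),
                "cursor_field": config.get("cursorField"),
                "primary_key": config.get("primaryKey"),
            }
        )
    return summary


# Hand-written sample data, not real tool output.
sample_catalog = {
    "streams": [
        {
            "stream": {"name": "users", "namespace": "public"},
            "config": {
                "syncMode": "incremental",
                "cursorField": ["updated_at"],
                "primaryKey": [["id"]],
            },
        }
    ]
}

for row in summarize_catalog(sample_catalog):
    print(row)
```

Using `.get()` throughout keeps the sketch tolerant of missing keys, which matters when the real shape differs from the one assumed here.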
get_connection_state
Hints: read-only · idempotent · open-world
Get the current state for an Airbyte connection.
Returns the connection's sync state, which tracks progress for incremental syncs. The state can be one of: stream (per-stream), global, legacy, or not_set.
When stream_name is provided, returns only the matching stream's inner state blob.
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| `workspace_id` | `string` \| `enum("266ebdfe-0d7b-4540-9817-de7e4505ba61")` | yes | — | The Airbyte Cloud workspace ID (UUID) or alias. Aliases: '@devin-ai-sandbox'. |
| `connection_id` | `string` | yes | — | The connection ID (UUID) to fetch state for. |
| `stream_name` | `string` \| `null` | no | `null` | Optional stream name to filter state for a single stream. When provided, only the matching stream's inner state blob is returned. |
| `stream_namespace` | `string` \| `null` | no | `null` | Optional stream namespace to narrow the stream filter. Only used when stream_name is also provided. |
| `config_api_root` | `string` \| `null` | no | `null` | Optional API root URL override. Defaults to Airbyte Cloud. Use this to target local or self-hosted deployments. |
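To make the `stream_name` and `stream_namespace` filter concrete, here is a minimal sketch of selecting one stream's inner state blob from a full state object. It assumes a state dict with a `stateType` field, a `streamState` list for per-stream state, and a `globalState.streamStates` list for global state, each entry holding a `streamDescriptor` and a `streamState`. That layout and the helper `find_stream_state` are assumptions for illustration; the tool itself delegates filtering to the connection object.

```python
from typing import Any, Optional


def find_stream_state(
    state: dict[str, Any],
    stream_name: str,
    stream_namespace: Optional[str] = None,
) -> Optional[dict[str, Any]]:
    """Return the inner state blob of the matching stream, or None if absent."""
    state_type = state.get("stateType")
    if state_type == "stream":
        entries = state.get("streamState") or []
    elif state_type == "global":
        entries = (state.get("globalState") or {}).get("streamStates") or []
    else:
        # This sketch does not look inside legacy or not_set states.
        entries = []
    for entry in entries:
        descriptor = entry.get("streamDescriptor", {})
        if descriptor.get("name") != stream_name:
            continue
        # The namespace only narrows the match when it is given.
        if stream_namespace is not None and descriptor.get("namespace") != stream_namespace:
            continue
        return entry.get("streamState")
    return None


# Hand-written sample data, not real tool output.
sample_state = {
    "stateType": "stream",
    "streamState": [
        {
            "streamDescriptor": {"name": "users", "namespace": "public"},
            "streamState": {"updated_at": "2024-01-01T00:00:00Z"},
        },
        {
            "streamDescriptor": {"name": "orders", "namespace": "public"},
            "streamState": {"updated_at": "2024-02-01T00:00:00Z"},
        },
    ],
}

print(find_stream_state(sample_state, "orders"))
```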
Input JSON schema:
{
  "additionalProperties": false,
  "properties": {
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "description": "Workspace ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@devin-ai-sandbox\") and its value\nis the actual workspace UUID. Use `WorkspaceAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
          "enum": [
            "266ebdfe-0d7b-4540-9817-de7e4505ba61"
          ],
          "type": "string"
        }
      ],
      "description": "The Airbyte Cloud workspace ID (UUID) or alias. Aliases: '@devin-ai-sandbox'."
    },
    "connection_id": {
      "description": "The connection ID (UUID) to fetch state for.",
      "type": "string"
    },
    "stream_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional stream name to filter state for a single stream. When provided, only the matching stream's inner state blob is returned."
    },
    "stream_namespace": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional stream namespace to narrow the stream filter. Only used when stream_name is also provided."
    },
    "config_api_root": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional API root URL override. Defaults to Airbyte Cloud. Use this to target local or self-hosted deployments."
    }
  },
  "required": [
    "workspace_id",
    "connection_id"
  ],
  "type": "object"
}
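The schema marks two parameters as required and sets `additionalProperties` to false, so unknown keys are rejected. A client could pre-check its arguments along these lines before calling the tool. The helper `check_state_arguments` is hypothetical and covers only key presence, not value types; a full JSON Schema validator would be the more thorough choice.

```python
REQUIRED = {"workspace_id", "connection_id"}
OPTIONAL = {"stream_name", "stream_namespace", "config_api_root"}


def check_state_arguments(arguments: dict) -> list[str]:
    """Return a list of problems found in get_connection_state arguments."""
    problems = []
    keys = set(arguments)
    for name in sorted(REQUIRED - keys):
        problems.append(f"missing required parameter: {name}")
    for name in sorted(keys - REQUIRED - OPTIONAL):
        problems.append(f"unknown parameter: {name}")
    # Per the parameter description, the namespace has no effect on its own.
    if arguments.get("stream_namespace") is not None and arguments.get("stream_name") is None:
        problems.append("stream_namespace is only used when stream_name is also provided")
    return problems


print(check_state_arguments({"workspace_id": "@devin-ai-sandbox"}))
```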
Output JSON schema:
{
  "additionalProperties": true,
  "type": "object"
}
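Both tools resolve credentials the same way in the module source (`_resolve_cloud_auth`): a bearer token takes precedence, and the client ID and secret are used only when no token is configured. The following simplified sketch mirrors that order with a plain dict in place of the MCP config lookup. The key names and the `CredentialError` class are hypothetical stand-ins, not the project's own.

```python
class CredentialError(Exception):
    """Hypothetical stand-in for CloudAuthError."""


def resolve_credentials(config: dict) -> tuple:
    """Return (bearer_token, client_id, client_secret), preferring the bearer token."""
    bearer_token = config.get("bearer_token")
    if bearer_token:
        # A bearer token wins; client credentials are ignored in that case.
        return bearer_token, None, None
    try:
        return None, config["client_id"], config["client_secret"]
    except KeyError as e:
        raise CredentialError(f"Failed to resolve credentials, missing {e}") from e


print(resolve_credentials({"bearer_token": "tok", "client_id": "ignored"}))
print(resolve_credentials({"client_id": "id", "client_secret": "secret"}))
```

Returning a fixed three-element tuple keeps the call site uniform: the caller can pass all three values on without branching on which authentication method was chosen.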
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
"""MCP tools for connection state and catalog management (read-only).

This module provides MCP tools for reading connection state and catalog
in Airbyte Cloud. Connection state tracks the sync progress for each stream
and is used for incremental syncs. The configured catalog controls which
streams are synced and their sync configuration.

## MCP reference

.. include:: ../../../docs/mcp-generated/connection_state.md
    :start-line: 2
"""

# NOTE: We intentionally do NOT use `from __future__ import annotations` here.
# FastMCP has issues resolving forward references when PEP 563 deferred annotations
# are used. See: https://github.com/jlowin/fastmcp/issues/905
# Python 3.12+ supports modern type hint syntax natively, so this is not needed.

__all__: list[str] = []

from typing import Annotated, Any

from airbyte import constants
from airbyte.cloud.connections import CloudConnection
from airbyte.cloud.workspaces import CloudWorkspace
from airbyte.secrets.base import SecretString
from fastmcp import Context, FastMCP
from fastmcp_extensions import get_mcp_config, mcp_tool, register_mcp_tools
from pydantic import Field

from airbyte_ops_mcp.cloud_admin.auth import CloudAuthError
from airbyte_ops_mcp.constants import ServerConfigKey, WorkspaceAliasEnum


def _get_cloud_connection(
    workspace_id: str,
    connection_id: str,
    api_root: str,
    bearer_token: str | None = None,
    client_id: str | None = None,
    client_secret: str | None = None,
) -> CloudConnection:
    """Create a CloudConnection from credentials."""
    workspace = CloudWorkspace(
        workspace_id=workspace_id,
        api_root=api_root,
        client_id=SecretString(client_id) if client_id else None,
        client_secret=SecretString(client_secret) if client_secret else None,
        bearer_token=SecretString(bearer_token) if bearer_token else None,
    )
    return workspace.get_connection(connection_id)


def _resolve_cloud_auth(
    ctx: Context,
) -> tuple[str | None, str | None, str | None]:
    """Resolve authentication credentials for API calls.

    Returns:
        Tuple of (bearer_token, client_id, client_secret).
    """
    bearer_token = get_mcp_config(ctx, ServerConfigKey.BEARER_TOKEN)
    if bearer_token:
        return bearer_token, None, None

    try:
        client_id = get_mcp_config(ctx, ServerConfigKey.CLIENT_ID)
        client_secret = get_mcp_config(ctx, ServerConfigKey.CLIENT_SECRET)
        return None, client_id, client_secret
    except ValueError as e:
        raise CloudAuthError(
            f"Failed to resolve credentials. Ensure credentials are provided "
            f"via Authorization header (Bearer token), "
            f"HTTP headers (X-Airbyte-Cloud-Client-Id, X-Airbyte-Cloud-Client-Secret), "
            f"or environment variables. Error: {e}"
        ) from e


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def get_connection_state(
    workspace_id: Annotated[
        str | WorkspaceAliasEnum,
        Field(
            description="The Airbyte Cloud workspace ID (UUID) or alias. "
            "Aliases: '@devin-ai-sandbox'.",
        ),
    ],
    connection_id: Annotated[
        str,
        Field(description="The connection ID (UUID) to fetch state for."),
    ],
    stream_name: Annotated[
        str | None,
        Field(
            description="Optional stream name to filter state for a single stream. "
            "When provided, only the matching stream's inner state blob is returned.",
            default=None,
        ),
    ] = None,
    stream_namespace: Annotated[
        str | None,
        Field(
            description="Optional stream namespace to narrow the stream filter. "
            "Only used when stream_name is also provided.",
            default=None,
        ),
    ] = None,
    config_api_root: Annotated[
        str | None,
        Field(
            description="Optional API root URL override. "
            "Defaults to Airbyte Cloud. "
            "Use this to target local or self-hosted deployments.",
            default=None,
        ),
    ] = None,
    *,
    ctx: Context,
) -> dict[str, Any]:
    """Get the current state for an Airbyte connection.

    Returns the connection's sync state, which tracks progress for incremental syncs.
    The state can be one of: stream (per-stream), global, legacy, or not_set.

    When stream_name is provided, returns only the matching stream's inner state blob.
    """
    resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id)
    if resolved_workspace_id is None:
        raise ValueError(
            f"Invalid workspace ID or alias: {workspace_id!r}. "
            f"Supported aliases: {', '.join(sorted(alias.name.lower().replace('_', '-') for alias in WorkspaceAliasEnum))}"
        )
    bearer_token, client_id, client_secret = _resolve_cloud_auth(ctx)

    conn = _get_cloud_connection(
        workspace_id=resolved_workspace_id,
        connection_id=connection_id,
        api_root=config_api_root or constants.CLOUD_API_ROOT,
        bearer_token=bearer_token,
        client_id=client_id,
        client_secret=client_secret,
    )

    if stream_name is not None:
        stream_state = conn.get_stream_state(
            stream_name=stream_name,
            stream_namespace=stream_namespace,
        )
        return {
            "connection_id": connection_id,
            "stream_name": stream_name,
            "stream_namespace": stream_namespace,
            "stream_state": stream_state,
        }

    return conn.dump_raw_state()


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def get_connection_catalog(
    workspace_id: Annotated[
        str | WorkspaceAliasEnum,
        Field(
            description="The Airbyte Cloud workspace ID (UUID) or alias. "
            "Aliases: '@devin-ai-sandbox'.",
        ),
    ],
    connection_id: Annotated[
        str,
        Field(description="The connection ID (UUID) to fetch catalog for."),
    ],
    config_api_root: Annotated[
        str | None,
        Field(
            description="Optional API root URL override. "
            "Defaults to Airbyte Cloud. "
            "Use this to target local or self-hosted deployments.",
            default=None,
        ),
    ] = None,
    *,
    ctx: Context,
) -> dict[str, Any]:
    """Get the configured catalog for an Airbyte connection.

    Returns the connection's configured catalog, which defines which streams
    are synced, their sync modes, primary keys, and cursor fields.
    """
    resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id)
    if resolved_workspace_id is None:
        raise ValueError(
            f"Invalid workspace ID or alias: {workspace_id!r}. "
            f"Supported aliases: {', '.join(sorted(alias.name.lower().replace('_', '-') for alias in WorkspaceAliasEnum))}"
        )
    bearer_token, client_id, client_secret = _resolve_cloud_auth(ctx)

    conn = _get_cloud_connection(
        workspace_id=resolved_workspace_id,
        connection_id=connection_id,
        api_root=config_api_root or constants.CLOUD_API_ROOT,
        bearer_token=bearer_token,
        client_id=client_id,
        client_secret=client_secret,
    )

    result = conn.dump_raw_catalog()
    if result is None:
        raise ValueError("No configured catalog found for this connection.")
    return result


def register_connection_state_tools(app: FastMCP) -> None:
    """Register connection state and catalog management tools with the FastMCP app."""
    register_mcp_tools(app, mcp_module=__name__)