airbyte_ops_mcp.mcp.connection_state
MCP tools for connection state and catalog management (read-only).
This module provides MCP tools for reading connection state and catalog in Airbyte Cloud. Connection state tracks the sync progress for each stream and is used for incremental syncs. The configured catalog controls which streams are synced and their sync configuration.
MCP reference
MCP primitives registered by the connection_state module of the airbyte-internal-ops server: 2 tool(s), 0 prompt(s), 0 resource(s).
Tools (2)
get_connection_catalog
Hints: read-only · idempotent · open-world
Get the configured catalog for an Airbyte connection.
Returns the connection's configured catalog, which defines which streams are synced, their sync modes, primary keys, and cursor fields.
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| workspace_id | string \| enum("266ebdfe-0d7b-4540-9817-de7e4505ba61") | yes | — | The Airbyte Cloud workspace ID (UUID) or alias. Aliases: '@devin-ai-sandbox'. |
| connection_id | string | yes | — | The connection ID (UUID) to fetch catalog for. |
| config_api_root | string \| null | no | null | Optional API root URL override. Defaults to Airbyte Cloud. Use this to target local or self-hosted deployments. |
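The `workspace_id` parameter accepts either a workspace UUID or an alias. A minimal sketch of how such a lookup could behave is shown here. The `resolve_workspace_id` helper and the `WORKSPACE_ALIASES` table are hypothetical stand-ins, not the actual `WorkspaceAliasEnum.resolve()` implementation; only the alias and UUID pair comes from this page.

```python
import uuid
from typing import Optional

# Hypothetical alias table. The single alias/UUID pair is the one listed
# in the parameter description on this page.
WORKSPACE_ALIASES = {
    "@devin-ai-sandbox": "266ebdfe-0d7b-4540-9817-de7e4505ba61",
}


def resolve_workspace_id(value: str) -> Optional[str]:
    """Return a workspace UUID for an alias or UUID string, else None."""
    if value in WORKSPACE_ALIASES:
        return WORKSPACE_ALIASES[value]
    try:
        # Accept anything that parses as a UUID and normalize its format.
        return str(uuid.UUID(value))
    except ValueError:
        # Neither a known alias nor a well-formed UUID.
        return None
```

Returning `None` for unknown input mirrors the way the tools raise a `ValueError` when resolution yields nothing; whether the real resolver also validates UUID syntax is an assumption of this sketch.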
Input JSON schema:
{
"additionalProperties": false,
"properties": {
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"description": "Workspace ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@devin-ai-sandbox\") and its value\nis the actual workspace UUID. Use `WorkspaceAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
"enum": [
"266ebdfe-0d7b-4540-9817-de7e4505ba61"
],
"type": "string"
}
],
"description": "The Airbyte Cloud workspace ID (UUID) or alias. Aliases: '@devin-ai-sandbox'."
},
"connection_id": {
"description": "The connection ID (UUID) to fetch catalog for.",
"type": "string"
},
"config_api_root": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional API root URL override. Defaults to Airbyte Cloud. Use this to target local or self-hosted deployments."
}
},
"required": [
"workspace_id",
"connection_id"
],
"type": "object"
}
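To illustrate the input schema, the following sketch checks an arguments payload for `get_connection_catalog` against the rules the schema states: `workspace_id` and `connection_id` are required strings, `config_api_root` is an optional string or null, and no other properties are allowed. The `check_catalog_args` helper and the all-zero connection UUID are hypothetical; how the payload is actually sent depends on your MCP client.

```python
from typing import Any

ALLOWED = {"workspace_id", "connection_id", "config_api_root"}
REQUIRED = {"workspace_id", "connection_id"}


def check_catalog_args(args: dict[str, Any]) -> list[str]:
    """Return a list of problems; an empty list means the payload fits."""
    problems = []
    # "required": both IDs must be present.
    for name in sorted(REQUIRED - set(args)):
        problems.append(f"missing required property: {name}")
    # "additionalProperties": false, so unknown keys are rejected.
    for name in sorted(set(args) - ALLOWED):
        problems.append(f"unexpected property: {name}")
    # Required properties must be strings when present.
    for name in sorted(REQUIRED & set(args)):
        if not isinstance(args[name], str):
            problems.append(f"{name} must be a string")
    # config_api_root may be a string or null.
    root = args.get("config_api_root")
    if root is not None and not isinstance(root, str):
        problems.append("config_api_root must be a string or null")
    return problems


# Hypothetical payload using the alias from this page and a placeholder UUID.
payload = {
    "workspace_id": "@devin-ai-sandbox",
    "connection_id": "00000000-0000-0000-0000-000000000000",
}
print(check_catalog_args(payload))
```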
Output JSON schema:
{
"additionalProperties": true,
"type": "object"
}
get_connection_state
Hints: read-only · idempotent · open-world
Get the current state for an Airbyte connection.
Returns the connection's sync state in Airbyte protocol format (snake_case). The state can be one of: stream (per-stream), global, legacy, or not_set.
When `stream_name` is provided, returns a dict with `connection_id`, `stream_name`, `stream_namespace`, and `stream_state` keys. Otherwise returns the full state as a list of `AirbyteStateMessage` dicts.
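Because the tool returns either a single dict or a list, callers need to branch on the shape. The sketch below collects per-stream state from either form. It assumes the list form follows the Airbyte protocol's `AirbyteStateMessage` layout (`type`, `stream.stream_descriptor`, `stream.stream_state`) and compares the type case-insensitively; the `stream_states` helper and all sample values are hypothetical.

```python
from typing import Any, Optional, Union


def stream_states(
    result: Union[dict, list],
) -> dict[tuple[Optional[str], str], Any]:
    """Map (namespace, stream name) to that stream's state blob."""
    if isinstance(result, dict):
        # Filtered form: returned when stream_name was passed.
        key = (result.get("stream_namespace"), result["stream_name"])
        return {key: result["stream_state"]}

    states = {}
    for message in result:
        # Only per-stream messages carry a stream descriptor; global and
        # legacy messages are skipped in this sketch.
        if str(message.get("type", "")).upper() != "STREAM":
            continue
        descriptor = message["stream"]["stream_descriptor"]
        key = (descriptor.get("namespace"), descriptor["name"])
        states[key] = message["stream"].get("stream_state")
    return states


# Hypothetical sample data for both return shapes.
filtered = {
    "connection_id": "00000000-0000-0000-0000-000000000000",
    "stream_name": "users",
    "stream_namespace": None,
    "stream_state": {"updated_at": "2025-01-01T00:00:00Z"},
}
full = [
    {
        "type": "STREAM",
        "stream": {
            "stream_descriptor": {"name": "users", "namespace": None},
            "stream_state": {"updated_at": "2025-01-01T00:00:00Z"},
        },
    },
    {"type": "GLOBAL", "global": {"shared_state": {}, "stream_states": []}},
]
```

Both `stream_states(filtered)` and `stream_states(full)` yield one entry keyed by `(None, "users")`, so downstream code can treat the two shapes uniformly.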
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| workspace_id | string \| enum("266ebdfe-0d7b-4540-9817-de7e4505ba61") | yes | — | The Airbyte Cloud workspace ID (UUID) or alias. Aliases: '@devin-ai-sandbox'. |
| connection_id | string | yes | — | The connection ID (UUID) to fetch state for. |
| stream_name | string \| null | no | null | Optional stream name to filter state for a single stream. When provided, only the matching stream's inner state blob is returned. |
| stream_namespace | string \| null | no | null | Optional stream namespace to narrow the stream filter. Only used when stream_name is also provided. |
| config_api_root | string \| null | no | null | Optional API root URL override. Defaults to Airbyte Cloud. Use this to target local or self-hosted deployments. |
Input JSON schema:
{
"additionalProperties": false,
"properties": {
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"description": "Workspace ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@devin-ai-sandbox\") and its value\nis the actual workspace UUID. Use `WorkspaceAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
"enum": [
"266ebdfe-0d7b-4540-9817-de7e4505ba61"
],
"type": "string"
}
],
"description": "The Airbyte Cloud workspace ID (UUID) or alias. Aliases: '@devin-ai-sandbox'."
},
"connection_id": {
"description": "The connection ID (UUID) to fetch state for.",
"type": "string"
},
"stream_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional stream name to filter state for a single stream. When provided, only the matching stream's inner state blob is returned."
},
"stream_namespace": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional stream namespace to narrow the stream filter. Only used when stream_name is also provided."
},
"config_api_root": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional API root URL override. Defaults to Airbyte Cloud. Use this to target local or self-hosted deployments."
}
},
"required": [
"workspace_id",
"connection_id"
],
"type": "object"
}
Output JSON schema:
{
"properties": {
"result": {
"anyOf": [
{
"additionalProperties": true,
"type": "object"
},
{
"items": {
"additionalProperties": true,
"type": "object"
},
"type": "array"
}
]
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
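Both tools authenticate with either a bearer token or a client ID and secret pair, and the module's `_resolve_cloud_auth` helper prefers the bearer token when one is configured. A simplified, self-contained sketch of that precedence follows. It uses a plain dict in place of the FastMCP context and `get_mcp_config`, and raises `ValueError` rather than `CloudAuthError`; the `resolve_auth` name and the config keys are hypothetical.

```python
from typing import Optional


def resolve_auth(
    config: dict,
) -> tuple[Optional[str], Optional[str], Optional[str]]:
    """Return (bearer_token, client_id, client_secret), bearer token first."""
    bearer_token = config.get("bearer_token")
    if bearer_token:
        # A bearer token wins outright; client credentials are ignored.
        return bearer_token, None, None
    try:
        # Fall back to the client ID and secret pair.
        return None, config["client_id"], config["client_secret"]
    except KeyError as e:
        raise ValueError(f"Failed to resolve credentials: missing {e}") from e
```

Returning a fixed three-element tuple keeps the call site simple: exactly one of the two credential styles is populated, and the unused slots are `None`.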
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
"""MCP tools for connection state and catalog management (read-only).

This module provides MCP tools for reading connection state and catalog
in Airbyte Cloud. Connection state tracks the sync progress for each stream
and is used for incremental syncs. The configured catalog controls which
streams are synced and their sync configuration.

## MCP reference

.. include:: ../../../docs/mcp-generated/connection_state.md
    :start-line: 2
"""

# NOTE: We intentionally do NOT use `from __future__ import annotations` here.
# FastMCP has issues resolving forward references when PEP 563 deferred annotations
# are used. See: https://github.com/jlowin/fastmcp/issues/905
# Python 3.12+ supports modern type hint syntax natively, so this is not needed.

__all__: list[str] = []

from typing import Annotated, Any

from airbyte import constants
from airbyte.cloud.connections import CloudConnection
from airbyte.cloud.workspaces import CloudWorkspace
from airbyte.secrets.base import SecretString
from fastmcp import Context, FastMCP
from fastmcp_extensions import get_mcp_config, mcp_tool, register_mcp_tools
from pydantic import Field

from airbyte_ops_mcp.cloud_admin.auth import CloudAuthError
from airbyte_ops_mcp.constants import ServerConfigKey, WorkspaceAliasEnum


def _get_cloud_connection(
    workspace_id: str,
    connection_id: str,
    api_root: str,
    bearer_token: str | None = None,
    client_id: str | None = None,
    client_secret: str | None = None,
) -> CloudConnection:
    """Create a CloudConnection from credentials."""
    workspace = CloudWorkspace(
        workspace_id=workspace_id,
        api_root=api_root,
        client_id=SecretString(client_id) if client_id else None,
        client_secret=SecretString(client_secret) if client_secret else None,
        bearer_token=SecretString(bearer_token) if bearer_token else None,
    )
    return workspace.get_connection(connection_id)


def _resolve_cloud_auth(
    ctx: Context,
) -> tuple[str | None, str | None, str | None]:
    """Resolve authentication credentials for API calls.

    Returns:
        Tuple of (bearer_token, client_id, client_secret).
    """
    bearer_token = get_mcp_config(ctx, ServerConfigKey.BEARER_TOKEN)
    if bearer_token:
        return bearer_token, None, None

    try:
        client_id = get_mcp_config(ctx, ServerConfigKey.CLIENT_ID)
        client_secret = get_mcp_config(ctx, ServerConfigKey.CLIENT_SECRET)
        return None, client_id, client_secret
    except ValueError as e:
        raise CloudAuthError(
            f"Failed to resolve credentials. Ensure credentials are provided "
            f"via Authorization header (Bearer token), "
            f"HTTP headers (X-Airbyte-Cloud-Client-Id, X-Airbyte-Cloud-Client-Secret), "
            f"or environment variables. Error: {e}"
        ) from e


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def get_connection_state(
    workspace_id: Annotated[
        str | WorkspaceAliasEnum,
        Field(
            description="The Airbyte Cloud workspace ID (UUID) or alias. "
            "Aliases: '@devin-ai-sandbox'.",
        ),
    ],
    connection_id: Annotated[
        str,
        Field(description="The connection ID (UUID) to fetch state for."),
    ],
    stream_name: Annotated[
        str | None,
        Field(
            description="Optional stream name to filter state for a single stream. "
            "When provided, only the matching stream's inner state blob is returned.",
            default=None,
        ),
    ] = None,
    stream_namespace: Annotated[
        str | None,
        Field(
            description="Optional stream namespace to narrow the stream filter. "
            "Only used when stream_name is also provided.",
            default=None,
        ),
    ] = None,
    config_api_root: Annotated[
        str | None,
        Field(
            description="Optional API root URL override. "
            "Defaults to Airbyte Cloud. "
            "Use this to target local or self-hosted deployments.",
            default=None,
        ),
    ] = None,
    *,
    ctx: Context,
) -> dict[str, Any] | list[dict[str, Any]]:
    """Get the current state for an Airbyte connection.

    Returns the connection's sync state in Airbyte protocol format (snake_case).
    The state can be one of: stream (per-stream), global, legacy, or not_set.

    When `stream_name` is provided, returns a dict with `connection_id`,
    `stream_name`, `stream_namespace`, and `stream_state` keys.
    Otherwise returns the full state as a list of `AirbyteStateMessage` dicts.
    """
    resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id)
    if resolved_workspace_id is None:
        raise ValueError(
            f"Invalid workspace ID or alias: {workspace_id!r}. "
            f"Supported aliases: {', '.join(sorted(alias.name.lower().replace('_', '-') for alias in WorkspaceAliasEnum))}"
        )
    bearer_token, client_id, client_secret = _resolve_cloud_auth(ctx)

    conn = _get_cloud_connection(
        workspace_id=resolved_workspace_id,
        connection_id=connection_id,
        api_root=config_api_root or constants.CLOUD_API_ROOT,
        bearer_token=bearer_token,
        client_id=client_id,
        client_secret=client_secret,
    )

    if stream_name is not None:
        stream_state = conn.get_stream_state(
            stream_name=stream_name,
            stream_namespace=stream_namespace,
        )
        return {
            "connection_id": connection_id,
            "stream_name": stream_name,
            "stream_namespace": stream_namespace,
            "stream_state": stream_state,
        }

    return conn.dump_raw_state()


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def get_connection_catalog(
    workspace_id: Annotated[
        str | WorkspaceAliasEnum,
        Field(
            description="The Airbyte Cloud workspace ID (UUID) or alias. "
            "Aliases: '@devin-ai-sandbox'.",
        ),
    ],
    connection_id: Annotated[
        str,
        Field(description="The connection ID (UUID) to fetch catalog for."),
    ],
    config_api_root: Annotated[
        str | None,
        Field(
            description="Optional API root URL override. "
            "Defaults to Airbyte Cloud. "
            "Use this to target local or self-hosted deployments.",
            default=None,
        ),
    ] = None,
    *,
    ctx: Context,
) -> dict[str, Any]:
    """Get the configured catalog for an Airbyte connection.

    Returns the connection's configured catalog, which defines which streams
    are synced, their sync modes, primary keys, and cursor fields.
    """
    resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id)
    if resolved_workspace_id is None:
        raise ValueError(
            f"Invalid workspace ID or alias: {workspace_id!r}. "
            f"Supported aliases: {', '.join(sorted(alias.name.lower().replace('_', '-') for alias in WorkspaceAliasEnum))}"
        )
    bearer_token, client_id, client_secret = _resolve_cloud_auth(ctx)

    conn = _get_cloud_connection(
        workspace_id=resolved_workspace_id,
        connection_id=connection_id,
        api_root=config_api_root or constants.CLOUD_API_ROOT,
        bearer_token=bearer_token,
        client_id=client_id,
        client_secret=client_secret,
    )

    result = conn.dump_raw_catalog()
    if result is None:
        raise ValueError("No configured catalog found for this connection.")
    return result


def register_connection_state_tools(app: FastMCP) -> None:
    """Register connection state and catalog management tools with the FastMCP app."""
    register_mcp_tools(app, mcp_module=__name__)