airbyte_ops_mcp.mcp.connection_state

MCP tools for connection state and catalog management (read-only).

This module provides MCP tools for reading connection state and catalog in Airbyte Cloud. Connection state tracks the sync progress for each stream and is used for incremental syncs. The configured catalog controls which streams are synced and their sync configuration.

MCP reference

MCP primitives registered by the connection_state module of the airbyte-internal-ops server: 2 tool(s), 0 prompt(s), 0 resource(s).

Tools (2)

get_connection_catalog

Hints: read-only · idempotent · open-world

Get the configured catalog for an Airbyte connection.

Returns the connection's configured catalog, which defines which streams are synced, their sync modes, primary keys, and cursor fields.

Parameters:

Name Type Required Default Description
workspace_id string | enum("266ebdfe-0d7b-4540-9817-de7e4505ba61") yes The Airbyte Cloud workspace ID (UUID) or alias. Aliases: '@devin-ai-sandbox'.
connection_id string yes The connection ID (UUID) to fetch catalog for.
config_api_root string | null no null Optional API root URL override. Defaults to Airbyte Cloud. Use this to target local or self-hosted deployments.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "description": "Workspace ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@devin-ai-sandbox\") and its value\nis the actual workspace UUID. Use `WorkspaceAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
          "enum": [
            "266ebdfe-0d7b-4540-9817-de7e4505ba61"
          ],
          "type": "string"
        }
      ],
      "description": "The Airbyte Cloud workspace ID (UUID) or alias. Aliases: '@devin-ai-sandbox'."
    },
    "connection_id": {
      "description": "The connection ID (UUID) to fetch catalog for.",
      "type": "string"
    },
    "config_api_root": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional API root URL override. Defaults to Airbyte Cloud. Use this to target local or self-hosted deployments."
    }
  },
  "required": [
    "workspace_id",
    "connection_id"
  ],
  "type": "object"
}

Show output JSON schema

{
  "additionalProperties": true,
  "type": "object"
}

get_connection_state

Hints: read-only · idempotent · open-world

Get the current state for an Airbyte connection.

Returns the connection's sync state in Airbyte protocol format (snake_case). The state can be one of: stream (per-stream), global, legacy, or not_set.

When stream_name is provided, returns a dict with connection_id, stream_name, stream_namespace, and stream_state keys. Otherwise returns the full state as a list of AirbyteStateMessage dicts.

Parameters:

Name Type Required Default Description
workspace_id string | enum("266ebdfe-0d7b-4540-9817-de7e4505ba61") yes The Airbyte Cloud workspace ID (UUID) or alias. Aliases: '@devin-ai-sandbox'.
connection_id string yes The connection ID (UUID) to fetch state for.
stream_name string | null no null Optional stream name to filter state for a single stream. When provided, only the matching stream's inner state blob is returned.
stream_namespace string | null no null Optional stream namespace to narrow the stream filter. Only used when stream_name is also provided.
config_api_root string | null no null Optional API root URL override. Defaults to Airbyte Cloud. Use this to target local or self-hosted deployments.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "description": "Workspace ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@devin-ai-sandbox\") and its value\nis the actual workspace UUID. Use `WorkspaceAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
          "enum": [
            "266ebdfe-0d7b-4540-9817-de7e4505ba61"
          ],
          "type": "string"
        }
      ],
      "description": "The Airbyte Cloud workspace ID (UUID) or alias. Aliases: '@devin-ai-sandbox'."
    },
    "connection_id": {
      "description": "The connection ID (UUID) to fetch state for.",
      "type": "string"
    },
    "stream_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional stream name to filter state for a single stream. When provided, only the matching stream's inner state blob is returned."
    },
    "stream_namespace": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional stream namespace to narrow the stream filter. Only used when stream_name is also provided."
    },
    "config_api_root": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional API root URL override. Defaults to Airbyte Cloud. Use this to target local or self-hosted deployments."
    }
  },
  "required": [
    "workspace_id",
    "connection_id"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "anyOf": [
        {
          "additionalProperties": true,
          "type": "object"
        },
        {
          "items": {
            "additionalProperties": true,
            "type": "object"
          },
          "type": "array"
        }
      ]
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

  1# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
  2"""MCP tools for connection state and catalog management (read-only).
  3
  4This module provides MCP tools for reading connection state and catalog
  5in Airbyte Cloud. Connection state tracks the sync progress for each stream
  6and is used for incremental syncs. The configured catalog controls which
  7streams are synced and their sync configuration.
  8
  9## MCP reference
 10
 11.. include:: ../../../docs/mcp-generated/connection_state.md
 12    :start-line: 2
 13"""
 14
 15# NOTE: We intentionally do NOT use `from __future__ import annotations` here.
 16# FastMCP has issues resolving forward references when PEP 563 deferred annotations
 17# are used. See: https://github.com/jlowin/fastmcp/issues/905
 18# Python 3.12+ supports modern type hint syntax natively, so this is not needed.
 19
 20__all__: list[str] = []
 21
 22from typing import Annotated, Any
 23
 24from airbyte import constants
 25from airbyte.cloud.connections import CloudConnection
 26from airbyte.cloud.workspaces import CloudWorkspace
 27from airbyte.secrets.base import SecretString
 28from fastmcp import Context, FastMCP
 29from fastmcp_extensions import get_mcp_config, mcp_tool, register_mcp_tools
 30from pydantic import Field
 31
 32from airbyte_ops_mcp.cloud_admin.auth import CloudAuthError
 33from airbyte_ops_mcp.constants import ServerConfigKey, WorkspaceAliasEnum
 34
 35
 36def _get_cloud_connection(
 37    workspace_id: str,
 38    connection_id: str,
 39    api_root: str,
 40    bearer_token: str | None = None,
 41    client_id: str | None = None,
 42    client_secret: str | None = None,
 43) -> CloudConnection:
 44    """Create a CloudConnection from credentials."""
 45    workspace = CloudWorkspace(
 46        workspace_id=workspace_id,
 47        api_root=api_root,
 48        client_id=SecretString(client_id) if client_id else None,
 49        client_secret=SecretString(client_secret) if client_secret else None,
 50        bearer_token=SecretString(bearer_token) if bearer_token else None,
 51    )
 52    return workspace.get_connection(connection_id)
 53
 54
 55def _resolve_cloud_auth(
 56    ctx: Context,
 57) -> tuple[str | None, str | None, str | None]:
 58    """Resolve authentication credentials for API calls.
 59
 60    Returns:
 61        Tuple of (bearer_token, client_id, client_secret).
 62    """
 63    bearer_token = get_mcp_config(ctx, ServerConfigKey.BEARER_TOKEN)
 64    if bearer_token:
 65        return bearer_token, None, None
 66
 67    try:
 68        client_id = get_mcp_config(ctx, ServerConfigKey.CLIENT_ID)
 69        client_secret = get_mcp_config(ctx, ServerConfigKey.CLIENT_SECRET)
 70        return None, client_id, client_secret
 71    except ValueError as e:
 72        raise CloudAuthError(
 73            f"Failed to resolve credentials. Ensure credentials are provided "
 74            f"via Authorization header (Bearer token), "
 75            f"HTTP headers (X-Airbyte-Cloud-Client-Id, X-Airbyte-Cloud-Client-Secret), "
 76            f"or environment variables. Error: {e}"
 77        ) from e
 78
 79
 80@mcp_tool(
 81    read_only=True,
 82    idempotent=True,
 83    open_world=True,
 84)
 85def get_connection_state(
 86    workspace_id: Annotated[
 87        str | WorkspaceAliasEnum,
 88        Field(
 89            description="The Airbyte Cloud workspace ID (UUID) or alias. "
 90            "Aliases: '@devin-ai-sandbox'.",
 91        ),
 92    ],
 93    connection_id: Annotated[
 94        str,
 95        Field(description="The connection ID (UUID) to fetch state for."),
 96    ],
 97    stream_name: Annotated[
 98        str | None,
 99        Field(
100            description="Optional stream name to filter state for a single stream. "
101            "When provided, only the matching stream's inner state blob is returned.",
102            default=None,
103        ),
104    ] = None,
105    stream_namespace: Annotated[
106        str | None,
107        Field(
108            description="Optional stream namespace to narrow the stream filter. "
109            "Only used when stream_name is also provided.",
110            default=None,
111        ),
112    ] = None,
113    config_api_root: Annotated[
114        str | None,
115        Field(
116            description="Optional API root URL override. "
117            "Defaults to Airbyte Cloud. "
118            "Use this to target local or self-hosted deployments.",
119            default=None,
120        ),
121    ] = None,
122    *,
123    ctx: Context,
124) -> dict[str, Any] | list[dict[str, Any]]:
125    """Get the current state for an Airbyte connection.
126
127    Returns the connection's sync state in Airbyte protocol format (snake_case).
128    The state can be one of: stream (per-stream), global, legacy, or not_set.
129
130    When `stream_name` is provided, returns a dict with `connection_id`,
131    `stream_name`, `stream_namespace`, and `stream_state` keys.
132    Otherwise returns the full state as a list of `AirbyteStateMessage` dicts.
133    """
134    resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id)
135    if resolved_workspace_id is None:
136        raise ValueError(
137            f"Invalid workspace ID or alias: {workspace_id!r}. "
138            f"Supported aliases: {', '.join(sorted(alias.name.lower().replace('_', '-') for alias in WorkspaceAliasEnum))}"
139        )
140    bearer_token, client_id, client_secret = _resolve_cloud_auth(ctx)
141
142    conn = _get_cloud_connection(
143        workspace_id=resolved_workspace_id,
144        connection_id=connection_id,
145        api_root=config_api_root or constants.CLOUD_API_ROOT,
146        bearer_token=bearer_token,
147        client_id=client_id,
148        client_secret=client_secret,
149    )
150
151    if stream_name is not None:
152        stream_state = conn.get_stream_state(
153            stream_name=stream_name,
154            stream_namespace=stream_namespace,
155        )
156        return {
157            "connection_id": connection_id,
158            "stream_name": stream_name,
159            "stream_namespace": stream_namespace,
160            "stream_state": stream_state,
161        }
162
163    return conn.dump_raw_state()
164
165
166@mcp_tool(
167    read_only=True,
168    idempotent=True,
169    open_world=True,
170)
171def get_connection_catalog(
172    workspace_id: Annotated[
173        str | WorkspaceAliasEnum,
174        Field(
175            description="The Airbyte Cloud workspace ID (UUID) or alias. "
176            "Aliases: '@devin-ai-sandbox'.",
177        ),
178    ],
179    connection_id: Annotated[
180        str,
181        Field(description="The connection ID (UUID) to fetch catalog for."),
182    ],
183    config_api_root: Annotated[
184        str | None,
185        Field(
186            description="Optional API root URL override. "
187            "Defaults to Airbyte Cloud. "
188            "Use this to target local or self-hosted deployments.",
189            default=None,
190        ),
191    ] = None,
192    *,
193    ctx: Context,
194) -> dict[str, Any]:
195    """Get the configured catalog for an Airbyte connection.
196
197    Returns the connection's configured catalog, which defines which streams
198    are synced, their sync modes, primary keys, and cursor fields.
199    """
200    resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id)
201    if resolved_workspace_id is None:
202        raise ValueError(
203            f"Invalid workspace ID or alias: {workspace_id!r}. "
204            f"Supported aliases: {', '.join(sorted(alias.name.lower().replace('_', '-') for alias in WorkspaceAliasEnum))}"
205        )
206    bearer_token, client_id, client_secret = _resolve_cloud_auth(ctx)
207
208    conn = _get_cloud_connection(
209        workspace_id=resolved_workspace_id,
210        connection_id=connection_id,
211        api_root=config_api_root or constants.CLOUD_API_ROOT,
212        bearer_token=bearer_token,
213        client_id=client_id,
214        client_secret=client_secret,
215    )
216
217    result = conn.dump_raw_catalog()
218    if result is None:
219        raise ValueError("No configured catalog found for this connection.")
220    return result
221
222
223def register_connection_state_tools(app: FastMCP) -> None:
224    """Register connection state and catalog management tools with the FastMCP app."""
225    register_mcp_tools(app, mcp_module=__name__)