airbyte_ops_mcp.mcp.connection_state

MCP tools for connection state and catalog management (read-only).

This module provides MCP tools for reading connection state and catalog in Airbyte Cloud. Connection state tracks the sync progress for each stream and is used for incremental syncs. The configured catalog controls which streams are synced and their sync configuration.

MCP reference

The connection_state module of the airbyte-internal-ops server registers 2 tools, 0 prompts, and 0 resources.

Tools (2)

get_connection_catalog

Hints: read-only · idempotent · open-world

Get the configured catalog for an Airbyte connection.

Returns the connection's configured catalog, which defines which streams are synced, their sync modes, primary keys, and cursor fields.

Parameters:

| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| workspace_id | string \| enum("266ebdfe-0d7b-4540-9817-de7e4505ba61") | yes | | The Airbyte Cloud workspace ID (UUID) or alias. Aliases: '@devin-ai-sandbox'. |
| connection_id | string | yes | | The connection ID (UUID) to fetch catalog for. |
| config_api_root | string \| null | no | null | Optional API root URL override. Defaults to Airbyte Cloud. Use this to target local or self-hosted deployments. |
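
The alias behaviour of workspace_id can be pictured with a small enum. This is only a sketch: `WorkspaceAlias`, its `alias` property, and `resolve_workspace_id` are hypothetical names, not the real `WorkspaceAliasEnum` from `airbyte_ops_mcp.constants`, and the rule that derives the alias string from the member name is an assumption. Only the alias and UUID values come from this page.

```python
from enum import Enum


class WorkspaceAlias(str, Enum):
    """Hypothetical alias table: each member's value is the real workspace UUID."""

    DEVIN_AI_SANDBOX = "266ebdfe-0d7b-4540-9817-de7e4505ba61"

    @property
    def alias(self) -> str:
        # Assumed naming rule: DEVIN_AI_SANDBOX -> "@devin-ai-sandbox".
        return "@" + self.name.lower().replace("_", "-")


def resolve_workspace_id(value: str) -> str:
    """Return the UUID for a known alias, or the input unchanged otherwise."""
    for member in WorkspaceAlias:
        if value == member.alias:
            return member.value
    return value
```

In this sketch an unknown value passes through untouched, which is a simplification; the module's own `WorkspaceAliasEnum.resolve()` can return None, and the tools raise a ValueError in that case.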

Input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "description": "Workspace ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@devin-ai-sandbox\") and its value\nis the actual workspace UUID. Use `WorkspaceAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
          "enum": [
            "266ebdfe-0d7b-4540-9817-de7e4505ba61"
          ],
          "type": "string"
        }
      ],
      "description": "The Airbyte Cloud workspace ID (UUID) or alias. Aliases: '@devin-ai-sandbox'."
    },
    "connection_id": {
      "description": "The connection ID (UUID) to fetch catalog for.",
      "type": "string"
    },
    "config_api_root": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional API root URL override. Defaults to Airbyte Cloud. Use this to target local or self-hosted deployments."
    }
  },
  "required": [
    "workspace_id",
    "connection_id"
  ],
  "type": "object"
}

Output JSON schema

{
  "additionalProperties": true,
  "type": "object"
}
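
As a rough illustration of how a client might invoke this tool, the sketch below builds an MCP `tools/call` request as a JSON-RPC 2.0 message. The helper `build_tool_call` is a hypothetical name, the connection ID is a placeholder, and the choice to drop None-valued optional arguments is an assumption made for brevity; transport and authentication headers are out of scope here.

```python
import json


def build_tool_call(tool_name: str, arguments: dict, request_id: int = 1) -> str:
    """Serialize an MCP `tools/call` request as a JSON-RPC 2.0 message."""
    # Drop None-valued optional arguments so the server applies its own defaults.
    cleaned = {key: value for key, value in arguments.items() if value is not None}
    message = {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": tool_name, "arguments": cleaned},
    }
    return json.dumps(message)


request = build_tool_call(
    "get_connection_catalog",
    {
        "workspace_id": "@devin-ai-sandbox",
        # Placeholder connection UUID, not a real connection.
        "connection_id": "00000000-0000-0000-0000-000000000000",
        "config_api_root": None,
    },
)
```

Because the input schema sets additionalProperties to false, only the three documented argument names are accepted.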

get_connection_state

Hints: read-only · idempotent · open-world

Get the current state for an Airbyte connection.

Returns the connection's sync state, which tracks progress for incremental syncs. The state can be one of: stream (per-stream), global, legacy, or not_set.

When stream_name is provided, returns only the matching stream's inner state blob.

Parameters:

| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| workspace_id | string \| enum("266ebdfe-0d7b-4540-9817-de7e4505ba61") | yes | | The Airbyte Cloud workspace ID (UUID) or alias. Aliases: '@devin-ai-sandbox'. |
| connection_id | string | yes | | The connection ID (UUID) to fetch state for. |
| stream_name | string \| null | no | null | Optional stream name to filter state for a single stream. When provided, only the matching stream's inner state blob is returned. |
| stream_namespace | string \| null | no | null | Optional stream namespace to narrow the stream filter. Only used when stream_name is also provided. |
| config_api_root | string \| null | no | null | Optional API root URL override. Defaults to Airbyte Cloud. Use this to target local or self-hosted deployments. |

Input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "description": "Workspace ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@devin-ai-sandbox\") and its value\nis the actual workspace UUID. Use `WorkspaceAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
          "enum": [
            "266ebdfe-0d7b-4540-9817-de7e4505ba61"
          ],
          "type": "string"
        }
      ],
      "description": "The Airbyte Cloud workspace ID (UUID) or alias. Aliases: '@devin-ai-sandbox'."
    },
    "connection_id": {
      "description": "The connection ID (UUID) to fetch state for.",
      "type": "string"
    },
    "stream_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional stream name to filter state for a single stream. When provided, only the matching stream's inner state blob is returned."
    },
    "stream_namespace": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional stream namespace to narrow the stream filter. Only used when stream_name is also provided."
    },
    "config_api_root": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional API root URL override. Defaults to Airbyte Cloud. Use this to target local or self-hosted deployments."
    }
  },
  "required": [
    "workspace_id",
    "connection_id"
  ],
  "type": "object"
}

Output JSON schema

{
  "additionalProperties": true,
  "type": "object"
}

# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
"""MCP tools for connection state and catalog management (read-only).

This module provides MCP tools for reading connection state and catalog
in Airbyte Cloud. Connection state tracks the sync progress for each stream
and is used for incremental syncs. The configured catalog controls which
streams are synced and their sync configuration.

## MCP reference

.. include:: ../../../docs/mcp-generated/connection_state.md
    :start-line: 2
"""

# NOTE: We intentionally do NOT use `from __future__ import annotations` here.
# FastMCP has issues resolving forward references when PEP 563 deferred annotations
# are used. See: https://github.com/jlowin/fastmcp/issues/905
# Python 3.12+ supports modern type hint syntax natively, so this is not needed.

__all__: list[str] = []

from typing import Annotated, Any

from airbyte import constants
from airbyte.cloud.connections import CloudConnection
from airbyte.cloud.workspaces import CloudWorkspace
from airbyte.secrets.base import SecretString
from fastmcp import Context, FastMCP
from fastmcp_extensions import get_mcp_config, mcp_tool, register_mcp_tools
from pydantic import Field

from airbyte_ops_mcp.cloud_admin.auth import CloudAuthError
from airbyte_ops_mcp.constants import ServerConfigKey, WorkspaceAliasEnum


def _get_cloud_connection(
    workspace_id: str,
    connection_id: str,
    api_root: str,
    bearer_token: str | None = None,
    client_id: str | None = None,
    client_secret: str | None = None,
) -> CloudConnection:
    """Create a CloudConnection from credentials."""
    workspace = CloudWorkspace(
        workspace_id=workspace_id,
        api_root=api_root,
        client_id=SecretString(client_id) if client_id else None,
        client_secret=SecretString(client_secret) if client_secret else None,
        bearer_token=SecretString(bearer_token) if bearer_token else None,
    )
    return workspace.get_connection(connection_id)


def _resolve_cloud_auth(
    ctx: Context,
) -> tuple[str | None, str | None, str | None]:
    """Resolve authentication credentials for API calls.

    Returns:
        Tuple of (bearer_token, client_id, client_secret).
    """
    bearer_token = get_mcp_config(ctx, ServerConfigKey.BEARER_TOKEN)
    if bearer_token:
        return bearer_token, None, None

    try:
        client_id = get_mcp_config(ctx, ServerConfigKey.CLIENT_ID)
        client_secret = get_mcp_config(ctx, ServerConfigKey.CLIENT_SECRET)
        return None, client_id, client_secret
    except ValueError as e:
        raise CloudAuthError(
            f"Failed to resolve credentials. Ensure credentials are provided "
            f"via Authorization header (Bearer token), "
            f"HTTP headers (X-Airbyte-Cloud-Client-Id, X-Airbyte-Cloud-Client-Secret), "
            f"or environment variables. Error: {e}"
        ) from e


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def get_connection_state(
    workspace_id: Annotated[
        str | WorkspaceAliasEnum,
        Field(
            description="The Airbyte Cloud workspace ID (UUID) or alias. "
            "Aliases: '@devin-ai-sandbox'.",
        ),
    ],
    connection_id: Annotated[
        str,
        Field(description="The connection ID (UUID) to fetch state for."),
    ],
    stream_name: Annotated[
        str | None,
        Field(
            description="Optional stream name to filter state for a single stream. "
            "When provided, only the matching stream's inner state blob is returned.",
            default=None,
        ),
    ] = None,
    stream_namespace: Annotated[
        str | None,
        Field(
            description="Optional stream namespace to narrow the stream filter. "
            "Only used when stream_name is also provided.",
            default=None,
        ),
    ] = None,
    config_api_root: Annotated[
        str | None,
        Field(
            description="Optional API root URL override. "
            "Defaults to Airbyte Cloud. "
            "Use this to target local or self-hosted deployments.",
            default=None,
        ),
    ] = None,
    *,
    ctx: Context,
) -> dict[str, Any]:
    """Get the current state for an Airbyte connection.

    Returns the connection's sync state, which tracks progress for incremental syncs.
    The state can be one of: stream (per-stream), global, legacy, or not_set.

    When stream_name is provided, returns only the matching stream's inner state blob.
    """
    resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id)
    if resolved_workspace_id is None:
        raise ValueError(
            f"Invalid workspace ID or alias: {workspace_id!r}. "
            f"Supported aliases: {', '.join(sorted(alias.name.lower().replace('_', '-') for alias in WorkspaceAliasEnum))}"
        )
    bearer_token, client_id, client_secret = _resolve_cloud_auth(ctx)

    conn = _get_cloud_connection(
        workspace_id=resolved_workspace_id,
        connection_id=connection_id,
        api_root=config_api_root or constants.CLOUD_API_ROOT,
        bearer_token=bearer_token,
        client_id=client_id,
        client_secret=client_secret,
    )

    if stream_name is not None:
        stream_state = conn.get_stream_state(
            stream_name=stream_name,
            stream_namespace=stream_namespace,
        )
        return {
            "connection_id": connection_id,
            "stream_name": stream_name,
            "stream_namespace": stream_namespace,
            "stream_state": stream_state,
        }

    return conn.dump_raw_state()


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def get_connection_catalog(
    workspace_id: Annotated[
        str | WorkspaceAliasEnum,
        Field(
            description="The Airbyte Cloud workspace ID (UUID) or alias. "
            "Aliases: '@devin-ai-sandbox'.",
        ),
    ],
    connection_id: Annotated[
        str,
        Field(description="The connection ID (UUID) to fetch catalog for."),
    ],
    config_api_root: Annotated[
        str | None,
        Field(
            description="Optional API root URL override. "
            "Defaults to Airbyte Cloud. "
            "Use this to target local or self-hosted deployments.",
            default=None,
        ),
    ] = None,
    *,
    ctx: Context,
) -> dict[str, Any]:
    """Get the configured catalog for an Airbyte connection.

    Returns the connection's configured catalog, which defines which streams
    are synced, their sync modes, primary keys, and cursor fields.
    """
    resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id)
    if resolved_workspace_id is None:
        raise ValueError(
            f"Invalid workspace ID or alias: {workspace_id!r}. "
            f"Supported aliases: {', '.join(sorted(alias.name.lower().replace('_', '-') for alias in WorkspaceAliasEnum))}"
        )
    bearer_token, client_id, client_secret = _resolve_cloud_auth(ctx)

    conn = _get_cloud_connection(
        workspace_id=resolved_workspace_id,
        connection_id=connection_id,
        api_root=config_api_root or constants.CLOUD_API_ROOT,
        bearer_token=bearer_token,
        client_id=client_id,
        client_secret=client_secret,
    )

    result = conn.dump_raw_catalog()
    if result is None:
        raise ValueError("No configured catalog found for this connection.")
    return result


def register_connection_state_tools(app: FastMCP) -> None:
    """Register connection state and catalog management tools with the FastMCP app."""
    register_mcp_tools(app, mcp_module=__name__)