airbyte_ops_mcp.mcp.connection_medic

Emergency MCP tools for connection state and catalog writes.

These tools are destructive "break glass" operations for modifying connection state and catalog in Airbyte Cloud. They are only registered when the AIRBYTE_OPS_MEDIC_MODE environment variable is set to 1 or true.

  1# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
  2"""Emergency MCP tools for connection state and catalog writes.
  3
  4These tools are destructive "break glass" operations for modifying
  5connection state and catalog in Airbyte Cloud. They are only registered
  6when the `AIRBYTE_OPS_MEDIC_MODE` environment variable is set to `1` or `true`.
  7"""
  8
  9# NOTE: We intentionally do NOT use `from __future__ import annotations` here.
 10# FastMCP has issues resolving forward references when PEP 563 deferred annotations
 11# are used. See: https://github.com/jlowin/fastmcp/issues/905
 12# Python 3.12+ supports modern type hint syntax natively, so this is not needed.
 13
 14import json
 15import os
 16from typing import Annotated, Any
 17
 18from airbyte import constants
 19from fastmcp import Context, FastMCP
 20from fastmcp_extensions import get_mcp_config, mcp_tool, register_mcp_tools
 21from pydantic import Field
 22
 23from airbyte_ops_mcp.cloud_admin.auth import CloudAuthError
 24from airbyte_ops_mcp.constants import (
 25    MEDIC_MODE_ENV_VAR,
 26    ServerConfigKey,
 27    WorkspaceAliasEnum,
 28)
 29from airbyte_ops_mcp.mcp.connection_state import _get_cloud_connection
 30
 31
 32def _is_medic_mode_enabled() -> bool:
 33    return os.getenv(MEDIC_MODE_ENV_VAR, "").lower() in ("1", "true")
 34
 35
 36def _resolve_cloud_auth(
 37    ctx: Context,
 38) -> tuple[str | None, str | None, str | None]:
 39    """Resolve authentication credentials for API calls.
 40
 41    Returns:
 42        Tuple of (bearer_token, client_id, client_secret).
 43    """
 44    bearer_token = get_mcp_config(ctx, ServerConfigKey.BEARER_TOKEN)
 45    if bearer_token:
 46        return bearer_token, None, None
 47
 48    try:
 49        client_id = get_mcp_config(ctx, ServerConfigKey.CLIENT_ID)
 50        client_secret = get_mcp_config(ctx, ServerConfigKey.CLIENT_SECRET)
 51        return None, client_id, client_secret
 52    except ValueError as e:
 53        raise CloudAuthError(
 54            f"Failed to resolve credentials. Ensure credentials are provided "
 55            f"via Authorization header (Bearer token), "
 56            f"HTTP headers (X-Airbyte-Cloud-Client-Id, X-Airbyte-Cloud-Client-Secret), "
 57            f"or environment variables. Error: {e}"
 58        ) from e
 59
 60
 61@mcp_tool(
 62    destructive=True,
 63    idempotent=False,
 64    open_world=True,
 65)
 66def update_connection_state(
 67    workspace_id: Annotated[
 68        str | WorkspaceAliasEnum,
 69        Field(
 70            description="The Airbyte Cloud workspace ID (UUID) or alias. "
 71            "Aliases: '@devin-ai-sandbox'.",
 72        ),
 73    ],
 74    connection_id: Annotated[
 75        str,
 76        Field(description="The connection ID (UUID) to update state for."),
 77    ],
 78    connection_state_json: Annotated[
 79        str,
 80        Field(
 81            description="The connection state as a JSON string. Must include: "
 82            "'stateType' (one of: 'global', 'stream', 'legacy'), "
 83            "and one of: 'state' (for legacy), 'streamState' (for stream), "
 84            "'globalState' (for global). "
 85            "Tip: Use get_connection_state first to see the current format."
 86        ),
 87    ],
 88    config_api_root: Annotated[
 89        str | None,
 90        Field(
 91            description="Optional API root URL override. "
 92            "Defaults to Airbyte Cloud. "
 93            "Use this to target local or self-hosted deployments.",
 94            default=None,
 95        ),
 96    ] = None,
 97    *,
 98    ctx: Context,
 99) -> dict[str, Any]:
100    """Update the full state for an Airbyte connection.
101
102    WARNING: This is a destructive emergency operation. Use with caution.
103    Uses the safe variant that prevents updates while a sync is running (HTTP 423).
104    Get the current state first with get_connection_state, modify it, then pass
105    the full state object back here.
106    """
107    resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id)
108    if resolved_workspace_id is None:
109        raise ValueError(
110            f"Unable to resolve workspace_id {workspace_id!r} to a concrete workspace ID. "
111            "Ensure you provided a valid UUID or supported alias."
112        )
113    bearer_token, client_id, client_secret = _resolve_cloud_auth(ctx)
114
115    conn = _get_cloud_connection(
116        workspace_id=resolved_workspace_id,
117        connection_id=connection_id,
118        api_root=config_api_root or constants.CLOUD_API_ROOT,
119        bearer_token=bearer_token,
120        client_id=client_id,
121        client_secret=client_secret,
122    )
123
124    try:
125        connection_state = json.loads(connection_state_json)
126    except json.JSONDecodeError as e:
127        raise ValueError(
128            "Invalid JSON for parameter 'connection_state_json'. "
129            "Expected a JSON object representing Airbyte connection state "
130            "(for example, an object with a 'stateType' field and related state data)."
131        ) from e
132    conn.import_raw_state(connection_state)
133    return conn.dump_raw_state()
134
135
136@mcp_tool(
137    destructive=True,
138    idempotent=False,
139    open_world=True,
140)
141def update_stream_state(
142    workspace_id: Annotated[
143        str | WorkspaceAliasEnum,
144        Field(
145            description="The Airbyte Cloud workspace ID (UUID) or alias. "
146            "Aliases: '@devin-ai-sandbox'.",
147        ),
148    ],
149    connection_id: Annotated[
150        str,
151        Field(description="The connection ID (UUID) to update state for."),
152    ],
153    stream_name: Annotated[
154        str,
155        Field(description="The name of the stream to update state for."),
156    ],
157    stream_state_json: Annotated[
158        str,
159        Field(
160            description="The state blob for this stream as a JSON string. "
161            "This is the inner state object (e.g., {'cursor': '2024-01-01'}), "
162            "not the full connection state. "
163            "Tip: Use get_connection_state with stream_name first to see the current format."
164        ),
165    ],
166    stream_namespace: Annotated[
167        str | None,
168        Field(
169            description="Optional stream namespace to identify the stream.",
170            default=None,
171        ),
172    ] = None,
173    config_api_root: Annotated[
174        str | None,
175        Field(
176            description="Optional API root URL override. "
177            "Defaults to Airbyte Cloud. "
178            "Use this to target local or self-hosted deployments.",
179            default=None,
180        ),
181    ] = None,
182    *,
183    ctx: Context,
184) -> dict[str, Any]:
185    """Update the state for a single stream within a connection.
186
187    WARNING: This is a destructive emergency operation. Use with caution.
188    Fetches the current full state, replaces only the specified stream's state,
189    then sends the updated state back. If the stream doesn't exist, it is appended.
190    Uses the safe variant that prevents updates while a sync is running (HTTP 423).
191    """
192    resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id)
193    if resolved_workspace_id is None:
194        raise ValueError(
195            f"Failed to resolve workspace_id '{workspace_id}' to a concrete workspace ID. "
196            "Use a valid workspace UUID or a supported alias."
197        )
198    bearer_token, client_id, client_secret = _resolve_cloud_auth(ctx)
199
200    conn = _get_cloud_connection(
201        workspace_id=resolved_workspace_id,
202        connection_id=connection_id,
203        api_root=config_api_root or constants.CLOUD_API_ROOT,
204        bearer_token=bearer_token,
205        client_id=client_id,
206        client_secret=client_secret,
207    )
208
209    try:
210        stream_state_dict = json.loads(stream_state_json)
211    except json.JSONDecodeError as exc:
212        raise ValueError(
213            "Invalid JSON provided for 'stream_state_json'. "
214            "Please supply a valid JSON object representing the stream state."
215        ) from exc
216    conn.set_stream_state(
217        stream_name=stream_name,
218        state_blob_dict=stream_state_dict,
219        stream_namespace=stream_namespace,
220    )
221    return conn.dump_raw_state()
222
223
224@mcp_tool(
225    destructive=True,
226    idempotent=False,
227    open_world=True,
228)
229def update_connection_catalog(
230    workspace_id: Annotated[
231        str | WorkspaceAliasEnum,
232        Field(
233            description="The Airbyte Cloud workspace ID (UUID) or alias. "
234            "Aliases: '@devin-ai-sandbox'.",
235        ),
236    ],
237    connection_id: Annotated[
238        str,
239        Field(description="The connection ID (UUID) to update catalog for."),
240    ],
241    configured_catalog_json: Annotated[
242        str,
243        Field(
244            description="The configured catalog as a JSON string. "
245            "This replaces the entire configured catalog for the connection. "
246            "Tip: Use get_connection_catalog first to see the current format."
247        ),
248    ],
249    config_api_root: Annotated[
250        str | None,
251        Field(
252            description="Optional API root URL override. "
253            "Defaults to Airbyte Cloud. "
254            "Use this to target local or self-hosted deployments.",
255            default=None,
256        ),
257    ] = None,
258    *,
259    ctx: Context,
260) -> dict[str, Any]:
261    """Replace the configured catalog for an Airbyte connection.
262
263    WARNING: This is a destructive emergency operation. Use with extreme caution.
264    This replaces the entire configured catalog, which controls which streams
265    are synced, their sync modes, primary keys, and cursor fields.
266    Get the current catalog first, modify it, then pass the full catalog back here.
267    """
268    resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id)
269    if resolved_workspace_id is None:
270        raise ValueError(
271            f"Unable to resolve workspace_id {workspace_id!r} to a concrete workspace ID. "
272            "Ensure you provided a valid UUID or supported alias."
273        )
274    bearer_token, client_id, client_secret = _resolve_cloud_auth(ctx)
275
276    conn = _get_cloud_connection(
277        workspace_id=resolved_workspace_id,
278        connection_id=connection_id,
279        api_root=config_api_root or constants.CLOUD_API_ROOT,
280        bearer_token=bearer_token,
281        client_id=client_id,
282        client_secret=client_secret,
283    )
284
285    try:
286        configured_catalog = json.loads(configured_catalog_json)
287    except json.JSONDecodeError as e:
288        raise ValueError(
289            "Invalid JSON for parameter 'configured_catalog_json'. "
290            "Expected a JSON object representing an Airbyte configured catalog."
291        ) from e
292    conn.import_raw_catalog(configured_catalog)
293    result = conn.dump_raw_catalog()
294    if result is None:
295        raise RuntimeError(
296            "Failed to retrieve catalog after import. "
297            f"workspace_id={resolved_workspace_id!r}, connection_id={connection_id!r}"
298        )
299    return result
300
301
302def register_connection_medic_tools(app: FastMCP) -> None:
303    """Register connection medic tools if AIRBYTE_OPS_MEDIC_MODE is enabled."""
304    if not _is_medic_mode_enabled():
305        return
306
307    register_mcp_tools(app, mcp_module=__name__)
@mcp_tool(destructive=True, idempotent=False, open_world=True)
def update_connection_state( workspace_id: typing.Annotated[str | airbyte_ops_mcp.constants.WorkspaceAliasEnum, FieldInfo(annotation=NoneType, required=True, description="The Airbyte Cloud workspace ID (UUID) or alias. Aliases: '@devin-ai-sandbox'.")], connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The connection ID (UUID) to update state for.')], connection_state_json: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description="The connection state as a JSON string. Must include: 'stateType' (one of: 'global', 'stream', 'legacy'), and one of: 'state' (for legacy), 'streamState' (for stream), 'globalState' (for global). Tip: Use get_connection_state first to see the current format.")], config_api_root: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional API root URL override. Defaults to Airbyte Cloud. Use this to target local or self-hosted deployments.')] = None, *, ctx: fastmcp.server.context.Context) -> dict[str, typing.Any]:
 62@mcp_tool(
 63    destructive=True,
 64    idempotent=False,
 65    open_world=True,
 66)
 67def update_connection_state(
 68    workspace_id: Annotated[
 69        str | WorkspaceAliasEnum,
 70        Field(
 71            description="The Airbyte Cloud workspace ID (UUID) or alias. "
 72            "Aliases: '@devin-ai-sandbox'.",
 73        ),
 74    ],
 75    connection_id: Annotated[
 76        str,
 77        Field(description="The connection ID (UUID) to update state for."),
 78    ],
 79    connection_state_json: Annotated[
 80        str,
 81        Field(
 82            description="The connection state as a JSON string. Must include: "
 83            "'stateType' (one of: 'global', 'stream', 'legacy'), "
 84            "and one of: 'state' (for legacy), 'streamState' (for stream), "
 85            "'globalState' (for global). "
 86            "Tip: Use get_connection_state first to see the current format."
 87        ),
 88    ],
 89    config_api_root: Annotated[
 90        str | None,
 91        Field(
 92            description="Optional API root URL override. "
 93            "Defaults to Airbyte Cloud. "
 94            "Use this to target local or self-hosted deployments.",
 95            default=None,
 96        ),
 97    ] = None,
 98    *,
 99    ctx: Context,
100) -> dict[str, Any]:
101    """Update the full state for an Airbyte connection.
102
103    WARNING: This is a destructive emergency operation. Use with caution.
104    Uses the safe variant that prevents updates while a sync is running (HTTP 423).
105    Get the current state first with get_connection_state, modify it, then pass
106    the full state object back here.
107    """
108    resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id)
109    if resolved_workspace_id is None:
110        raise ValueError(
111            f"Unable to resolve workspace_id {workspace_id!r} to a concrete workspace ID. "
112            "Ensure you provided a valid UUID or supported alias."
113        )
114    bearer_token, client_id, client_secret = _resolve_cloud_auth(ctx)
115
116    conn = _get_cloud_connection(
117        workspace_id=resolved_workspace_id,
118        connection_id=connection_id,
119        api_root=config_api_root or constants.CLOUD_API_ROOT,
120        bearer_token=bearer_token,
121        client_id=client_id,
122        client_secret=client_secret,
123    )
124
125    try:
126        connection_state = json.loads(connection_state_json)
127    except json.JSONDecodeError as e:
128        raise ValueError(
129            "Invalid JSON for parameter 'connection_state_json'. "
130            "Expected a JSON object representing Airbyte connection state "
131            "(for example, an object with a 'stateType' field and related state data)."
132        ) from e
133    conn.import_raw_state(connection_state)
134    return conn.dump_raw_state()

Update the full state for an Airbyte connection.

WARNING: This is a destructive emergency operation. Use with caution. Uses the safe variant that prevents updates while a sync is running (HTTP 423). Get the current state first with get_connection_state, modify it, then pass the full state object back here.

@mcp_tool(destructive=True, idempotent=False, open_world=True)
def update_stream_state( workspace_id: typing.Annotated[str | airbyte_ops_mcp.constants.WorkspaceAliasEnum, FieldInfo(annotation=NoneType, required=True, description="The Airbyte Cloud workspace ID (UUID) or alias. Aliases: '@devin-ai-sandbox'.")], connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The connection ID (UUID) to update state for.')], stream_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the stream to update state for.')], stream_state_json: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description="The state blob for this stream as a JSON string. This is the inner state object (e.g., {'cursor': '2024-01-01'}), not the full connection state. Tip: Use get_connection_state with stream_name first to see the current format.")], stream_namespace: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional stream namespace to identify the stream.')] = None, config_api_root: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional API root URL override. Defaults to Airbyte Cloud. Use this to target local or self-hosted deployments.')] = None, *, ctx: fastmcp.server.context.Context) -> dict[str, typing.Any]:
137@mcp_tool(
138    destructive=True,
139    idempotent=False,
140    open_world=True,
141)
142def update_stream_state(
143    workspace_id: Annotated[
144        str | WorkspaceAliasEnum,
145        Field(
146            description="The Airbyte Cloud workspace ID (UUID) or alias. "
147            "Aliases: '@devin-ai-sandbox'.",
148        ),
149    ],
150    connection_id: Annotated[
151        str,
152        Field(description="The connection ID (UUID) to update state for."),
153    ],
154    stream_name: Annotated[
155        str,
156        Field(description="The name of the stream to update state for."),
157    ],
158    stream_state_json: Annotated[
159        str,
160        Field(
161            description="The state blob for this stream as a JSON string. "
162            "This is the inner state object (e.g., {'cursor': '2024-01-01'}), "
163            "not the full connection state. "
164            "Tip: Use get_connection_state with stream_name first to see the current format."
165        ),
166    ],
167    stream_namespace: Annotated[
168        str | None,
169        Field(
170            description="Optional stream namespace to identify the stream.",
171            default=None,
172        ),
173    ] = None,
174    config_api_root: Annotated[
175        str | None,
176        Field(
177            description="Optional API root URL override. "
178            "Defaults to Airbyte Cloud. "
179            "Use this to target local or self-hosted deployments.",
180            default=None,
181        ),
182    ] = None,
183    *,
184    ctx: Context,
185) -> dict[str, Any]:
186    """Update the state for a single stream within a connection.
187
188    WARNING: This is a destructive emergency operation. Use with caution.
189    Fetches the current full state, replaces only the specified stream's state,
190    then sends the updated state back. If the stream doesn't exist, it is appended.
191    Uses the safe variant that prevents updates while a sync is running (HTTP 423).
192    """
193    resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id)
194    if resolved_workspace_id is None:
195        raise ValueError(
196            f"Failed to resolve workspace_id '{workspace_id}' to a concrete workspace ID. "
197            "Use a valid workspace UUID or a supported alias."
198        )
199    bearer_token, client_id, client_secret = _resolve_cloud_auth(ctx)
200
201    conn = _get_cloud_connection(
202        workspace_id=resolved_workspace_id,
203        connection_id=connection_id,
204        api_root=config_api_root or constants.CLOUD_API_ROOT,
205        bearer_token=bearer_token,
206        client_id=client_id,
207        client_secret=client_secret,
208    )
209
210    try:
211        stream_state_dict = json.loads(stream_state_json)
212    except json.JSONDecodeError as exc:
213        raise ValueError(
214            "Invalid JSON provided for 'stream_state_json'. "
215            "Please supply a valid JSON object representing the stream state."
216        ) from exc
217    conn.set_stream_state(
218        stream_name=stream_name,
219        state_blob_dict=stream_state_dict,
220        stream_namespace=stream_namespace,
221    )
222    return conn.dump_raw_state()

Update the state for a single stream within a connection.

WARNING: This is a destructive emergency operation. Use with caution. Fetches the current full state, replaces only the specified stream's state, then sends the updated state back. If the stream doesn't exist, it is appended. Uses the safe variant that prevents updates while a sync is running (HTTP 423).

@mcp_tool(destructive=True, idempotent=False, open_world=True)
def update_connection_catalog( workspace_id: typing.Annotated[str | airbyte_ops_mcp.constants.WorkspaceAliasEnum, FieldInfo(annotation=NoneType, required=True, description="The Airbyte Cloud workspace ID (UUID) or alias. Aliases: '@devin-ai-sandbox'.")], connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The connection ID (UUID) to update catalog for.')], configured_catalog_json: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The configured catalog as a JSON string. This replaces the entire configured catalog for the connection. Tip: Use get_connection_catalog first to see the current format.')], config_api_root: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional API root URL override. Defaults to Airbyte Cloud. Use this to target local or self-hosted deployments.')] = None, *, ctx: fastmcp.server.context.Context) -> dict[str, typing.Any]:
225@mcp_tool(
226    destructive=True,
227    idempotent=False,
228    open_world=True,
229)
230def update_connection_catalog(
231    workspace_id: Annotated[
232        str | WorkspaceAliasEnum,
233        Field(
234            description="The Airbyte Cloud workspace ID (UUID) or alias. "
235            "Aliases: '@devin-ai-sandbox'.",
236        ),
237    ],
238    connection_id: Annotated[
239        str,
240        Field(description="The connection ID (UUID) to update catalog for."),
241    ],
242    configured_catalog_json: Annotated[
243        str,
244        Field(
245            description="The configured catalog as a JSON string. "
246            "This replaces the entire configured catalog for the connection. "
247            "Tip: Use get_connection_catalog first to see the current format."
248        ),
249    ],
250    config_api_root: Annotated[
251        str | None,
252        Field(
253            description="Optional API root URL override. "
254            "Defaults to Airbyte Cloud. "
255            "Use this to target local or self-hosted deployments.",
256            default=None,
257        ),
258    ] = None,
259    *,
260    ctx: Context,
261) -> dict[str, Any]:
262    """Replace the configured catalog for an Airbyte connection.
263
264    WARNING: This is a destructive emergency operation. Use with extreme caution.
265    This replaces the entire configured catalog, which controls which streams
266    are synced, their sync modes, primary keys, and cursor fields.
267    Get the current catalog first, modify it, then pass the full catalog back here.
268    """
269    resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id)
270    if resolved_workspace_id is None:
271        raise ValueError(
272            f"Unable to resolve workspace_id {workspace_id!r} to a concrete workspace ID. "
273            "Ensure you provided a valid UUID or supported alias."
274        )
275    bearer_token, client_id, client_secret = _resolve_cloud_auth(ctx)
276
277    conn = _get_cloud_connection(
278        workspace_id=resolved_workspace_id,
279        connection_id=connection_id,
280        api_root=config_api_root or constants.CLOUD_API_ROOT,
281        bearer_token=bearer_token,
282        client_id=client_id,
283        client_secret=client_secret,
284    )
285
286    try:
287        configured_catalog = json.loads(configured_catalog_json)
288    except json.JSONDecodeError as e:
289        raise ValueError(
290            "Invalid JSON for parameter 'configured_catalog_json'. "
291            "Expected a JSON object representing an Airbyte configured catalog."
292        ) from e
293    conn.import_raw_catalog(configured_catalog)
294    result = conn.dump_raw_catalog()
295    if result is None:
296        raise RuntimeError(
297            "Failed to retrieve catalog after import. "
298            f"workspace_id={resolved_workspace_id!r}, connection_id={connection_id!r}"
299        )
300    return result

Replace the configured catalog for an Airbyte connection.

WARNING: This is a destructive emergency operation. Use with extreme caution. This replaces the entire configured catalog, which controls which streams are synced, their sync modes, primary keys, and cursor fields. Get the current catalog first, modify it, then pass the full catalog back here.

def register_connection_medic_tools(app: fastmcp.server.server.FastMCP) -> None:
303def register_connection_medic_tools(app: FastMCP) -> None:
304    """Register connection medic tools if AIRBYTE_OPS_MEDIC_MODE is enabled."""
305    if not _is_medic_mode_enabled():
306        return
307
308    register_mcp_tools(app, mcp_module=__name__)

Register connection medic tools if AIRBYTE_OPS_MEDIC_MODE is enabled.