airbyte_ops_mcp.mcp.connection_medic

Emergency MCP tools for connection state and catalog writes.

These tools are destructive "break glass" operations for modifying connection state and catalog in Airbyte Cloud. They are only registered when the AIRBYTE_OPS_MEDIC_MODE environment variable is set to 1 or true.

  1# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
  2"""Emergency MCP tools for connection state and catalog writes.
  3
  4These tools are destructive "break glass" operations for modifying
  5connection state and catalog in Airbyte Cloud. They are only registered
  6when the `AIRBYTE_OPS_MEDIC_MODE` environment variable is set to `1` or `true`.
  7"""
  8
  9# NOTE: We intentionally do NOT use `from __future__ import annotations` here.
 10# FastMCP has issues resolving forward references when PEP 563 deferred annotations
 11# are used. See: https://github.com/jlowin/fastmcp/issues/905
 12# Python 3.12+ supports modern type hint syntax natively, so this is not needed.
 13
 14import json
 15import os
 16from typing import Annotated, Any
 17
 18from airbyte import constants
 19from fastmcp import Context, FastMCP
 20from fastmcp_extensions import get_mcp_config, mcp_tool, register_mcp_tools
 21from pydantic import Field
 22
 23import airbyte_ops_mcp.cloud_admin.connection_state as cloud_connection_state
 24from airbyte_ops_mcp.cloud_admin.auth import CloudAuthError
 25from airbyte_ops_mcp.constants import (
 26    MEDIC_MODE_ENV_VAR,
 27    ServerConfigKey,
 28    WorkspaceAliasEnum,
 29)
 30from airbyte_ops_mcp.mcp.connection_state import _get_cloud_connection
 31
 32
 33def _is_medic_mode_enabled() -> bool:
 34    return os.getenv(MEDIC_MODE_ENV_VAR, "").lower() in ("1", "true")
 35
 36
 37def _resolve_cloud_auth(
 38    ctx: Context,
 39) -> tuple[str | None, str | None, str | None]:
 40    """Resolve authentication credentials for API calls.
 41
 42    Returns:
 43        Tuple of (bearer_token, client_id, client_secret).
 44    """
 45    bearer_token = get_mcp_config(ctx, ServerConfigKey.BEARER_TOKEN)
 46    if bearer_token:
 47        return bearer_token, None, None
 48
 49    try:
 50        client_id = get_mcp_config(ctx, ServerConfigKey.CLIENT_ID)
 51        client_secret = get_mcp_config(ctx, ServerConfigKey.CLIENT_SECRET)
 52        return None, client_id, client_secret
 53    except ValueError as e:
 54        raise CloudAuthError(
 55            f"Failed to resolve credentials. Ensure credentials are provided "
 56            f"via Authorization header (Bearer token), "
 57            f"HTTP headers (X-Airbyte-Cloud-Client-Id, X-Airbyte-Cloud-Client-Secret), "
 58            f"or environment variables. Error: {e}"
 59        ) from e
 60
 61
 62@mcp_tool(
 63    destructive=True,
 64    idempotent=False,
 65    open_world=True,
 66)
 67def update_connection_state(
 68    workspace_id: Annotated[
 69        str | WorkspaceAliasEnum,
 70        Field(
 71            description="The Airbyte Cloud workspace ID (UUID) or alias. "
 72            "Aliases: '@devin-ai-sandbox'.",
 73        ),
 74    ],
 75    connection_id: Annotated[
 76        str,
 77        Field(description="The connection ID (UUID) to update state for."),
 78    ],
 79    connection_state_json: Annotated[
 80        str,
 81        Field(
 82            description="The connection state as a JSON string. Must include: "
 83            "'stateType' (one of: 'global', 'stream', 'legacy'), "
 84            "and one of: 'state' (for legacy), 'streamState' (for stream), "
 85            "'globalState' (for global). "
 86            "Tip: Use get_connection_state first to see the current format."
 87        ),
 88    ],
 89    config_api_root: Annotated[
 90        str | None,
 91        Field(
 92            description="Optional API root URL override. "
 93            "Defaults to Airbyte Cloud. "
 94            "Use this to target local or self-hosted deployments.",
 95            default=None,
 96        ),
 97    ] = None,
 98    *,
 99    ctx: Context,
100) -> list[dict[str, Any]]:
101    """Update the full state for an Airbyte connection.
102
103    WARNING: This is a destructive emergency operation. Use with caution.
104    Uses the safe variant that prevents updates while a sync is running (HTTP 423).
105    Get the current state first with get_connection_state, modify it, then pass
106    the full state object back here.
107    """
108    resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id)
109    if resolved_workspace_id is None:
110        raise ValueError(
111            f"Unable to resolve workspace_id {workspace_id!r} to a concrete workspace ID. "
112            "Ensure you provided a valid UUID or supported alias."
113        )
114    bearer_token, client_id, client_secret = _resolve_cloud_auth(ctx)
115
116    conn = _get_cloud_connection(
117        workspace_id=resolved_workspace_id,
118        connection_id=connection_id,
119        api_root=config_api_root or constants.CLOUD_API_ROOT,
120        bearer_token=bearer_token,
121        client_id=client_id,
122        client_secret=client_secret,
123    )
124
125    try:
126        connection_state = json.loads(connection_state_json)
127    except json.JSONDecodeError as e:
128        raise ValueError(
129            "Invalid JSON for parameter 'connection_state_json'. "
130            "Expected a JSON object representing Airbyte connection state "
131            "(for example, an object with a 'stateType' field and related state data)."
132        ) from e
133    conn.import_raw_state(connection_state)
134    return conn.dump_raw_state()
135
136
# Flow: resolve the workspace alias, validate the JSON payload, resolve
# credentials, build the connection handle, set the one stream's state, then
# return the state as read back from the connection.
#
# Raises ValueError when the workspace cannot be resolved, when the payload is
# not valid JSON, or when the payload is valid JSON but not a JSON object.
# Credential problems surface as CloudAuthError from _resolve_cloud_auth.
#
# NOTE(review): the docstring is kept verbatim — presumably it is surfaced to
# MCP clients as the tool description; confirm before rewording it.
@mcp_tool(
    destructive=True,
    idempotent=False,
    open_world=True,
)
def update_stream_state(
    workspace_id: Annotated[
        str | WorkspaceAliasEnum,
        Field(
            description="The Airbyte Cloud workspace ID (UUID) or alias. "
            "Aliases: '@devin-ai-sandbox'.",
        ),
    ],
    connection_id: Annotated[
        str,
        Field(description="The connection ID (UUID) to update state for."),
    ],
    stream_name: Annotated[
        str,
        Field(description="The name of the stream to update state for."),
    ],
    stream_state_json: Annotated[
        str,
        Field(
            description="The state blob for this stream as a JSON string. "
            "This is the inner state object (e.g., {'cursor': '2024-01-01'}), "
            "not the full connection state. "
            "Tip: Use get_connection_state with stream_name first to see the current format."
        ),
    ],
    stream_namespace: Annotated[
        str | None,
        Field(
            description="Optional stream namespace to identify the stream.",
            default=None,
        ),
    ] = None,
    config_api_root: Annotated[
        str | None,
        Field(
            description="Optional API root URL override. "
            "Defaults to Airbyte Cloud. "
            "Use this to target local or self-hosted deployments.",
            default=None,
        ),
    ] = None,
    *,
    ctx: Context,
) -> list[dict[str, Any]]:
    """Update the state for a single stream within a connection.

    WARNING: This is a destructive emergency operation. Use with caution.
    Fetches the current full state, replaces only the specified stream's state,
    then sends the updated state back. If the stream doesn't exist, it is appended.
    Uses the safe variant that prevents updates while a sync is running (HTTP 423).
    """
    resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id)
    if resolved_workspace_id is None:
        raise ValueError(
            f"Failed to resolve workspace_id '{workspace_id}' to a concrete workspace ID. "
            "Use a valid workspace UUID or a supported alias."
        )

    # Validate the payload before resolving credentials or building the
    # connection handle, so a malformed payload is rejected up front.
    try:
        stream_state_dict = json.loads(stream_state_json)
    except json.JSONDecodeError as exc:
        raise ValueError(
            "Invalid JSON provided for 'stream_state_json'. "
            "Please supply a valid JSON object representing the stream state."
        ) from exc
    # json.loads also accepts null, arrays, strings and numbers. The value is
    # handed over as `state_blob_dict`, so only a JSON object is acceptable.
    if not isinstance(stream_state_dict, dict):
        raise ValueError(
            "Invalid value provided for 'stream_state_json'. "
            "Expected a JSON object representing the stream state, "
            f"but the JSON decoded to {type(stream_state_dict).__name__}."
        )

    bearer_token, client_id, client_secret = _resolve_cloud_auth(ctx)

    conn = _get_cloud_connection(
        workspace_id=resolved_workspace_id,
        connection_id=connection_id,
        api_root=config_api_root or constants.CLOUD_API_ROOT,
        bearer_token=bearer_token,
        client_id=client_id,
        client_secret=client_secret,
    )

    conn.set_stream_state(
        stream_name=stream_name,
        state_blob_dict=stream_state_dict,
        stream_namespace=stream_namespace,
    )
    # Return the state as read back after the write.
    return conn.dump_raw_state()
223
224
# Flow: resolve the workspace alias, resolve credentials, build the connection
# handle, then delegate the reset to cloud_connection_state.reset_stream_state
# and return its result unchanged.
#
# NOTE(review): the docstring is kept verbatim — presumably it is surfaced to
# MCP clients as the tool description; confirm before rewording it.
@mcp_tool(
    destructive=True,
    idempotent=False,
    open_world=True,
)
def reset_stream_state(
    workspace_id: Annotated[
        str | WorkspaceAliasEnum,
        Field(
            description="The Airbyte Cloud workspace ID (UUID) or alias. "
            "Aliases: '@devin-ai-sandbox'.",
        ),
    ],
    connection_id: Annotated[
        str,
        Field(description="The connection ID (UUID) to update state for."),
    ],
    stream_name: Annotated[
        str,
        Field(description="The configured stream name whose state should be reset."),
    ],
    stream_namespace: Annotated[
        str | None,
        Field(
            description="Optional stream namespace to identify the stream.",
            default=None,
        ),
    ] = None,
    config_api_root: Annotated[
        str | None,
        Field(
            description="Optional API root URL override. "
            "Defaults to Airbyte Cloud. "
            "Use this to target local or self-hosted deployments.",
            default=None,
        ),
    ] = None,
    *,
    ctx: Context,
) -> cloud_connection_state.ResetStreamResult:
    """Reset a configured stream's state so the next sync full-refreshes it.

    WARNING: This is a destructive emergency operation. Use with caution.
    Uses the safe variant that prevents updates while a sync is running (HTTP 423).
    Returns `previous_state_backup` in raw Config API format so the state can be restored.
    """
    # Guard clause: an alias or ID that does not resolve stops the call here.
    if (workspace_uuid := WorkspaceAliasEnum.resolve(workspace_id)) is None:
        raise ValueError(
            f"Failed to resolve workspace_id '{workspace_id}' to a concrete workspace ID. "
            "Use a valid workspace UUID or a supported alias."
        )

    token, oauth_client_id, oauth_client_secret = _resolve_cloud_auth(ctx)

    # An empty or missing override falls back to the Airbyte Cloud API root.
    target_api_root = config_api_root or constants.CLOUD_API_ROOT
    connection = _get_cloud_connection(
        workspace_id=workspace_uuid,
        connection_id=connection_id,
        api_root=target_api_root,
        bearer_token=token,
        client_id=oauth_client_id,
        client_secret=oauth_client_secret,
    )

    reset_result = cloud_connection_state.reset_stream_state(
        connection,
        stream_name=stream_name,
        stream_namespace=stream_namespace,
    )
    return reset_result
292
293
# Flow: resolve the workspace alias, validate the JSON payload, resolve
# credentials, build the connection handle, import the catalog, then return
# the catalog as read back from the connection.
#
# Raises ValueError when the workspace cannot be resolved, when the payload is
# not valid JSON, or when the payload is valid JSON but not a JSON object.
# Raises RuntimeError when the catalog read-back after the import is None.
# Credential problems surface as CloudAuthError from _resolve_cloud_auth.
#
# NOTE(review): the docstring is kept verbatim — presumably it is surfaced to
# MCP clients as the tool description; confirm before rewording it.
@mcp_tool(
    destructive=True,
    idempotent=False,
    open_world=True,
)
def update_connection_catalog(
    workspace_id: Annotated[
        str | WorkspaceAliasEnum,
        Field(
            description="The Airbyte Cloud workspace ID (UUID) or alias. "
            "Aliases: '@devin-ai-sandbox'.",
        ),
    ],
    connection_id: Annotated[
        str,
        Field(description="The connection ID (UUID) to update catalog for."),
    ],
    configured_catalog_json: Annotated[
        str,
        Field(
            description="The configured catalog as a JSON string. "
            "This replaces the entire configured catalog for the connection. "
            "Tip: Use get_connection_catalog first to see the current format."
        ),
    ],
    config_api_root: Annotated[
        str | None,
        Field(
            description="Optional API root URL override. "
            "Defaults to Airbyte Cloud. "
            "Use this to target local or self-hosted deployments.",
            default=None,
        ),
    ] = None,
    *,
    ctx: Context,
) -> dict[str, Any]:
    """Replace the configured catalog for an Airbyte connection.

    WARNING: This is a destructive emergency operation. Use with extreme caution.
    This replaces the entire configured catalog, which controls which streams
    are synced, their sync modes, primary keys, and cursor fields.
    Get the current catalog first, modify it, then pass the full catalog back here.
    """
    resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id)
    if resolved_workspace_id is None:
        raise ValueError(
            f"Unable to resolve workspace_id {workspace_id!r} to a concrete workspace ID. "
            "Ensure you provided a valid UUID or supported alias."
        )

    # Validate the payload before resolving credentials or building the
    # connection handle, so a malformed payload is rejected up front.
    try:
        configured_catalog = json.loads(configured_catalog_json)
    except json.JSONDecodeError as e:
        raise ValueError(
            "Invalid JSON for parameter 'configured_catalog_json'. "
            "Expected a JSON object representing an Airbyte configured catalog."
        ) from e
    # json.loads also accepts null, arrays, strings and numbers. Replacing the
    # whole catalog with such a value must never be attempted.
    if not isinstance(configured_catalog, dict):
        raise ValueError(
            "Invalid value for parameter 'configured_catalog_json'. "
            "Expected a JSON object representing an Airbyte configured catalog, "
            f"but the JSON decoded to {type(configured_catalog).__name__}."
        )

    bearer_token, client_id, client_secret = _resolve_cloud_auth(ctx)

    conn = _get_cloud_connection(
        workspace_id=resolved_workspace_id,
        connection_id=connection_id,
        api_root=config_api_root or constants.CLOUD_API_ROOT,
        bearer_token=bearer_token,
        client_id=client_id,
        client_secret=client_secret,
    )

    conn.import_raw_catalog(configured_catalog)
    # Read the catalog back so the caller sees what is stored after the write.
    result = conn.dump_raw_catalog()
    if result is None:
        raise RuntimeError(
            "Failed to retrieve catalog after import. "
            f"workspace_id={resolved_workspace_id!r}, connection_id={connection_id!r}"
        )
    return result
370
371
def register_connection_medic_tools(app: FastMCP) -> None:
    """Attach this module's MCP tools to `app`, but only in medic mode.

    When `_is_medic_mode_enabled()` is false the call is a no-op, so the
    destructive tools in this module are not registered on the server.
    """
    if _is_medic_mode_enabled():
        register_mcp_tools(app, mcp_module=__name__)
@mcp_tool(destructive=True, idempotent=False, open_world=True)
def update_connection_state( workspace_id: typing.Annotated[str | airbyte_ops_mcp.constants.WorkspaceAliasEnum, FieldInfo(annotation=NoneType, required=True, description="The Airbyte Cloud workspace ID (UUID) or alias. Aliases: '@devin-ai-sandbox'.")], connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The connection ID (UUID) to update state for.')], connection_state_json: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description="The connection state as a JSON string. Must include: 'stateType' (one of: 'global', 'stream', 'legacy'), and one of: 'state' (for legacy), 'streamState' (for stream), 'globalState' (for global). Tip: Use get_connection_state first to see the current format.")], config_api_root: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional API root URL override. Defaults to Airbyte Cloud. Use this to target local or self-hosted deployments.')] = None, *, ctx: fastmcp.server.context.Context) -> list[dict[str, typing.Any]]:
 63@mcp_tool(
 64    destructive=True,
 65    idempotent=False,
 66    open_world=True,
 67)
 68def update_connection_state(
 69    workspace_id: Annotated[
 70        str | WorkspaceAliasEnum,
 71        Field(
 72            description="The Airbyte Cloud workspace ID (UUID) or alias. "
 73            "Aliases: '@devin-ai-sandbox'.",
 74        ),
 75    ],
 76    connection_id: Annotated[
 77        str,
 78        Field(description="The connection ID (UUID) to update state for."),
 79    ],
 80    connection_state_json: Annotated[
 81        str,
 82        Field(
 83            description="The connection state as a JSON string. Must include: "
 84            "'stateType' (one of: 'global', 'stream', 'legacy'), "
 85            "and one of: 'state' (for legacy), 'streamState' (for stream), "
 86            "'globalState' (for global). "
 87            "Tip: Use get_connection_state first to see the current format."
 88        ),
 89    ],
 90    config_api_root: Annotated[
 91        str | None,
 92        Field(
 93            description="Optional API root URL override. "
 94            "Defaults to Airbyte Cloud. "
 95            "Use this to target local or self-hosted deployments.",
 96            default=None,
 97        ),
 98    ] = None,
 99    *,
100    ctx: Context,
101) -> list[dict[str, Any]]:
102    """Update the full state for an Airbyte connection.
103
104    WARNING: This is a destructive emergency operation. Use with caution.
105    Uses the safe variant that prevents updates while a sync is running (HTTP 423).
106    Get the current state first with get_connection_state, modify it, then pass
107    the full state object back here.
108    """
109    resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id)
110    if resolved_workspace_id is None:
111        raise ValueError(
112            f"Unable to resolve workspace_id {workspace_id!r} to a concrete workspace ID. "
113            "Ensure you provided a valid UUID or supported alias."
114        )
115    bearer_token, client_id, client_secret = _resolve_cloud_auth(ctx)
116
117    conn = _get_cloud_connection(
118        workspace_id=resolved_workspace_id,
119        connection_id=connection_id,
120        api_root=config_api_root or constants.CLOUD_API_ROOT,
121        bearer_token=bearer_token,
122        client_id=client_id,
123        client_secret=client_secret,
124    )
125
126    try:
127        connection_state = json.loads(connection_state_json)
128    except json.JSONDecodeError as e:
129        raise ValueError(
130            "Invalid JSON for parameter 'connection_state_json'. "
131            "Expected a JSON object representing Airbyte connection state "
132            "(for example, an object with a 'stateType' field and related state data)."
133        ) from e
134    conn.import_raw_state(connection_state)
135    return conn.dump_raw_state()

Update the full state for an Airbyte connection.

WARNING: This is a destructive emergency operation. Use with caution. Uses the safe variant that prevents updates while a sync is running (HTTP 423). Get the current state first with get_connection_state, modify it, then pass the full state object back here.

@mcp_tool(destructive=True, idempotent=False, open_world=True)
def update_stream_state( workspace_id: typing.Annotated[str | airbyte_ops_mcp.constants.WorkspaceAliasEnum, FieldInfo(annotation=NoneType, required=True, description="The Airbyte Cloud workspace ID (UUID) or alias. Aliases: '@devin-ai-sandbox'.")], connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The connection ID (UUID) to update state for.')], stream_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the stream to update state for.')], stream_state_json: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description="The state blob for this stream as a JSON string. This is the inner state object (e.g., {'cursor': '2024-01-01'}), not the full connection state. Tip: Use get_connection_state with stream_name first to see the current format.")], stream_namespace: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional stream namespace to identify the stream.')] = None, config_api_root: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional API root URL override. Defaults to Airbyte Cloud. Use this to target local or self-hosted deployments.')] = None, *, ctx: fastmcp.server.context.Context) -> list[dict[str, typing.Any]]:
# Flow: resolve the workspace alias, validate the JSON payload, resolve
# credentials, build the connection handle, set the one stream's state, then
# return the state as read back from the connection.
#
# Raises ValueError when the workspace cannot be resolved, when the payload is
# not valid JSON, or when the payload is valid JSON but not a JSON object.
# Credential problems surface as CloudAuthError from _resolve_cloud_auth.
#
# NOTE(review): the docstring is kept verbatim — presumably it is surfaced to
# MCP clients as the tool description; confirm before rewording it.
@mcp_tool(
    destructive=True,
    idempotent=False,
    open_world=True,
)
def update_stream_state(
    workspace_id: Annotated[
        str | WorkspaceAliasEnum,
        Field(
            description="The Airbyte Cloud workspace ID (UUID) or alias. "
            "Aliases: '@devin-ai-sandbox'.",
        ),
    ],
    connection_id: Annotated[
        str,
        Field(description="The connection ID (UUID) to update state for."),
    ],
    stream_name: Annotated[
        str,
        Field(description="The name of the stream to update state for."),
    ],
    stream_state_json: Annotated[
        str,
        Field(
            description="The state blob for this stream as a JSON string. "
            "This is the inner state object (e.g., {'cursor': '2024-01-01'}), "
            "not the full connection state. "
            "Tip: Use get_connection_state with stream_name first to see the current format."
        ),
    ],
    stream_namespace: Annotated[
        str | None,
        Field(
            description="Optional stream namespace to identify the stream.",
            default=None,
        ),
    ] = None,
    config_api_root: Annotated[
        str | None,
        Field(
            description="Optional API root URL override. "
            "Defaults to Airbyte Cloud. "
            "Use this to target local or self-hosted deployments.",
            default=None,
        ),
    ] = None,
    *,
    ctx: Context,
) -> list[dict[str, Any]]:
    """Update the state for a single stream within a connection.

    WARNING: This is a destructive emergency operation. Use with caution.
    Fetches the current full state, replaces only the specified stream's state,
    then sends the updated state back. If the stream doesn't exist, it is appended.
    Uses the safe variant that prevents updates while a sync is running (HTTP 423).
    """
    resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id)
    if resolved_workspace_id is None:
        raise ValueError(
            f"Failed to resolve workspace_id '{workspace_id}' to a concrete workspace ID. "
            "Use a valid workspace UUID or a supported alias."
        )

    # Validate the payload before resolving credentials or building the
    # connection handle, so a malformed payload is rejected up front.
    try:
        stream_state_dict = json.loads(stream_state_json)
    except json.JSONDecodeError as exc:
        raise ValueError(
            "Invalid JSON provided for 'stream_state_json'. "
            "Please supply a valid JSON object representing the stream state."
        ) from exc
    # json.loads also accepts null, arrays, strings and numbers. The value is
    # handed over as `state_blob_dict`, so only a JSON object is acceptable.
    if not isinstance(stream_state_dict, dict):
        raise ValueError(
            "Invalid value provided for 'stream_state_json'. "
            "Expected a JSON object representing the stream state, "
            f"but the JSON decoded to {type(stream_state_dict).__name__}."
        )

    bearer_token, client_id, client_secret = _resolve_cloud_auth(ctx)

    conn = _get_cloud_connection(
        workspace_id=resolved_workspace_id,
        connection_id=connection_id,
        api_root=config_api_root or constants.CLOUD_API_ROOT,
        bearer_token=bearer_token,
        client_id=client_id,
        client_secret=client_secret,
    )

    conn.set_stream_state(
        stream_name=stream_name,
        state_blob_dict=stream_state_dict,
        stream_namespace=stream_namespace,
    )
    # Return the state as read back after the write.
    return conn.dump_raw_state()

Update the state for a single stream within a connection.

WARNING: This is a destructive emergency operation. Use with caution. Fetches the current full state, replaces only the specified stream's state, then sends the updated state back. If the stream doesn't exist, it is appended. Uses the safe variant that prevents updates while a sync is running (HTTP 423).

@mcp_tool(destructive=True, idempotent=False, open_world=True)
def reset_stream_state( workspace_id: typing.Annotated[str | airbyte_ops_mcp.constants.WorkspaceAliasEnum, FieldInfo(annotation=NoneType, required=True, description="The Airbyte Cloud workspace ID (UUID) or alias. Aliases: '@devin-ai-sandbox'.")], connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The connection ID (UUID) to update state for.')], stream_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The configured stream name whose state should be reset.')], stream_namespace: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional stream namespace to identify the stream.')] = None, config_api_root: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional API root URL override. Defaults to Airbyte Cloud. Use this to target local or self-hosted deployments.')] = None, *, ctx: fastmcp.server.context.Context) -> airbyte_ops_mcp.cloud_admin.connection_state.ResetStreamResult:
# Flow: resolve the workspace alias, resolve credentials, build the connection
# handle, then delegate the reset to cloud_connection_state.reset_stream_state
# and return its result unchanged.
#
# NOTE(review): the docstring is kept verbatim — presumably it is surfaced to
# MCP clients as the tool description; confirm before rewording it.
@mcp_tool(
    destructive=True,
    idempotent=False,
    open_world=True,
)
def reset_stream_state(
    workspace_id: Annotated[
        str | WorkspaceAliasEnum,
        Field(
            description="The Airbyte Cloud workspace ID (UUID) or alias. "
            "Aliases: '@devin-ai-sandbox'.",
        ),
    ],
    connection_id: Annotated[
        str,
        Field(description="The connection ID (UUID) to update state for."),
    ],
    stream_name: Annotated[
        str,
        Field(description="The configured stream name whose state should be reset."),
    ],
    stream_namespace: Annotated[
        str | None,
        Field(
            description="Optional stream namespace to identify the stream.",
            default=None,
        ),
    ] = None,
    config_api_root: Annotated[
        str | None,
        Field(
            description="Optional API root URL override. "
            "Defaults to Airbyte Cloud. "
            "Use this to target local or self-hosted deployments.",
            default=None,
        ),
    ] = None,
    *,
    ctx: Context,
) -> cloud_connection_state.ResetStreamResult:
    """Reset a configured stream's state so the next sync full-refreshes it.

    WARNING: This is a destructive emergency operation. Use with caution.
    Uses the safe variant that prevents updates while a sync is running (HTTP 423).
    Returns `previous_state_backup` in raw Config API format so the state can be restored.
    """
    # Guard clause: an alias or ID that does not resolve stops the call here.
    if (workspace_uuid := WorkspaceAliasEnum.resolve(workspace_id)) is None:
        raise ValueError(
            f"Failed to resolve workspace_id '{workspace_id}' to a concrete workspace ID. "
            "Use a valid workspace UUID or a supported alias."
        )

    token, oauth_client_id, oauth_client_secret = _resolve_cloud_auth(ctx)

    # An empty or missing override falls back to the Airbyte Cloud API root.
    target_api_root = config_api_root or constants.CLOUD_API_ROOT
    connection = _get_cloud_connection(
        workspace_id=workspace_uuid,
        connection_id=connection_id,
        api_root=target_api_root,
        bearer_token=token,
        client_id=oauth_client_id,
        client_secret=oauth_client_secret,
    )

    reset_result = cloud_connection_state.reset_stream_state(
        connection,
        stream_name=stream_name,
        stream_namespace=stream_namespace,
    )
    return reset_result

Reset a configured stream's state so the next sync full-refreshes it.

WARNING: This is a destructive emergency operation. Use with caution. Uses the safe variant that prevents updates while a sync is running (HTTP 423). Returns previous_state_backup in raw Config API format so the state can be restored.

@mcp_tool(destructive=True, idempotent=False, open_world=True)
def update_connection_catalog( workspace_id: typing.Annotated[str | airbyte_ops_mcp.constants.WorkspaceAliasEnum, FieldInfo(annotation=NoneType, required=True, description="The Airbyte Cloud workspace ID (UUID) or alias. Aliases: '@devin-ai-sandbox'.")], connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The connection ID (UUID) to update catalog for.')], configured_catalog_json: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The configured catalog as a JSON string. This replaces the entire configured catalog for the connection. Tip: Use get_connection_catalog first to see the current format.')], config_api_root: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional API root URL override. Defaults to Airbyte Cloud. Use this to target local or self-hosted deployments.')] = None, *, ctx: fastmcp.server.context.Context) -> dict[str, typing.Any]:
# Flow: resolve the workspace alias, validate the JSON payload, resolve
# credentials, build the connection handle, import the catalog, then return
# the catalog as read back from the connection.
#
# Raises ValueError when the workspace cannot be resolved, when the payload is
# not valid JSON, or when the payload is valid JSON but not a JSON object.
# Raises RuntimeError when the catalog read-back after the import is None.
# Credential problems surface as CloudAuthError from _resolve_cloud_auth.
#
# NOTE(review): the docstring is kept verbatim — presumably it is surfaced to
# MCP clients as the tool description; confirm before rewording it.
@mcp_tool(
    destructive=True,
    idempotent=False,
    open_world=True,
)
def update_connection_catalog(
    workspace_id: Annotated[
        str | WorkspaceAliasEnum,
        Field(
            description="The Airbyte Cloud workspace ID (UUID) or alias. "
            "Aliases: '@devin-ai-sandbox'.",
        ),
    ],
    connection_id: Annotated[
        str,
        Field(description="The connection ID (UUID) to update catalog for."),
    ],
    configured_catalog_json: Annotated[
        str,
        Field(
            description="The configured catalog as a JSON string. "
            "This replaces the entire configured catalog for the connection. "
            "Tip: Use get_connection_catalog first to see the current format."
        ),
    ],
    config_api_root: Annotated[
        str | None,
        Field(
            description="Optional API root URL override. "
            "Defaults to Airbyte Cloud. "
            "Use this to target local or self-hosted deployments.",
            default=None,
        ),
    ] = None,
    *,
    ctx: Context,
) -> dict[str, Any]:
    """Replace the configured catalog for an Airbyte connection.

    WARNING: This is a destructive emergency operation. Use with extreme caution.
    This replaces the entire configured catalog, which controls which streams
    are synced, their sync modes, primary keys, and cursor fields.
    Get the current catalog first, modify it, then pass the full catalog back here.
    """
    resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id)
    if resolved_workspace_id is None:
        raise ValueError(
            f"Unable to resolve workspace_id {workspace_id!r} to a concrete workspace ID. "
            "Ensure you provided a valid UUID or supported alias."
        )

    # Validate the payload before resolving credentials or building the
    # connection handle, so a malformed payload is rejected up front.
    try:
        configured_catalog = json.loads(configured_catalog_json)
    except json.JSONDecodeError as e:
        raise ValueError(
            "Invalid JSON for parameter 'configured_catalog_json'. "
            "Expected a JSON object representing an Airbyte configured catalog."
        ) from e
    # json.loads also accepts null, arrays, strings and numbers. Replacing the
    # whole catalog with such a value must never be attempted.
    if not isinstance(configured_catalog, dict):
        raise ValueError(
            "Invalid value for parameter 'configured_catalog_json'. "
            "Expected a JSON object representing an Airbyte configured catalog, "
            f"but the JSON decoded to {type(configured_catalog).__name__}."
        )

    bearer_token, client_id, client_secret = _resolve_cloud_auth(ctx)

    conn = _get_cloud_connection(
        workspace_id=resolved_workspace_id,
        connection_id=connection_id,
        api_root=config_api_root or constants.CLOUD_API_ROOT,
        bearer_token=bearer_token,
        client_id=client_id,
        client_secret=client_secret,
    )

    conn.import_raw_catalog(configured_catalog)
    # Read the catalog back so the caller sees what is stored after the write.
    result = conn.dump_raw_catalog()
    if result is None:
        raise RuntimeError(
            "Failed to retrieve catalog after import. "
            f"workspace_id={resolved_workspace_id!r}, connection_id={connection_id!r}"
        )
    return result

Replace the configured catalog for an Airbyte connection.

WARNING: This is a destructive emergency operation. Use with extreme caution. This replaces the entire configured catalog, which controls which streams are synced, their sync modes, primary keys, and cursor fields. Get the current catalog first, modify it, then pass the full catalog back here.

def register_connection_medic_tools(app: fastmcp.server.server.FastMCP) -> None:
def register_connection_medic_tools(app: FastMCP) -> None:
    """Attach this module's MCP tools to `app`, but only in medic mode.

    When `_is_medic_mode_enabled()` is false the call is a no-op, so the
    destructive tools in this module are not registered on the server.
    """
    if _is_medic_mode_enabled():
        register_mcp_tools(app, mcp_module=__name__)

Register connection medic tools if AIRBYTE_OPS_MEDIC_MODE is enabled.