airbyte_ops_mcp.mcp.connection_medic
Emergency MCP tools for connection state and catalog writes.
These tools are destructive "break glass" operations for modifying
connection state and catalog in Airbyte Cloud. They are only registered
when the AIRBYTE_OPS_MEDIC_MODE environment variable is set to 1 or true.
1# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 2"""Emergency MCP tools for connection state and catalog writes. 3 4These tools are destructive "break glass" operations for modifying 5connection state and catalog in Airbyte Cloud. They are only registered 6when the `AIRBYTE_OPS_MEDIC_MODE` environment variable is set to `1` or `true`. 7""" 8 9# NOTE: We intentionally do NOT use `from __future__ import annotations` here. 10# FastMCP has issues resolving forward references when PEP 563 deferred annotations 11# are used. See: https://github.com/jlowin/fastmcp/issues/905 12# Python 3.12+ supports modern type hint syntax natively, so this is not needed. 13 14import json 15import os 16from typing import Annotated, Any 17 18from airbyte import constants 19from fastmcp import Context, FastMCP 20from fastmcp_extensions import get_mcp_config, mcp_tool, register_mcp_tools 21from pydantic import Field 22 23from airbyte_ops_mcp.cloud_admin.auth import CloudAuthError 24from airbyte_ops_mcp.constants import ( 25 MEDIC_MODE_ENV_VAR, 26 ServerConfigKey, 27 WorkspaceAliasEnum, 28) 29from airbyte_ops_mcp.mcp.connection_state import _get_cloud_connection 30 31 32def _is_medic_mode_enabled() -> bool: 33 return os.getenv(MEDIC_MODE_ENV_VAR, "").lower() in ("1", "true") 34 35 36def _resolve_cloud_auth( 37 ctx: Context, 38) -> tuple[str | None, str | None, str | None]: 39 """Resolve authentication credentials for API calls. 40 41 Returns: 42 Tuple of (bearer_token, client_id, client_secret). 43 """ 44 bearer_token = get_mcp_config(ctx, ServerConfigKey.BEARER_TOKEN) 45 if bearer_token: 46 return bearer_token, None, None 47 48 try: 49 client_id = get_mcp_config(ctx, ServerConfigKey.CLIENT_ID) 50 client_secret = get_mcp_config(ctx, ServerConfigKey.CLIENT_SECRET) 51 return None, client_id, client_secret 52 except ValueError as e: 53 raise CloudAuthError( 54 f"Failed to resolve credentials. Ensure credentials are provided " 55 f"via Authorization header (Bearer token), " 56 f"HTTP headers (X-Airbyte-Cloud-Client-Id, X-Airbyte-Cloud-Client-Secret), " 57 f"or environment variables. Error: {e}" 58 ) from e 59 60 61@mcp_tool( 62 destructive=True, 63 idempotent=False, 64 open_world=True, 65) 66def update_connection_state( 67 workspace_id: Annotated[ 68 str | WorkspaceAliasEnum, 69 Field( 70 description="The Airbyte Cloud workspace ID (UUID) or alias. " 71 "Aliases: '@devin-ai-sandbox'.", 72 ), 73 ], 74 connection_id: Annotated[ 75 str, 76 Field(description="The connection ID (UUID) to update state for."), 77 ], 78 connection_state_json: Annotated[ 79 str, 80 Field( 81 description="The connection state as a JSON string. Must include: " 82 "'stateType' (one of: 'global', 'stream', 'legacy'), " 83 "and one of: 'state' (for legacy), 'streamState' (for stream), " 84 "'globalState' (for global). " 85 "Tip: Use get_connection_state first to see the current format." 86 ), 87 ], 88 config_api_root: Annotated[ 89 str | None, 90 Field( 91 description="Optional API root URL override. " 92 "Defaults to Airbyte Cloud. " 93 "Use this to target local or self-hosted deployments.", 94 default=None, 95 ), 96 ] = None, 97 *, 98 ctx: Context, 99) -> dict[str, Any]: 100 """Update the full state for an Airbyte connection. 101 102 WARNING: This is a destructive emergency operation. Use with caution. 103 Uses the safe variant that prevents updates while a sync is running (HTTP 423). 104 Get the current state first with get_connection_state, modify it, then pass 105 the full state object back here. 106 """ 107 resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id) 108 if resolved_workspace_id is None: 109 raise ValueError( 110 f"Unable to resolve workspace_id {workspace_id!r} to a concrete workspace ID. " 111 "Ensure you provided a valid UUID or supported alias." 112 ) 113 bearer_token, client_id, client_secret = _resolve_cloud_auth(ctx) 114 115 conn = _get_cloud_connection( 116 workspace_id=resolved_workspace_id, 117 connection_id=connection_id, 118 api_root=config_api_root or constants.CLOUD_API_ROOT, 119 bearer_token=bearer_token, 120 client_id=client_id, 121 client_secret=client_secret, 122 ) 123 124 try: 125 connection_state = json.loads(connection_state_json) 126 except json.JSONDecodeError as e: 127 raise ValueError( 128 "Invalid JSON for parameter 'connection_state_json'. " 129 "Expected a JSON object representing Airbyte connection state " 130 "(for example, an object with a 'stateType' field and related state data)." 131 ) from e 132 conn.import_raw_state(connection_state) 133 return conn.dump_raw_state() 134 135 136@mcp_tool( 137 destructive=True, 138 idempotent=False, 139 open_world=True, 140) 141def update_stream_state( 142 workspace_id: Annotated[ 143 str | WorkspaceAliasEnum, 144 Field( 145 description="The Airbyte Cloud workspace ID (UUID) or alias. " 146 "Aliases: '@devin-ai-sandbox'.", 147 ), 148 ], 149 connection_id: Annotated[ 150 str, 151 Field(description="The connection ID (UUID) to update state for."), 152 ], 153 stream_name: Annotated[ 154 str, 155 Field(description="The name of the stream to update state for."), 156 ], 157 stream_state_json: Annotated[ 158 str, 159 Field( 160 description="The state blob for this stream as a JSON string. " 161 "This is the inner state object (e.g., {'cursor': '2024-01-01'}), " 162 "not the full connection state. " 163 "Tip: Use get_connection_state with stream_name first to see the current format." 164 ), 165 ], 166 stream_namespace: Annotated[ 167 str | None, 168 Field( 169 description="Optional stream namespace to identify the stream.", 170 default=None, 171 ), 172 ] = None, 173 config_api_root: Annotated[ 174 str | None, 175 Field( 176 description="Optional API root URL override. " 177 "Defaults to Airbyte Cloud. " 178 "Use this to target local or self-hosted deployments.", 179 default=None, 180 ), 181 ] = None, 182 *, 183 ctx: Context, 184) -> dict[str, Any]: 185 """Update the state for a single stream within a connection. 186 187 WARNING: This is a destructive emergency operation. Use with caution. 188 Fetches the current full state, replaces only the specified stream's state, 189 then sends the updated state back. If the stream doesn't exist, it is appended. 190 Uses the safe variant that prevents updates while a sync is running (HTTP 423). 191 """ 192 resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id) 193 if resolved_workspace_id is None: 194 raise ValueError( 195 f"Failed to resolve workspace_id '{workspace_id}' to a concrete workspace ID. " 196 "Use a valid workspace UUID or a supported alias." 197 ) 198 bearer_token, client_id, client_secret = _resolve_cloud_auth(ctx) 199 200 conn = _get_cloud_connection( 201 workspace_id=resolved_workspace_id, 202 connection_id=connection_id, 203 api_root=config_api_root or constants.CLOUD_API_ROOT, 204 bearer_token=bearer_token, 205 client_id=client_id, 206 client_secret=client_secret, 207 ) 208 209 try: 210 stream_state_dict = json.loads(stream_state_json) 211 except json.JSONDecodeError as exc: 212 raise ValueError( 213 "Invalid JSON provided for 'stream_state_json'. " 214 "Please supply a valid JSON object representing the stream state." 215 ) from exc 216 conn.set_stream_state( 217 stream_name=stream_name, 218 state_blob_dict=stream_state_dict, 219 stream_namespace=stream_namespace, 220 ) 221 return conn.dump_raw_state() 222 223 224@mcp_tool( 225 destructive=True, 226 idempotent=False, 227 open_world=True, 228) 229def update_connection_catalog( 230 workspace_id: Annotated[ 231 str | WorkspaceAliasEnum, 232 Field( 233 description="The Airbyte Cloud workspace ID (UUID) or alias. " 234 "Aliases: '@devin-ai-sandbox'.", 235 ), 236 ], 237 connection_id: Annotated[ 238 str, 239 Field(description="The connection ID (UUID) to update catalog for."), 240 ], 241 configured_catalog_json: Annotated[ 242 str, 243 Field( 244 description="The configured catalog as a JSON string. " 245 "This replaces the entire configured catalog for the connection. " 246 "Tip: Use get_connection_catalog first to see the current format." 247 ), 248 ], 249 config_api_root: Annotated[ 250 str | None, 251 Field( 252 description="Optional API root URL override. " 253 "Defaults to Airbyte Cloud. " 254 "Use this to target local or self-hosted deployments.", 255 default=None, 256 ), 257 ] = None, 258 *, 259 ctx: Context, 260) -> dict[str, Any]: 261 """Replace the configured catalog for an Airbyte connection. 262 263 WARNING: This is a destructive emergency operation. Use with extreme caution. 264 This replaces the entire configured catalog, which controls which streams 265 are synced, their sync modes, primary keys, and cursor fields. 266 Get the current catalog first, modify it, then pass the full catalog back here. 267 """ 268 resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id) 269 if resolved_workspace_id is None: 270 raise ValueError( 271 f"Unable to resolve workspace_id {workspace_id!r} to a concrete workspace ID. " 272 "Ensure you provided a valid UUID or supported alias." 273 ) 274 bearer_token, client_id, client_secret = _resolve_cloud_auth(ctx) 275 276 conn = _get_cloud_connection( 277 workspace_id=resolved_workspace_id, 278 connection_id=connection_id, 279 api_root=config_api_root or constants.CLOUD_API_ROOT, 280 bearer_token=bearer_token, 281 client_id=client_id, 282 client_secret=client_secret, 283 ) 284 285 try: 286 configured_catalog = json.loads(configured_catalog_json) 287 except json.JSONDecodeError as e: 288 raise ValueError( 289 "Invalid JSON for parameter 'configured_catalog_json'. " 290 "Expected a JSON object representing an Airbyte configured catalog." 291 ) from e 292 conn.import_raw_catalog(configured_catalog) 293 result = conn.dump_raw_catalog() 294 if result is None: 295 raise RuntimeError( 296 "Failed to retrieve catalog after import. " 297 f"workspace_id={resolved_workspace_id!r}, connection_id={connection_id!r}" 298 ) 299 return result 300 301 302def register_connection_medic_tools(app: FastMCP) -> None: 303 """Register connection medic tools if AIRBYTE_OPS_MEDIC_MODE is enabled.""" 304 if not _is_medic_mode_enabled(): 305 return 306 307 register_mcp_tools(app, mcp_module=__name__)
62@mcp_tool( 63 destructive=True, 64 idempotent=False, 65 open_world=True, 66) 67def update_connection_state( 68 workspace_id: Annotated[ 69 str | WorkspaceAliasEnum, 70 Field( 71 description="The Airbyte Cloud workspace ID (UUID) or alias. " 72 "Aliases: '@devin-ai-sandbox'.", 73 ), 74 ], 75 connection_id: Annotated[ 76 str, 77 Field(description="The connection ID (UUID) to update state for."), 78 ], 79 connection_state_json: Annotated[ 80 str, 81 Field( 82 description="The connection state as a JSON string. Must include: " 83 "'stateType' (one of: 'global', 'stream', 'legacy'), " 84 "and one of: 'state' (for legacy), 'streamState' (for stream), " 85 "'globalState' (for global). " 86 "Tip: Use get_connection_state first to see the current format." 87 ), 88 ], 89 config_api_root: Annotated[ 90 str | None, 91 Field( 92 description="Optional API root URL override. " 93 "Defaults to Airbyte Cloud. " 94 "Use this to target local or self-hosted deployments.", 95 default=None, 96 ), 97 ] = None, 98 *, 99 ctx: Context, 100) -> dict[str, Any]: 101 """Update the full state for an Airbyte connection. 102 103 WARNING: This is a destructive emergency operation. Use with caution. 104 Uses the safe variant that prevents updates while a sync is running (HTTP 423). 105 Get the current state first with get_connection_state, modify it, then pass 106 the full state object back here. 107 """ 108 resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id) 109 if resolved_workspace_id is None: 110 raise ValueError( 111 f"Unable to resolve workspace_id {workspace_id!r} to a concrete workspace ID. " 112 "Ensure you provided a valid UUID or supported alias." 113 ) 114 bearer_token, client_id, client_secret = _resolve_cloud_auth(ctx) 115 116 conn = _get_cloud_connection( 117 workspace_id=resolved_workspace_id, 118 connection_id=connection_id, 119 api_root=config_api_root or constants.CLOUD_API_ROOT, 120 bearer_token=bearer_token, 121 client_id=client_id, 122 client_secret=client_secret, 123 ) 124 125 try: 126 connection_state = json.loads(connection_state_json) 127 except json.JSONDecodeError as e: 128 raise ValueError( 129 "Invalid JSON for parameter 'connection_state_json'. " 130 "Expected a JSON object representing Airbyte connection state " 131 "(for example, an object with a 'stateType' field and related state data)." 132 ) from e 133 conn.import_raw_state(connection_state) 134 return conn.dump_raw_state()
Update the full state for an Airbyte connection.
WARNING: This is a destructive emergency operation. Use with caution. Uses the safe variant that prevents updates while a sync is running (HTTP 423). Get the current state first with get_connection_state, modify it, then pass the full state object back here.
137@mcp_tool( 138 destructive=True, 139 idempotent=False, 140 open_world=True, 141) 142def update_stream_state( 143 workspace_id: Annotated[ 144 str | WorkspaceAliasEnum, 145 Field( 146 description="The Airbyte Cloud workspace ID (UUID) or alias. " 147 "Aliases: '@devin-ai-sandbox'.", 148 ), 149 ], 150 connection_id: Annotated[ 151 str, 152 Field(description="The connection ID (UUID) to update state for."), 153 ], 154 stream_name: Annotated[ 155 str, 156 Field(description="The name of the stream to update state for."), 157 ], 158 stream_state_json: Annotated[ 159 str, 160 Field( 161 description="The state blob for this stream as a JSON string. " 162 "This is the inner state object (e.g., {'cursor': '2024-01-01'}), " 163 "not the full connection state. " 164 "Tip: Use get_connection_state with stream_name first to see the current format." 165 ), 166 ], 167 stream_namespace: Annotated[ 168 str | None, 169 Field( 170 description="Optional stream namespace to identify the stream.", 171 default=None, 172 ), 173 ] = None, 174 config_api_root: Annotated[ 175 str | None, 176 Field( 177 description="Optional API root URL override. " 178 "Defaults to Airbyte Cloud. " 179 "Use this to target local or self-hosted deployments.", 180 default=None, 181 ), 182 ] = None, 183 *, 184 ctx: Context, 185) -> dict[str, Any]: 186 """Update the state for a single stream within a connection. 187 188 WARNING: This is a destructive emergency operation. Use with caution. 189 Fetches the current full state, replaces only the specified stream's state, 190 then sends the updated state back. If the stream doesn't exist, it is appended. 191 Uses the safe variant that prevents updates while a sync is running (HTTP 423). 192 """ 193 resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id) 194 if resolved_workspace_id is None: 195 raise ValueError( 196 f"Failed to resolve workspace_id '{workspace_id}' to a concrete workspace ID. " 197 "Use a valid workspace UUID or a supported alias." 198 ) 199 bearer_token, client_id, client_secret = _resolve_cloud_auth(ctx) 200 201 conn = _get_cloud_connection( 202 workspace_id=resolved_workspace_id, 203 connection_id=connection_id, 204 api_root=config_api_root or constants.CLOUD_API_ROOT, 205 bearer_token=bearer_token, 206 client_id=client_id, 207 client_secret=client_secret, 208 ) 209 210 try: 211 stream_state_dict = json.loads(stream_state_json) 212 except json.JSONDecodeError as exc: 213 raise ValueError( 214 "Invalid JSON provided for 'stream_state_json'. " 215 "Please supply a valid JSON object representing the stream state." 216 ) from exc 217 conn.set_stream_state( 218 stream_name=stream_name, 219 state_blob_dict=stream_state_dict, 220 stream_namespace=stream_namespace, 221 ) 222 return conn.dump_raw_state()
Update the state for a single stream within a connection.
WARNING: This is a destructive emergency operation. Use with caution. Fetches the current full state, replaces only the specified stream's state, then sends the updated state back. If the stream doesn't exist, it is appended. Uses the safe variant that prevents updates while a sync is running (HTTP 423).
225@mcp_tool( 226 destructive=True, 227 idempotent=False, 228 open_world=True, 229) 230def update_connection_catalog( 231 workspace_id: Annotated[ 232 str | WorkspaceAliasEnum, 233 Field( 234 description="The Airbyte Cloud workspace ID (UUID) or alias. " 235 "Aliases: '@devin-ai-sandbox'.", 236 ), 237 ], 238 connection_id: Annotated[ 239 str, 240 Field(description="The connection ID (UUID) to update catalog for."), 241 ], 242 configured_catalog_json: Annotated[ 243 str, 244 Field( 245 description="The configured catalog as a JSON string. " 246 "This replaces the entire configured catalog for the connection. " 247 "Tip: Use get_connection_catalog first to see the current format." 248 ), 249 ], 250 config_api_root: Annotated[ 251 str | None, 252 Field( 253 description="Optional API root URL override. " 254 "Defaults to Airbyte Cloud. " 255 "Use this to target local or self-hosted deployments.", 256 default=None, 257 ), 258 ] = None, 259 *, 260 ctx: Context, 261) -> dict[str, Any]: 262 """Replace the configured catalog for an Airbyte connection. 263 264 WARNING: This is a destructive emergency operation. Use with extreme caution. 265 This replaces the entire configured catalog, which controls which streams 266 are synced, their sync modes, primary keys, and cursor fields. 267 Get the current catalog first, modify it, then pass the full catalog back here. 268 """ 269 resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id) 270 if resolved_workspace_id is None: 271 raise ValueError( 272 f"Unable to resolve workspace_id {workspace_id!r} to a concrete workspace ID. " 273 "Ensure you provided a valid UUID or supported alias." 274 ) 275 bearer_token, client_id, client_secret = _resolve_cloud_auth(ctx) 276 277 conn = _get_cloud_connection( 278 workspace_id=resolved_workspace_id, 279 connection_id=connection_id, 280 api_root=config_api_root or constants.CLOUD_API_ROOT, 281 bearer_token=bearer_token, 282 client_id=client_id, 283 client_secret=client_secret, 284 ) 285 286 try: 287 configured_catalog = json.loads(configured_catalog_json) 288 except json.JSONDecodeError as e: 289 raise ValueError( 290 "Invalid JSON for parameter 'configured_catalog_json'. " 291 "Expected a JSON object representing an Airbyte configured catalog." 292 ) from e 293 conn.import_raw_catalog(configured_catalog) 294 result = conn.dump_raw_catalog() 295 if result is None: 296 raise RuntimeError( 297 "Failed to retrieve catalog after import. " 298 f"workspace_id={resolved_workspace_id!r}, connection_id={connection_id!r}" 299 ) 300 return result
Replace the configured catalog for an Airbyte connection.
WARNING: This is a destructive emergency operation. Use with extreme caution. This replaces the entire configured catalog, which controls which streams are synced, their sync modes, primary keys, and cursor fields. Get the current catalog first, modify it, then pass the full catalog back here.
303def register_connection_medic_tools(app: FastMCP) -> None: 304 """Register connection medic tools if AIRBYTE_OPS_MEDIC_MODE is enabled.""" 305 if not _is_medic_mode_enabled(): 306 return 307 308 register_mcp_tools(app, mcp_module=__name__)
Register connection medic tools if AIRBYTE_OPS_MEDIC_MODE is enabled.