airbyte_ops_mcp.mcp.connection_medic
Emergency MCP tools for connection state and catalog writes.
These tools are destructive "break glass" operations for modifying
connection state and catalog in Airbyte Cloud. They are only registered
when the AIRBYTE_OPS_MEDIC_MODE environment variable is set to 1 or true.
1# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 2"""Emergency MCP tools for connection state and catalog writes. 3 4These tools are destructive "break glass" operations for modifying 5connection state and catalog in Airbyte Cloud. They are only registered 6when the `AIRBYTE_OPS_MEDIC_MODE` environment variable is set to `1` or `true`. 7""" 8 9# NOTE: We intentionally do NOT use `from __future__ import annotations` here. 10# FastMCP has issues resolving forward references when PEP 563 deferred annotations 11# are used. See: https://github.com/jlowin/fastmcp/issues/905 12# Python 3.12+ supports modern type hint syntax natively, so this is not needed. 13 14import json 15import os 16from typing import Annotated, Any 17 18from airbyte import constants 19from fastmcp import Context, FastMCP 20from fastmcp_extensions import get_mcp_config, mcp_tool, register_mcp_tools 21from pydantic import Field 22 23import airbyte_ops_mcp.cloud_admin.connection_state as cloud_connection_state 24from airbyte_ops_mcp.cloud_admin.auth import CloudAuthError 25from airbyte_ops_mcp.constants import ( 26 MEDIC_MODE_ENV_VAR, 27 ServerConfigKey, 28 WorkspaceAliasEnum, 29) 30from airbyte_ops_mcp.mcp.connection_state import _get_cloud_connection 31 32 33def _is_medic_mode_enabled() -> bool: 34 return os.getenv(MEDIC_MODE_ENV_VAR, "").lower() in ("1", "true") 35 36 37def _resolve_cloud_auth( 38 ctx: Context, 39) -> tuple[str | None, str | None, str | None]: 40 """Resolve authentication credentials for API calls. 41 42 Returns: 43 Tuple of (bearer_token, client_id, client_secret). 44 """ 45 bearer_token = get_mcp_config(ctx, ServerConfigKey.BEARER_TOKEN) 46 if bearer_token: 47 return bearer_token, None, None 48 49 try: 50 client_id = get_mcp_config(ctx, ServerConfigKey.CLIENT_ID) 51 client_secret = get_mcp_config(ctx, ServerConfigKey.CLIENT_SECRET) 52 return None, client_id, client_secret 53 except ValueError as e: 54 raise CloudAuthError( 55 f"Failed to resolve credentials. Ensure credentials are provided " 56 f"via Authorization header (Bearer token), " 57 f"HTTP headers (X-Airbyte-Cloud-Client-Id, X-Airbyte-Cloud-Client-Secret), " 58 f"or environment variables. Error: {e}" 59 ) from e 60 61 62@mcp_tool( 63 destructive=True, 64 idempotent=False, 65 open_world=True, 66) 67def update_connection_state( 68 workspace_id: Annotated[ 69 str | WorkspaceAliasEnum, 70 Field( 71 description="The Airbyte Cloud workspace ID (UUID) or alias. " 72 "Aliases: '@devin-ai-sandbox'.", 73 ), 74 ], 75 connection_id: Annotated[ 76 str, 77 Field(description="The connection ID (UUID) to update state for."), 78 ], 79 connection_state_json: Annotated[ 80 str, 81 Field( 82 description="The connection state as a JSON string. Must include: " 83 "'stateType' (one of: 'global', 'stream', 'legacy'), " 84 "and one of: 'state' (for legacy), 'streamState' (for stream), " 85 "'globalState' (for global). " 86 "Tip: Use get_connection_state first to see the current format." 87 ), 88 ], 89 config_api_root: Annotated[ 90 str | None, 91 Field( 92 description="Optional API root URL override. " 93 "Defaults to Airbyte Cloud. " 94 "Use this to target local or self-hosted deployments.", 95 default=None, 96 ), 97 ] = None, 98 *, 99 ctx: Context, 100) -> list[dict[str, Any]]: 101 """Update the full state for an Airbyte connection. 102 103 WARNING: This is a destructive emergency operation. Use with caution. 104 Uses the safe variant that prevents updates while a sync is running (HTTP 423). 105 Get the current state first with get_connection_state, modify it, then pass 106 the full state object back here. 107 """ 108 resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id) 109 if resolved_workspace_id is None: 110 raise ValueError( 111 f"Unable to resolve workspace_id {workspace_id!r} to a concrete workspace ID. " 112 "Ensure you provided a valid UUID or supported alias." 113 ) 114 bearer_token, client_id, client_secret = _resolve_cloud_auth(ctx) 115 116 conn = _get_cloud_connection( 117 workspace_id=resolved_workspace_id, 118 connection_id=connection_id, 119 api_root=config_api_root or constants.CLOUD_API_ROOT, 120 bearer_token=bearer_token, 121 client_id=client_id, 122 client_secret=client_secret, 123 ) 124 125 try: 126 connection_state = json.loads(connection_state_json) 127 except json.JSONDecodeError as e: 128 raise ValueError( 129 "Invalid JSON for parameter 'connection_state_json'. " 130 "Expected a JSON object representing Airbyte connection state " 131 "(for example, an object with a 'stateType' field and related state data)." 132 ) from e 133 conn.import_raw_state(connection_state) 134 return conn.dump_raw_state() 135 136 137@mcp_tool( 138 destructive=True, 139 idempotent=False, 140 open_world=True, 141) 142def update_stream_state( 143 workspace_id: Annotated[ 144 str | WorkspaceAliasEnum, 145 Field( 146 description="The Airbyte Cloud workspace ID (UUID) or alias. " 147 "Aliases: '@devin-ai-sandbox'.", 148 ), 149 ], 150 connection_id: Annotated[ 151 str, 152 Field(description="The connection ID (UUID) to update state for."), 153 ], 154 stream_name: Annotated[ 155 str, 156 Field(description="The name of the stream to update state for."), 157 ], 158 stream_state_json: Annotated[ 159 str, 160 Field( 161 description="The state blob for this stream as a JSON string. " 162 "This is the inner state object (e.g., {'cursor': '2024-01-01'}), " 163 "not the full connection state. " 164 "Tip: Use get_connection_state with stream_name first to see the current format." 165 ), 166 ], 167 stream_namespace: Annotated[ 168 str | None, 169 Field( 170 description="Optional stream namespace to identify the stream.", 171 default=None, 172 ), 173 ] = None, 174 config_api_root: Annotated[ 175 str | None, 176 Field( 177 description="Optional API root URL override. " 178 "Defaults to Airbyte Cloud. " 179 "Use this to target local or self-hosted deployments.", 180 default=None, 181 ), 182 ] = None, 183 *, 184 ctx: Context, 185) -> list[dict[str, Any]]: 186 """Update the state for a single stream within a connection. 187 188 WARNING: This is a destructive emergency operation. Use with caution. 189 Fetches the current full state, replaces only the specified stream's state, 190 then sends the updated state back. If the stream doesn't exist, it is appended. 191 Uses the safe variant that prevents updates while a sync is running (HTTP 423). 192 """ 193 resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id) 194 if resolved_workspace_id is None: 195 raise ValueError( 196 f"Failed to resolve workspace_id '{workspace_id}' to a concrete workspace ID. " 197 "Use a valid workspace UUID or a supported alias." 198 ) 199 bearer_token, client_id, client_secret = _resolve_cloud_auth(ctx) 200 201 conn = _get_cloud_connection( 202 workspace_id=resolved_workspace_id, 203 connection_id=connection_id, 204 api_root=config_api_root or constants.CLOUD_API_ROOT, 205 bearer_token=bearer_token, 206 client_id=client_id, 207 client_secret=client_secret, 208 ) 209 210 try: 211 stream_state_dict = json.loads(stream_state_json) 212 except json.JSONDecodeError as exc: 213 raise ValueError( 214 "Invalid JSON provided for 'stream_state_json'. " 215 "Please supply a valid JSON object representing the stream state." 216 ) from exc 217 conn.set_stream_state( 218 stream_name=stream_name, 219 state_blob_dict=stream_state_dict, 220 stream_namespace=stream_namespace, 221 ) 222 return conn.dump_raw_state() 223 224 225@mcp_tool( 226 destructive=True, 227 idempotent=False, 228 open_world=True, 229) 230def reset_stream_state( 231 workspace_id: Annotated[ 232 str | WorkspaceAliasEnum, 233 Field( 234 description="The Airbyte Cloud workspace ID (UUID) or alias. " 235 "Aliases: '@devin-ai-sandbox'.", 236 ), 237 ], 238 connection_id: Annotated[ 239 str, 240 Field(description="The connection ID (UUID) to update state for."), 241 ], 242 stream_name: Annotated[ 243 str, 244 Field(description="The configured stream name whose state should be reset."), 245 ], 246 stream_namespace: Annotated[ 247 str | None, 248 Field( 249 description="Optional stream namespace to identify the stream.", 250 default=None, 251 ), 252 ] = None, 253 config_api_root: Annotated[ 254 str | None, 255 Field( 256 description="Optional API root URL override. " 257 "Defaults to Airbyte Cloud. " 258 "Use this to target local or self-hosted deployments.", 259 default=None, 260 ), 261 ] = None, 262 *, 263 ctx: Context, 264) -> cloud_connection_state.ResetStreamResult: 265 """Reset a configured stream's state so the next sync full-refreshes it. 266 267 WARNING: This is a destructive emergency operation. Use with caution. 268 Uses the safe variant that prevents updates while a sync is running (HTTP 423). 269 Returns `previous_state_backup` in raw Config API format so the state can be restored. 270 """ 271 resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id) 272 if resolved_workspace_id is None: 273 raise ValueError( 274 f"Failed to resolve workspace_id '{workspace_id}' to a concrete workspace ID. " 275 "Use a valid workspace UUID or a supported alias." 276 ) 277 bearer_token, client_id, client_secret = _resolve_cloud_auth(ctx) 278 279 conn = _get_cloud_connection( 280 workspace_id=resolved_workspace_id, 281 connection_id=connection_id, 282 api_root=config_api_root or constants.CLOUD_API_ROOT, 283 bearer_token=bearer_token, 284 client_id=client_id, 285 client_secret=client_secret, 286 ) 287 return cloud_connection_state.reset_stream_state( 288 conn, 289 stream_name=stream_name, 290 stream_namespace=stream_namespace, 291 ) 292 293 294@mcp_tool( 295 destructive=True, 296 idempotent=False, 297 open_world=True, 298) 299def update_connection_catalog( 300 workspace_id: Annotated[ 301 str | WorkspaceAliasEnum, 302 Field( 303 description="The Airbyte Cloud workspace ID (UUID) or alias. " 304 "Aliases: '@devin-ai-sandbox'.", 305 ), 306 ], 307 connection_id: Annotated[ 308 str, 309 Field(description="The connection ID (UUID) to update catalog for."), 310 ], 311 configured_catalog_json: Annotated[ 312 str, 313 Field( 314 description="The configured catalog as a JSON string. " 315 "This replaces the entire configured catalog for the connection. " 316 "Tip: Use get_connection_catalog first to see the current format." 317 ), 318 ], 319 config_api_root: Annotated[ 320 str | None, 321 Field( 322 description="Optional API root URL override. " 323 "Defaults to Airbyte Cloud. " 324 "Use this to target local or self-hosted deployments.", 325 default=None, 326 ), 327 ] = None, 328 *, 329 ctx: Context, 330) -> dict[str, Any]: 331 """Replace the configured catalog for an Airbyte connection. 332 333 WARNING: This is a destructive emergency operation. Use with extreme caution. 334 This replaces the entire configured catalog, which controls which streams 335 are synced, their sync modes, primary keys, and cursor fields. 336 Get the current catalog first, modify it, then pass the full catalog back here. 337 """ 338 resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id) 339 if resolved_workspace_id is None: 340 raise ValueError( 341 f"Unable to resolve workspace_id {workspace_id!r} to a concrete workspace ID. " 342 "Ensure you provided a valid UUID or supported alias." 343 ) 344 bearer_token, client_id, client_secret = _resolve_cloud_auth(ctx) 345 346 conn = _get_cloud_connection( 347 workspace_id=resolved_workspace_id, 348 connection_id=connection_id, 349 api_root=config_api_root or constants.CLOUD_API_ROOT, 350 bearer_token=bearer_token, 351 client_id=client_id, 352 client_secret=client_secret, 353 ) 354 355 try: 356 configured_catalog = json.loads(configured_catalog_json) 357 except json.JSONDecodeError as e: 358 raise ValueError( 359 "Invalid JSON for parameter 'configured_catalog_json'. " 360 "Expected a JSON object representing an Airbyte configured catalog." 361 ) from e 362 conn.import_raw_catalog(configured_catalog) 363 result = conn.dump_raw_catalog() 364 if result is None: 365 raise RuntimeError( 366 "Failed to retrieve catalog after import. " 367 f"workspace_id={resolved_workspace_id!r}, connection_id={connection_id!r}" 368 ) 369 return result 370 371 372def register_connection_medic_tools(app: FastMCP) -> None: 373 """Register connection medic tools if AIRBYTE_OPS_MEDIC_MODE is enabled.""" 374 if not _is_medic_mode_enabled(): 375 return 376 377 register_mcp_tools(app, mcp_module=__name__)
63@mcp_tool( 64 destructive=True, 65 idempotent=False, 66 open_world=True, 67) 68def update_connection_state( 69 workspace_id: Annotated[ 70 str | WorkspaceAliasEnum, 71 Field( 72 description="The Airbyte Cloud workspace ID (UUID) or alias. " 73 "Aliases: '@devin-ai-sandbox'.", 74 ), 75 ], 76 connection_id: Annotated[ 77 str, 78 Field(description="The connection ID (UUID) to update state for."), 79 ], 80 connection_state_json: Annotated[ 81 str, 82 Field( 83 description="The connection state as a JSON string. Must include: " 84 "'stateType' (one of: 'global', 'stream', 'legacy'), " 85 "and one of: 'state' (for legacy), 'streamState' (for stream), " 86 "'globalState' (for global). " 87 "Tip: Use get_connection_state first to see the current format." 88 ), 89 ], 90 config_api_root: Annotated[ 91 str | None, 92 Field( 93 description="Optional API root URL override. " 94 "Defaults to Airbyte Cloud. " 95 "Use this to target local or self-hosted deployments.", 96 default=None, 97 ), 98 ] = None, 99 *, 100 ctx: Context, 101) -> list[dict[str, Any]]: 102 """Update the full state for an Airbyte connection. 103 104 WARNING: This is a destructive emergency operation. Use with caution. 105 Uses the safe variant that prevents updates while a sync is running (HTTP 423). 106 Get the current state first with get_connection_state, modify it, then pass 107 the full state object back here. 108 """ 109 resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id) 110 if resolved_workspace_id is None: 111 raise ValueError( 112 f"Unable to resolve workspace_id {workspace_id!r} to a concrete workspace ID. " 113 "Ensure you provided a valid UUID or supported alias." 114 ) 115 bearer_token, client_id, client_secret = _resolve_cloud_auth(ctx) 116 117 conn = _get_cloud_connection( 118 workspace_id=resolved_workspace_id, 119 connection_id=connection_id, 120 api_root=config_api_root or constants.CLOUD_API_ROOT, 121 bearer_token=bearer_token, 122 client_id=client_id, 123 client_secret=client_secret, 124 ) 125 126 try: 127 connection_state = json.loads(connection_state_json) 128 except json.JSONDecodeError as e: 129 raise ValueError( 130 "Invalid JSON for parameter 'connection_state_json'. " 131 "Expected a JSON object representing Airbyte connection state " 132 "(for example, an object with a 'stateType' field and related state data)." 133 ) from e 134 conn.import_raw_state(connection_state) 135 return conn.dump_raw_state()
Update the full state for an Airbyte connection.
WARNING: This is a destructive emergency operation. Use with caution. Uses the safe variant that prevents updates while a sync is running (HTTP 423). Get the current state first with get_connection_state, modify it, then pass the full state object back here.
138@mcp_tool( 139 destructive=True, 140 idempotent=False, 141 open_world=True, 142) 143def update_stream_state( 144 workspace_id: Annotated[ 145 str | WorkspaceAliasEnum, 146 Field( 147 description="The Airbyte Cloud workspace ID (UUID) or alias. " 148 "Aliases: '@devin-ai-sandbox'.", 149 ), 150 ], 151 connection_id: Annotated[ 152 str, 153 Field(description="The connection ID (UUID) to update state for."), 154 ], 155 stream_name: Annotated[ 156 str, 157 Field(description="The name of the stream to update state for."), 158 ], 159 stream_state_json: Annotated[ 160 str, 161 Field( 162 description="The state blob for this stream as a JSON string. " 163 "This is the inner state object (e.g., {'cursor': '2024-01-01'}), " 164 "not the full connection state. " 165 "Tip: Use get_connection_state with stream_name first to see the current format." 166 ), 167 ], 168 stream_namespace: Annotated[ 169 str | None, 170 Field( 171 description="Optional stream namespace to identify the stream.", 172 default=None, 173 ), 174 ] = None, 175 config_api_root: Annotated[ 176 str | None, 177 Field( 178 description="Optional API root URL override. " 179 "Defaults to Airbyte Cloud. " 180 "Use this to target local or self-hosted deployments.", 181 default=None, 182 ), 183 ] = None, 184 *, 185 ctx: Context, 186) -> list[dict[str, Any]]: 187 """Update the state for a single stream within a connection. 188 189 WARNING: This is a destructive emergency operation. Use with caution. 190 Fetches the current full state, replaces only the specified stream's state, 191 then sends the updated state back. If the stream doesn't exist, it is appended. 192 Uses the safe variant that prevents updates while a sync is running (HTTP 423). 193 """ 194 resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id) 195 if resolved_workspace_id is None: 196 raise ValueError( 197 f"Failed to resolve workspace_id '{workspace_id}' to a concrete workspace ID. " 198 "Use a valid workspace UUID or a supported alias." 199 ) 200 bearer_token, client_id, client_secret = _resolve_cloud_auth(ctx) 201 202 conn = _get_cloud_connection( 203 workspace_id=resolved_workspace_id, 204 connection_id=connection_id, 205 api_root=config_api_root or constants.CLOUD_API_ROOT, 206 bearer_token=bearer_token, 207 client_id=client_id, 208 client_secret=client_secret, 209 ) 210 211 try: 212 stream_state_dict = json.loads(stream_state_json) 213 except json.JSONDecodeError as exc: 214 raise ValueError( 215 "Invalid JSON provided for 'stream_state_json'. " 216 "Please supply a valid JSON object representing the stream state." 217 ) from exc 218 conn.set_stream_state( 219 stream_name=stream_name, 220 state_blob_dict=stream_state_dict, 221 stream_namespace=stream_namespace, 222 ) 223 return conn.dump_raw_state()
Update the state for a single stream within a connection.
WARNING: This is a destructive emergency operation. Use with caution. Fetches the current full state, replaces only the specified stream's state, then sends the updated state back. If the stream doesn't exist, it is appended. Uses the safe variant that prevents updates while a sync is running (HTTP 423).
226@mcp_tool( 227 destructive=True, 228 idempotent=False, 229 open_world=True, 230) 231def reset_stream_state( 232 workspace_id: Annotated[ 233 str | WorkspaceAliasEnum, 234 Field( 235 description="The Airbyte Cloud workspace ID (UUID) or alias. " 236 "Aliases: '@devin-ai-sandbox'.", 237 ), 238 ], 239 connection_id: Annotated[ 240 str, 241 Field(description="The connection ID (UUID) to update state for."), 242 ], 243 stream_name: Annotated[ 244 str, 245 Field(description="The configured stream name whose state should be reset."), 246 ], 247 stream_namespace: Annotated[ 248 str | None, 249 Field( 250 description="Optional stream namespace to identify the stream.", 251 default=None, 252 ), 253 ] = None, 254 config_api_root: Annotated[ 255 str | None, 256 Field( 257 description="Optional API root URL override. " 258 "Defaults to Airbyte Cloud. " 259 "Use this to target local or self-hosted deployments.", 260 default=None, 261 ), 262 ] = None, 263 *, 264 ctx: Context, 265) -> cloud_connection_state.ResetStreamResult: 266 """Reset a configured stream's state so the next sync full-refreshes it. 267 268 WARNING: This is a destructive emergency operation. Use with caution. 269 Uses the safe variant that prevents updates while a sync is running (HTTP 423). 270 Returns `previous_state_backup` in raw Config API format so the state can be restored. 271 """ 272 resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id) 273 if resolved_workspace_id is None: 274 raise ValueError( 275 f"Failed to resolve workspace_id '{workspace_id}' to a concrete workspace ID. " 276 "Use a valid workspace UUID or a supported alias." 277 ) 278 bearer_token, client_id, client_secret = _resolve_cloud_auth(ctx) 279 280 conn = _get_cloud_connection( 281 workspace_id=resolved_workspace_id, 282 connection_id=connection_id, 283 api_root=config_api_root or constants.CLOUD_API_ROOT, 284 bearer_token=bearer_token, 285 client_id=client_id, 286 client_secret=client_secret, 287 ) 288 return cloud_connection_state.reset_stream_state( 289 conn, 290 stream_name=stream_name, 291 stream_namespace=stream_namespace, 292 )
Reset a configured stream's state so the next sync full-refreshes it.
WARNING: This is a destructive emergency operation. Use with caution.
Uses the safe variant that prevents updates while a sync is running (HTTP 423).
Returns previous_state_backup in raw Config API format so the state can be restored.
295@mcp_tool( 296 destructive=True, 297 idempotent=False, 298 open_world=True, 299) 300def update_connection_catalog( 301 workspace_id: Annotated[ 302 str | WorkspaceAliasEnum, 303 Field( 304 description="The Airbyte Cloud workspace ID (UUID) or alias. " 305 "Aliases: '@devin-ai-sandbox'.", 306 ), 307 ], 308 connection_id: Annotated[ 309 str, 310 Field(description="The connection ID (UUID) to update catalog for."), 311 ], 312 configured_catalog_json: Annotated[ 313 str, 314 Field( 315 description="The configured catalog as a JSON string. " 316 "This replaces the entire configured catalog for the connection. " 317 "Tip: Use get_connection_catalog first to see the current format." 318 ), 319 ], 320 config_api_root: Annotated[ 321 str | None, 322 Field( 323 description="Optional API root URL override. " 324 "Defaults to Airbyte Cloud. " 325 "Use this to target local or self-hosted deployments.", 326 default=None, 327 ), 328 ] = None, 329 *, 330 ctx: Context, 331) -> dict[str, Any]: 332 """Replace the configured catalog for an Airbyte connection. 333 334 WARNING: This is a destructive emergency operation. Use with extreme caution. 335 This replaces the entire configured catalog, which controls which streams 336 are synced, their sync modes, primary keys, and cursor fields. 337 Get the current catalog first, modify it, then pass the full catalog back here. 338 """ 339 resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id) 340 if resolved_workspace_id is None: 341 raise ValueError( 342 f"Unable to resolve workspace_id {workspace_id!r} to a concrete workspace ID. " 343 "Ensure you provided a valid UUID or supported alias." 344 ) 345 bearer_token, client_id, client_secret = _resolve_cloud_auth(ctx) 346 347 conn = _get_cloud_connection( 348 workspace_id=resolved_workspace_id, 349 connection_id=connection_id, 350 api_root=config_api_root or constants.CLOUD_API_ROOT, 351 bearer_token=bearer_token, 352 client_id=client_id, 353 client_secret=client_secret, 354 ) 355 356 try: 357 configured_catalog = json.loads(configured_catalog_json) 358 except json.JSONDecodeError as e: 359 raise ValueError( 360 "Invalid JSON for parameter 'configured_catalog_json'. " 361 "Expected a JSON object representing an Airbyte configured catalog." 362 ) from e 363 conn.import_raw_catalog(configured_catalog) 364 result = conn.dump_raw_catalog() 365 if result is None: 366 raise RuntimeError( 367 "Failed to retrieve catalog after import. " 368 f"workspace_id={resolved_workspace_id!r}, connection_id={connection_id!r}" 369 ) 370 return result
Replace the configured catalog for an Airbyte connection.
WARNING: This is a destructive emergency operation. Use with extreme caution. This replaces the entire configured catalog, which controls which streams are synced, their sync modes, primary keys, and cursor fields. Get the current catalog first, modify it, then pass the full catalog back here.
373def register_connection_medic_tools(app: FastMCP) -> None: 374 """Register connection medic tools if AIRBYTE_OPS_MEDIC_MODE is enabled.""" 375 if not _is_medic_mode_enabled(): 376 return 377 378 register_mcp_tools(app, mcp_module=__name__)
Register connection medic tools if AIRBYTE_OPS_MEDIC_MODE is enabled.