airbyte_ops_mcp.mcp.server
Airbyte Admin MCP server implementation.
This module provides the main MCP server for Airbyte admin operations.
The server can run in two modes:
- stdio mode (default): For direct MCP client connections via stdin/stdout
- HTTP mode: For HTTP-based MCP connections. When
OIDC_CONFIG_URL, OIDC_CLIENT_ID, and OIDC_CLIENT_SECRET are all set, enables Keycloak OIDC authentication via OIDCProxy.
HTTP mode environment variables:
MCP_SERVER_URL: Public base URL for the MCP server (also used for OIDC redirect callbacks). Defaults to http://localhost:8080.
OIDC_CONFIG_URL: Keycloak OIDC discovery URL (enables auth when set)
OIDC_CLIENT_ID: OAuth client ID for Keycloak
OIDC_CLIENT_SECRET: OAuth client secret for Keycloak
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
"""Airbyte Admin MCP server implementation.

This module provides the main MCP server for Airbyte admin operations.

The server can run in two modes:
- **stdio mode** (default): For direct MCP client connections via stdin/stdout
- **HTTP mode**: For HTTP-based MCP connections. When `OIDC_CONFIG_URL`,
  `OIDC_CLIENT_ID`, and `OIDC_CLIENT_SECRET` are all set, enables Keycloak
  OIDC authentication via `OIDCProxy`.

HTTP mode environment variables:
    MCP_SERVER_URL: Public base URL for the MCP server (also used for OIDC
        redirect callbacks). Defaults to `http://localhost:8080` when unset
        or empty.
    OIDC_CONFIG_URL: Keycloak OIDC discovery URL (enables auth when set)
    OIDC_CLIENT_ID: OAuth client ID for Keycloak
    OIDC_CLIENT_SECRET: OAuth client secret for Keycloak
"""

import asyncio
import logging
import os
import sys
from pathlib import Path
from urllib.parse import urlparse

from airbyte.cloud.auth import resolve_cloud_client_id, resolve_cloud_client_secret
from dotenv import load_dotenv
from fastmcp import FastMCP
from fastmcp.server.auth.oidc_proxy import OIDCProxy
from fastmcp.server.dependencies import get_access_token
from fastmcp_extensions import (
    MCPServerConfigArg,
    ToolCallTelemetryMiddleware,
    mcp_server,
)
from starlette.requests import Request
from starlette.responses import JSONResponse

from airbyte_ops_mcp._sentry import _SENTRY_DSN, init_sentry_tracking
from airbyte_ops_mcp.constants import (
    HEADER_AIRBYTE_CLOUD_CLIENT_ID,
    HEADER_AIRBYTE_CLOUD_CLIENT_SECRET,
    MCP_SERVER_NAME,
    ServerConfigKey,
)
from airbyte_ops_mcp.mcp._guidance import MCP_SERVER_INSTRUCTIONS
from airbyte_ops_mcp.mcp.agent_message_bus import register_message_bus_tools
from airbyte_ops_mcp.mcp.cloud_connector_versions import (
    register_cloud_connector_version_tools,
)
from airbyte_ops_mcp.mcp.connection_medic import register_connection_medic_tools
from airbyte_ops_mcp.mcp.connection_state import register_connection_state_tools
from airbyte_ops_mcp.mcp.connector_rollout import register_connector_rollout_tools
from airbyte_ops_mcp.mcp.devin_reminders import register_devin_reminder_tools
from airbyte_ops_mcp.mcp.devin_secret_request import register_devin_secret_request_tools
from airbyte_ops_mcp.mcp.gcp_logs import register_gcp_logs_tools
from airbyte_ops_mcp.mcp.github_actions import register_github_actions_tools
from airbyte_ops_mcp.mcp.github_repo_ops import register_github_repo_ops_tools
from airbyte_ops_mcp.mcp.human_in_the_loop import register_human_in_the_loop_tools
from airbyte_ops_mcp.mcp.organization_agentic_flag import (
    register_organization_agentic_flag_tools,
)
from airbyte_ops_mcp.mcp.organization_payment_config import (
    register_organization_payment_config_tools,
)
from airbyte_ops_mcp.mcp.people_lookup import register_people_lookup_tools
from airbyte_ops_mcp.mcp.prerelease import register_prerelease_tools
from airbyte_ops_mcp.mcp.prod_db_queries import register_prod_db_query_tools
from airbyte_ops_mcp.mcp.prompts import register_prompts
from airbyte_ops_mcp.mcp.registry import register_registry_tools
from airbyte_ops_mcp.mcp.regression_tests import register_regression_tests_tools
from airbyte_ops_mcp.mcp.release_block import register_release_block_tools
from airbyte_ops_mcp.mcp.session_feedback import register_session_feedback_tools
from airbyte_ops_mcp.mcp.session_namer import register_session_namer_tools
from airbyte_ops_mcp.mcp.slack_messaging import register_slack_messaging_tools
from airbyte_ops_mcp.mcp.tier_lookup import register_tier_lookup_tools
from airbyte_ops_mcp.telemetry import _DEFAULT_SEGMENT_WRITE_KEY

logger = logging.getLogger(__name__)

# Default HTTP server configuration
DEFAULT_HTTP_HOST = "0.0.0.0"
DEFAULT_HTTP_PORT = 8080

# OIDC environment variable names
OIDC_CONFIG_URL_ENV = "OIDC_CONFIG_URL"
OIDC_CLIENT_ID_ENV = "OIDC_CLIENT_ID"
OIDC_CLIENT_SECRET_ENV = "OIDC_CLIENT_SECRET"
MCP_SERVER_URL_ENV = "MCP_SERVER_URL"


def _normalize_bearer_token(value: str) -> str | None:
    """Extract bearer token from Authorization header value.

    Parses "Bearer <token>" format (case-insensitive prefix).

    Args:
        value: Raw header (or env var) value to parse.

    Returns:
        The token with surrounding whitespace removed, or `None` when the
        value lacks the `Bearer ` prefix or the token part is empty.
    """
    if value.lower().startswith("bearer "):
        # Slice off the 7-character "Bearer " prefix; what is left is the token.
        token = value[7:].strip()
        return token if token else None
    return None


def _resolve_oidc_bearer_token() -> str:
    """Resolve the upstream bearer token from OIDC auth if available.

    When the server uses OIDCProxy (Keycloak/Okta), the user's upstream
    access token is stored by FastMCP after the OAuth flow completes.
    This function retrieves it so Cloud API tools can use the user's
    identity for delegated access.

    Returns:
        The access token string, or an empty string when no OIDC session
        is active (e.g. stdio mode).
    """
    access_token = get_access_token()
    if access_token and access_token.token:
        return access_token.token
    return ""


def _resolve_server_url() -> str:
    """Return the public base URL of this server.

    Reads `MCP_SERVER_URL`. An unset, empty, or whitespace-only value falls
    back to `http://localhost:<DEFAULT_HTTP_PORT>`, mirroring how the OIDC
    variables treat an empty value as "not configured".

    Returns:
        The configured URL (surrounding whitespace removed) or the default.
    """
    # `os.getenv(name, default)` only applies the default when the variable is
    # missing, so an explicitly empty value needs the `or` fallback below.
    configured = os.getenv(MCP_SERVER_URL_ENV, "").strip()
    return configured or f"http://localhost:{DEFAULT_HTTP_PORT}"


def _create_oidc_auth() -> OIDCProxy | None:
    """Create an `OIDCProxy` auth provider when OIDC env vars are configured.

    When `OIDC_CONFIG_URL`, `OIDC_CLIENT_ID`, and `OIDC_CLIENT_SECRET` are all
    set, returns an `OIDCProxy` that handles the Keycloak Authorization Code +
    PKCE flow for browser-based MCP clients. When any is empty, returns `None`
    (no OIDC auth — the server falls back to header-based credential resolution).

    Returns:
        A configured `OIDCProxy`, or `None` when OIDC is not fully configured.
    """
    config_url = os.getenv(OIDC_CONFIG_URL_ENV, "")
    client_id = os.getenv(OIDC_CLIENT_ID_ENV, "")
    client_secret = os.getenv(OIDC_CLIENT_SECRET_ENV, "")

    if not config_url or not client_id or not client_secret:
        return None

    server_url = _resolve_server_url()

    # The client secret is deliberately left out of the log line.
    logger.info(
        "OIDC auth enabled (issuer=%s, client_id=%s, base_url=%s)",
        config_url,
        client_id,
        server_url,
    )
    return OIDCProxy(
        config_url=config_url,
        client_id=client_id,
        client_secret=client_secret,
        base_url=server_url,
    )


# Create the MCP server with built-in server info resource
app = mcp_server(
    name=MCP_SERVER_NAME,
    instructions=MCP_SERVER_INSTRUCTIONS,
    package_name="airbyte-internal-ops",
    advertised_properties={
        "docs_url": "https://github.com/airbytehq/airbyte-ops-mcp",
        "release_history_url": "https://github.com/airbytehq/airbyte-ops-mcp/releases",
    },
    server_config_args=[
        MCPServerConfigArg(
            name=ServerConfigKey.BEARER_TOKEN,
            http_header_key="Authorization",
            env_var="AIRBYTE_CLOUD_BEARER_TOKEN",
            normalize_fn=_normalize_bearer_token,
            default=_resolve_oidc_bearer_token,
            required=False,
            sensitive=True,
        ),
        MCPServerConfigArg(
            name=ServerConfigKey.CLIENT_ID,
            http_header_key=HEADER_AIRBYTE_CLOUD_CLIENT_ID,
            default=lambda: str(resolve_cloud_client_id()),
            required=True,
            sensitive=True,
        ),
        MCPServerConfigArg(
            name=ServerConfigKey.CLIENT_SECRET,
            http_header_key=HEADER_AIRBYTE_CLOUD_CLIENT_SECRET,
            default=lambda: str(resolve_cloud_client_secret()),
            required=True,
            sensitive=True,
        ),
    ],
    include_standard_tool_filters=True,
    # NOTE(review): this call runs at import time, i.e. before `_load_env()` is
    # invoked from `main()` / `main_http()`. OIDC variables that exist only in
    # a `.env` file are therefore not visible here - confirm this is intended.
    auth=_create_oidc_auth(),
)


def register_server_assets(app: FastMCP) -> None:
    """Register all server assets (tools, prompts, resources) with the FastMCP app.

    This function registers assets for all domains:
    - REPO: GitHub repository operations
    - CLOUD: Cloud connector version management
    - PROMPTS: Prompt templates for common workflows
    - REGRESSION_TESTS: Connector regression tests (single-version and comparison)
    - REGISTRY: Connector registry operations (read/write metadata from GCS)
    - METADATA: Connector metadata operations (future)
    - QA: Connector quality assurance (future)
    - INSIGHTS: Connector analysis and insights (future)

    Tools annotated with `requires_client_filesystem=True` are automatically
    hidden when `MCP_NO_CLIENT_FILESYSTEM=1` via the standard tool filter.

    Note: Server info resource is now built-in via `mcp_server()` helper.

    Args:
        app: FastMCP application instance
    """
    register_github_repo_ops_tools(app)
    register_github_actions_tools(app)
    register_prerelease_tools(app)
    register_cloud_connector_version_tools(app)
    register_connector_rollout_tools(app)
    register_prod_db_query_tools(app)
    register_gcp_logs_tools(app)
    register_prompts(app)
    register_regression_tests_tools(app)
    register_registry_tools(app)
    register_connection_state_tools(app)
    register_connection_medic_tools(app)
    register_organization_agentic_flag_tools(app)
    register_organization_payment_config_tools(app)
    register_people_lookup_tools(app)
    register_human_in_the_loop_tools(app)
    register_devin_reminder_tools(app)
    register_message_bus_tools(app)
    register_session_feedback_tools(app)
    register_session_namer_tools(app)
    register_slack_messaging_tools(app)
    register_devin_secret_request_tools(app)
    register_tier_lookup_tools(app)
    register_release_block_tools(app)


register_server_assets(app)
app.add_middleware(
    ToolCallTelemetryMiddleware(
        package_name="airbyte-internal-ops",
        sentry_dsn=_SENTRY_DSN,
        segment_write_key=_DEFAULT_SEGMENT_WRITE_KEY,
    )
)


@app.custom_route("/health", methods=["GET"])
async def health_check(request: Request) -> JSONResponse:
    """Health check endpoint for Cloud Run liveness/readiness probes.

    Args:
        request: Incoming request; not inspected.

    Returns:
        A JSON response with the body `{"status": "ok"}`.
    """
    return JSONResponse({"status": "ok"})


def _load_env() -> None:
    """Load environment variables from .env file if present.

    Looks for `.env` in the current working directory only and reports the
    loaded path on stderr.
    """
    env_file = Path.cwd() / ".env"
    if env_file.exists():
        load_dotenv(env_file)
        print(f"Loaded environment from: {env_file}", flush=True, file=sys.stderr)


def main() -> None:
    """Main entry point for the Airbyte Admin MCP server (stdio mode).

    This is the default entry point that runs the server in stdio mode,
    suitable for direct MCP client connections.
    """
    _load_env()
    init_sentry_tracking()

    # All status output goes to stderr, never stdout.
    print("=" * 60, flush=True, file=sys.stderr)
    print("Starting Airbyte Admin MCP server (stdio mode).", file=sys.stderr)
    try:
        asyncio.run(app.run_stdio_async(show_banner=False))
    except KeyboardInterrupt:
        print("Airbyte Admin MCP server interrupted by user.", file=sys.stderr)

    print("Airbyte Admin MCP server stopped.", file=sys.stderr)
    print("=" * 60, flush=True, file=sys.stderr)


def main_http() -> None:
    """HTTP entry point for the Airbyte Admin MCP server.

    Runs the server in HTTP mode. When OIDC env vars are configured,
    Keycloak authentication is enabled automatically.
    """
    _load_env()
    init_sentry_tracking()

    host = DEFAULT_HTTP_HOST
    port = DEFAULT_HTTP_PORT

    # When deployed behind a path-stripping LB (MCP_SERVER_URL has a path
    # component like /ops-mcp), serve the MCP endpoint at root so the
    # public URL is just the base path. Otherwise keep the FastMCP default.
    server_url = _resolve_server_url()
    mcp_path = "/" if urlparse(server_url).path.strip("/") else "/mcp"

    print("=" * 60, flush=True, file=sys.stderr)
    print(
        f"Starting Airbyte Admin MCP server (HTTP mode) on {host}:{port}"
        f" (mcp_path={mcp_path!r})",
        file=sys.stderr,
    )
    try:
        app.run(
            transport="streamable-http",
            host=host,
            port=port,
            path=mcp_path,
            stateless_http=True,
        )
    except KeyboardInterrupt:
        print("Airbyte Admin MCP server interrupted by user.", file=sys.stderr)

    print("Airbyte Admin MCP server stopped.", file=sys.stderr)
    print("=" * 60, flush=True, file=sys.stderr)


if __name__ == "__main__":
    main()
def register_server_assets(app: FastMCP) -> None:
    """Attach every tool, prompt, and resource registrar to the FastMCP app.

    Domains covered by the registrars:
    - REPO: GitHub repository operations
    - CLOUD: Cloud connector version management
    - PROMPTS: Prompt templates for common workflows
    - REGRESSION_TESTS: Connector regression tests (single-version and comparison)
    - REGISTRY: Connector registry operations (read/write metadata from GCS)
    - METADATA: Connector metadata operations (future)
    - QA: Connector quality assurance (future)
    - INSIGHTS: Connector analysis and insights (future)

    Tools annotated with `requires_client_filesystem=True` are automatically
    hidden when `MCP_NO_CLIENT_FILESYSTEM=1` via the standard tool filter.

    Note: Server info resource is now built-in via `mcp_server()` helper.

    Args:
        app: FastMCP application instance
    """
    # Registrars run strictly in the order listed here.
    registrars = (
        register_github_repo_ops_tools,
        register_github_actions_tools,
        register_prerelease_tools,
        register_cloud_connector_version_tools,
        register_connector_rollout_tools,
        register_prod_db_query_tools,
        register_gcp_logs_tools,
        register_prompts,
        register_regression_tests_tools,
        register_registry_tools,
        register_connection_state_tools,
        register_connection_medic_tools,
        register_organization_agentic_flag_tools,
        register_organization_payment_config_tools,
        register_people_lookup_tools,
        register_human_in_the_loop_tools,
        register_devin_reminder_tools,
        register_message_bus_tools,
        register_session_feedback_tools,
        register_session_namer_tools,
        register_slack_messaging_tools,
        register_devin_secret_request_tools,
        register_tier_lookup_tools,
        register_release_block_tools,
    )
    for register in registrars:
        register(app)
Register all server assets (tools, prompts, resources) with the FastMCP app.
This function registers assets for all domains:
- REPO: GitHub repository operations
- CLOUD: Cloud connector version management
- PROMPTS: Prompt templates for common workflows
- REGRESSION_TESTS: Connector regression tests (single-version and comparison)
- REGISTRY: Connector registry operations (read/write metadata from GCS)
- METADATA: Connector metadata operations (future)
- QA: Connector quality assurance (future)
- INSIGHTS: Connector analysis and insights (future)
Tools annotated with requires_client_filesystem=True are automatically
hidden when MCP_NO_CLIENT_FILESYSTEM=1 via the standard tool filter.
Note: Server info resource is now built-in via mcp_server() helper.
Arguments:
- app: FastMCP application instance
@app.custom_route("/health", methods=["GET"])
async def health_check(request: Request) -> JSONResponse:
    """Answer Cloud Run liveness/readiness probes with a fixed OK payload.

    Args:
        request: Incoming request; not inspected.

    Returns:
        A JSON response with the body `{"status": "ok"}`.
    """
    payload = {"status": "ok"}
    return JSONResponse(payload)
Health check endpoint for Cloud Run liveness/readiness probes.
def main() -> None:
    """Run the Airbyte Admin MCP server over stdio (the default entry point).

    Loads `.env`, initialises Sentry tracking, then serves over stdin/stdout
    until the server exits or the user interrupts it. Status messages are
    written to stderr.
    """

    def _say(text: str, *, flush: bool = False) -> None:
        # Every status line goes to stderr.
        print(text, flush=flush, file=sys.stderr)

    rule = "=" * 60

    _load_env()
    init_sentry_tracking()

    _say(rule, flush=True)
    _say("Starting Airbyte Admin MCP server (stdio mode).")
    try:
        asyncio.run(app.run_stdio_async(show_banner=False))
    except KeyboardInterrupt:
        _say("Airbyte Admin MCP server interrupted by user.")

    _say("Airbyte Admin MCP server stopped.")
    _say(rule, flush=True)
Main entry point for the Airbyte Admin MCP server (stdio mode).
This is the default entry point that runs the server in stdio mode, suitable for direct MCP client connections.
def main_http() -> None:
    """Run the Airbyte Admin MCP server over streamable HTTP.

    Loads `.env`, initialises Sentry tracking, picks the MCP mount path from
    `MCP_SERVER_URL`, and serves on the default host and port. When OIDC env
    vars are configured, Keycloak authentication is enabled automatically.
    """

    def _say(text: str, *, flush: bool = False) -> None:
        # Every status line goes to stderr.
        print(text, flush=flush, file=sys.stderr)

    rule = "=" * 60

    _load_env()
    init_sentry_tracking()

    # A public URL with a path component (e.g. /ops-mcp) indicates a
    # path-stripping load balancer in front of the server; in that case the
    # MCP endpoint is mounted at root. Without one, the FastMCP default
    # path is kept.
    fallback_url = f"http://localhost:{DEFAULT_HTTP_PORT}"
    public_path = urlparse(os.getenv(MCP_SERVER_URL_ENV, fallback_url)).path
    if public_path.strip("/"):
        mcp_path = "/"
    else:
        mcp_path = "/mcp"

    _say(rule, flush=True)
    _say(
        f"Starting Airbyte Admin MCP server (HTTP mode) on "
        f"{DEFAULT_HTTP_HOST}:{DEFAULT_HTTP_PORT}"
        f" (mcp_path={mcp_path!r})"
    )
    try:
        app.run(
            transport="streamable-http",
            host=DEFAULT_HTTP_HOST,
            port=DEFAULT_HTTP_PORT,
            path=mcp_path,
            stateless_http=True,
        )
    except KeyboardInterrupt:
        _say("Airbyte Admin MCP server interrupted by user.")

    _say("Airbyte Admin MCP server stopped.")
    _say(rule, flush=True)
HTTP entry point for the Airbyte Admin MCP server.
Runs the server in HTTP mode. When OIDC env vars are configured, Keycloak authentication is enabled automatically.