airbyte_ops_mcp.mcp.server
Airbyte Admin MCP server implementation.
This module provides the main MCP server for Airbyte admin operations.
The server can run in two modes:
- stdio mode (default): For direct MCP client connections via stdin/stdout
- HTTP mode: For HTTP-based MCP connections, useful for containerized deployments
Environment Variables:
- MCP_HTTP_HOST: Host to bind the HTTP server to (default: 127.0.0.1)
- MCP_HTTP_PORT: Port for the HTTP server (default: 8082)
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
"""Airbyte Admin MCP server implementation.

This module provides the main MCP server for Airbyte admin operations.
Importing it builds the module-level ``app`` and registers every tool and
prompt on it; nothing is served until ``main()`` or ``main_http()`` is called.

The server can run in two modes:
- **stdio mode** (default): For direct MCP client connections via stdin/stdout
- **HTTP mode**: For HTTP-based MCP connections, useful for containerized deployments

Environment Variables:
    MCP_HTTP_HOST: Host to bind HTTP server to (default: 127.0.0.1)
    MCP_HTTP_PORT: Port for HTTP server (default: 8082)
"""

import asyncio
import os
import sys
from pathlib import Path

from airbyte.cloud.auth import resolve_cloud_client_id, resolve_cloud_client_secret
from dotenv import load_dotenv
from fastmcp import FastMCP
from fastmcp_extensions import MCPServerConfigArg, mcp_server

from airbyte_ops_mcp._sentry import init_sentry_tracking
from airbyte_ops_mcp.constants import (
    HEADER_AIRBYTE_CLOUD_CLIENT_ID,
    HEADER_AIRBYTE_CLOUD_CLIENT_SECRET,
    MCP_SERVER_NAME,
    ServerConfigKey,
)
from airbyte_ops_mcp.mcp._guidance import MCP_SERVER_INSTRUCTIONS
from airbyte_ops_mcp.mcp.agent_message_bus import register_message_bus_tools
from airbyte_ops_mcp.mcp.cloud_connector_versions import (
    register_cloud_connector_version_tools,
)
from airbyte_ops_mcp.mcp.connection_medic import register_connection_medic_tools
from airbyte_ops_mcp.mcp.connection_state import register_connection_state_tools
from airbyte_ops_mcp.mcp.connector_rollout import register_connector_rollout_tools
from airbyte_ops_mcp.mcp.devin_reminders import register_devin_reminder_tools
from airbyte_ops_mcp.mcp.devin_secret_request import register_devin_secret_request_tools
from airbyte_ops_mcp.mcp.gcp_logs import register_gcp_logs_tools
from airbyte_ops_mcp.mcp.github_actions import register_github_actions_tools
from airbyte_ops_mcp.mcp.github_repo_ops import register_github_repo_ops_tools
from airbyte_ops_mcp.mcp.human_in_the_loop import register_human_in_the_loop_tools
from airbyte_ops_mcp.mcp.people_lookup import register_people_lookup_tools
from airbyte_ops_mcp.mcp.prerelease import register_prerelease_tools
from airbyte_ops_mcp.mcp.prod_db_queries import register_prod_db_query_tools
from airbyte_ops_mcp.mcp.prompts import register_prompts
from airbyte_ops_mcp.mcp.registry import register_registry_tools
from airbyte_ops_mcp.mcp.regression_tests import register_regression_tests_tools
from airbyte_ops_mcp.mcp.session_feedback import register_session_feedback_tools
from airbyte_ops_mcp.mcp.session_namer import register_session_namer_tools
from airbyte_ops_mcp.mcp.slack_messaging import register_slack_messaging_tools
from airbyte_ops_mcp.mcp.tier_lookup import register_tier_lookup_tools

# Default HTTP server configuration, used by main_http() when the
# MCP_HTTP_HOST / MCP_HTTP_PORT environment variables are not set.
DEFAULT_HTTP_HOST = "127.0.0.1"
DEFAULT_HTTP_PORT = 8082


def _normalize_bearer_token(value: str) -> str | None:
    """Extract bearer token from Authorization header value.

    Parses "Bearer <token>" format (case-insensitive prefix).

    Args:
        value: Raw header (or configured) value to inspect.

    Returns:
        The token with surrounding whitespace stripped, or None if the value
        doesn't start with "bearer " (prefix plus one space) or if nothing
        but whitespace follows the prefix.
    """
    if value.lower().startswith("bearer "):
        # 7 == len("bearer "); slice off the prefix and trim padding.
        token = value[7:].strip()
        return token if token else None
    return None


# Create the MCP server with built-in server info resource
# NOTE(review): how header, env var and default are prioritised for each
# config arg is decided inside fastmcp_extensions and is not visible here.
app = mcp_server(
    name=MCP_SERVER_NAME,
    instructions=MCP_SERVER_INSTRUCTIONS,
    package_name="airbyte-internal-ops",
    advertised_properties={
        "docs_url": "https://github.com/airbytehq/airbyte-ops-mcp",
        "release_history_url": "https://github.com/airbytehq/airbyte-ops-mcp/releases",
    },
    server_config_args=[
        # Optional bearer token: taken from the "Authorization" header or the
        # AIRBYTE_CLOUD_BEARER_TOKEN env var, passed through
        # _normalize_bearer_token.
        MCPServerConfigArg(
            name=ServerConfigKey.BEARER_TOKEN,
            http_header_key="Authorization",
            env_var="AIRBYTE_CLOUD_BEARER_TOKEN",
            normalize_fn=_normalize_bearer_token,
            required=False,
            sensitive=True,
        ),
        # The defaults below are lambdas, so resolve_cloud_client_id() /
        # resolve_cloud_client_secret() only run when the lambda is invoked.
        MCPServerConfigArg(
            name=ServerConfigKey.CLIENT_ID,
            http_header_key=HEADER_AIRBYTE_CLOUD_CLIENT_ID,
            default=lambda: str(resolve_cloud_client_id()),
            required=True,
            sensitive=True,
        ),
        MCPServerConfigArg(
            name=ServerConfigKey.CLIENT_SECRET,
            http_header_key=HEADER_AIRBYTE_CLOUD_CLIENT_SECRET,
            default=lambda: str(resolve_cloud_client_secret()),
            required=True,
            sensitive=True,
        ),
    ],
    include_standard_tool_filters=True,
)


def register_server_assets(app: FastMCP) -> None:
    """Register all server assets (tools, prompts, resources) with the FastMCP app.

    This function registers assets for all domains:
    - REPO: GitHub repository operations
    - CLOUD: Cloud connector version management
    - PROMPTS: Prompt templates for common workflows
    - REGRESSION_TESTS: Connector regression tests (single-version and comparison)
    - REGISTRY: Connector registry operations (read/write metadata from GCS)
    - METADATA: Connector metadata operations (future)
    - QA: Connector quality assurance (future)
    - INSIGHTS: Connector analysis and insights (future)

    NOTE(review): the list above does not name every registrar called below
    (e.g. GCP logs, Slack messaging, people lookup); treat the calls in the
    body as the authoritative list.

    Note: Server info resource is now built-in via mcp_server() helper.

    Args:
        app: FastMCP application instance
    """
    register_github_repo_ops_tools(app)
    register_github_actions_tools(app)
    register_prerelease_tools(app)
    register_cloud_connector_version_tools(app)
    register_connector_rollout_tools(app)
    register_prod_db_query_tools(app)
    register_gcp_logs_tools(app)
    register_prompts(app)
    register_regression_tests_tools(app)
    register_registry_tools(app)
    register_connection_state_tools(app)
    register_connection_medic_tools(app)
    register_people_lookup_tools(app)
    register_human_in_the_loop_tools(app)
    register_devin_reminder_tools(app)
    register_message_bus_tools(app)
    register_session_feedback_tools(app)
    register_session_namer_tools(app)
    register_slack_messaging_tools(app)
    register_devin_secret_request_tools(app)
    register_tier_lookup_tools(app)


# Runs at import time, so the module-level `app` already has every asset
# registered before main() or main_http() is called.
register_server_assets(app)


def _load_env() -> None:
    """Load environment variables from .env file if present.

    Only ``.env`` in the current working directory is considered. When the
    file is loaded, a notice is written to stderr.
    """
    env_file = Path.cwd() / ".env"
    if env_file.exists():
        load_dotenv(env_file)
        print(f"Loaded environment from: {env_file}", flush=True, file=sys.stderr)


def main() -> None:
    """Main entry point for the Airbyte Admin MCP server (stdio mode).

    This is the default entry point that runs the server in stdio mode,
    suitable for direct MCP client connections.

    All status messages are written to stderr. A KeyboardInterrupt is caught
    and reported; any other exception propagates to the caller.
    """
    _load_env()
    init_sentry_tracking()

    print("=" * 60, flush=True, file=sys.stderr)
    print("Starting Airbyte Admin MCP server (stdio mode).", file=sys.stderr)
    try:
        asyncio.run(app.run_stdio_async(show_banner=False))
    except KeyboardInterrupt:
        print("Airbyte Admin MCP server interrupted by user.", file=sys.stderr)

    print("Airbyte Admin MCP server stopped.", file=sys.stderr)
    print("=" * 60, flush=True, file=sys.stderr)


def _parse_port(port_str: str | None, default: int) -> int:
    """Parse and validate a port number from string.

    Args:
        port_str: Port string from environment variable, or None if not set
        default: Default port to use if port_str is None

    Returns:
        Validated port number

    Raises:
        ValueError: If port_str is not a valid integer or out of range
    """
    if port_str is None:
        return default

    port_str = port_str.strip()
    # isdecimal() is False for "", signed values ("-1", "+80") and anything
    # containing non-digit characters, so all of those are rejected here.
    if not port_str.isdecimal():
        raise ValueError(f"MCP_HTTP_PORT must be a valid integer, got: {port_str!r}")

    port = int(port_str)
    if not 1 <= port <= 65535:
        raise ValueError(f"MCP_HTTP_PORT must be between 1 and 65535, got: {port}")

    return port


def main_http() -> None:
    """HTTP entry point for the Airbyte Admin MCP server.

    This entry point runs the server in HTTP mode, suitable for containerized
    deployments where the server needs to be accessible over HTTP.

    All status messages are written to stderr. A KeyboardInterrupt is caught
    and reported; any other exception propagates to the caller.

    Environment Variables:
        MCP_HTTP_HOST: Host to bind to (default: 127.0.0.1)
        MCP_HTTP_PORT: Port to listen on (default: 8082)

    Raises:
        ValueError: If MCP_HTTP_PORT is set but is not an integer in 1-65535.
    """
    _load_env()
    init_sentry_tracking()

    # Read after _load_env() so values from a local .env file can take effect.
    host = os.getenv("MCP_HTTP_HOST", DEFAULT_HTTP_HOST)
    port = _parse_port(os.getenv("MCP_HTTP_PORT"), DEFAULT_HTTP_PORT)

    print("=" * 60, flush=True, file=sys.stderr)
    print(
        f"Starting Airbyte Admin MCP server (HTTP mode) on {host}:{port}",
        file=sys.stderr,
    )
    try:
        app.run(transport="http", host=host, port=port)
    except KeyboardInterrupt:
        print("Airbyte Admin MCP server interrupted by user.", file=sys.stderr)

    print("Airbyte Admin MCP server stopped.", file=sys.stderr)
    print("=" * 60, flush=True, file=sys.stderr)


if __name__ == "__main__":
    main()
def register_server_assets(app: FastMCP) -> None:
    """Attach every tool and prompt registrar known to this module to ``app``.

    Each ``register_*`` helper is called exactly once, in the order listed
    in the body, with ``app`` as its sole argument.

    Domains covered:
    - REPO: GitHub repository operations
    - CLOUD: Cloud connector version management
    - PROMPTS: Prompt templates for common workflows
    - REGRESSION_TESTS: Connector regression tests (single-version and comparison)
    - REGISTRY: Connector registry operations (read/write metadata from GCS)
    - METADATA: Connector metadata operations (future)
    - QA: Connector quality assurance (future)
    - INSIGHTS: Connector analysis and insights (future)

    Note: Server info resource is now built-in via mcp_server() helper.

    Args:
        app: FastMCP application instance
    """
    # Order is significant only in that it mirrors the historical call
    # sequence; keep new registrars appended at the end.
    registrars = (
        register_github_repo_ops_tools,
        register_github_actions_tools,
        register_prerelease_tools,
        register_cloud_connector_version_tools,
        register_connector_rollout_tools,
        register_prod_db_query_tools,
        register_gcp_logs_tools,
        register_prompts,
        register_regression_tests_tools,
        register_registry_tools,
        register_connection_state_tools,
        register_connection_medic_tools,
        register_people_lookup_tools,
        register_human_in_the_loop_tools,
        register_devin_reminder_tools,
        register_message_bus_tools,
        register_session_feedback_tools,
        register_session_namer_tools,
        register_slack_messaging_tools,
        register_devin_secret_request_tools,
        register_tier_lookup_tools,
    )
    for register in registrars:
        register(app)
Register all server assets (tools, prompts, resources) with the FastMCP app.
This function registers assets for all domains:
- REPO: GitHub repository operations
- CLOUD: Cloud connector version management
- PROMPTS: Prompt templates for common workflows
- REGRESSION_TESTS: Connector regression tests (single-version and comparison)
- REGISTRY: Connector registry operations (read/write metadata from GCS)
- METADATA: Connector metadata operations (future)
- QA: Connector quality assurance (future)
- INSIGHTS: Connector analysis and insights (future)
Note: Server info resource is now built-in via mcp_server() helper.
Arguments:
- app: FastMCP application instance
def main() -> None:
    """Run the Airbyte Admin MCP server over stdio (the default entry point).

    Loads a local ``.env`` file if one exists, initialises Sentry tracking,
    then serves on stdin/stdout until the server exits or the user
    interrupts it. Every status message is written to stderr.
    """

    def _say(message: str, *, flush: bool = False) -> None:
        # Single place that routes status output to stderr.
        print(message, flush=flush, file=sys.stderr)

    _load_env()
    init_sentry_tracking()

    rule = "=" * 60
    _say(rule, flush=True)
    _say("Starting Airbyte Admin MCP server (stdio mode).")
    try:
        asyncio.run(app.run_stdio_async(show_banner=False))
    except KeyboardInterrupt:
        _say("Airbyte Admin MCP server interrupted by user.")

    _say("Airbyte Admin MCP server stopped.")
    _say(rule, flush=True)
Main entry point for the Airbyte Admin MCP server (stdio mode).
This is the default entry point that runs the server in stdio mode, suitable for direct MCP client connections.
def main_http() -> None:
    """Run the Airbyte Admin MCP server over HTTP.

    Intended for containerized deployments where the server must be
    reachable over HTTP. Loads a local ``.env`` file if one exists,
    initialises Sentry tracking, resolves the bind address from the
    environment, then serves until the server exits or the user
    interrupts it. Every status message is written to stderr.

    Environment Variables:
        MCP_HTTP_HOST: Host to bind to (default: 127.0.0.1)
        MCP_HTTP_PORT: Port to listen on (default: 8082)
    """

    def _say(message: str, *, flush: bool = False) -> None:
        # Single place that routes status output to stderr.
        print(message, flush=flush, file=sys.stderr)

    _load_env()
    init_sentry_tracking()

    # Resolved after _load_env() so .env-provided values are visible.
    host = os.getenv("MCP_HTTP_HOST", DEFAULT_HTTP_HOST)
    raw_port = os.getenv("MCP_HTTP_PORT")
    port = _parse_port(raw_port, DEFAULT_HTTP_PORT)

    rule = "=" * 60
    _say(rule, flush=True)
    _say(f"Starting Airbyte Admin MCP server (HTTP mode) on {host}:{port}")
    try:
        app.run(transport="http", host=host, port=port)
    except KeyboardInterrupt:
        _say("Airbyte Admin MCP server interrupted by user.")

    _say("Airbyte Admin MCP server stopped.")
    _say(rule, flush=True)
HTTP entry point for the Airbyte Admin MCP server.
This entry point runs the server in HTTP mode, suitable for containerized deployments where the server needs to be accessible over HTTP.
Environment Variables:
- MCP_HTTP_HOST: Host to bind to (default: 127.0.0.1)
- MCP_HTTP_PORT: Port to listen on (default: 8082)