airbyte_ops_mcp.mcp.server
Airbyte Admin MCP server implementation.
This module provides the main MCP server for Airbyte admin operations.
The server can run in two modes:
- stdio mode (default): For direct MCP client connections via stdin/stdout
- HTTP mode: For HTTP-based MCP connections, useful for containerized deployments
Environment Variables:
MCP_HTTP_HOST: Host to bind HTTP server to (default: 127.0.0.1) MCP_HTTP_PORT: Port for HTTP server (default: 8082)
1# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 2"""Airbyte Admin MCP server implementation. 3 4This module provides the main MCP server for Airbyte admin operations. 5 6The server can run in two modes: 7- **stdio mode** (default): For direct MCP client connections via stdin/stdout 8- **HTTP mode**: For HTTP-based MCP connections, useful for containerized deployments 9 10Environment Variables: 11 MCP_HTTP_HOST: Host to bind HTTP server to (default: 127.0.0.1) 12 MCP_HTTP_PORT: Port for HTTP server (default: 8082) 13""" 14 15import asyncio 16import os 17import sys 18from pathlib import Path 19 20from airbyte.cloud.auth import resolve_cloud_client_id, resolve_cloud_client_secret 21from dotenv import load_dotenv 22from fastmcp import FastMCP 23from fastmcp_extensions import MCPServerConfigArg, mcp_server 24 25from airbyte_ops_mcp._sentry import init_sentry_tracking 26from airbyte_ops_mcp.constants import ( 27 HEADER_AIRBYTE_CLOUD_CLIENT_ID, 28 HEADER_AIRBYTE_CLOUD_CLIENT_SECRET, 29 MCP_SERVER_NAME, 30 ServerConfigKey, 31) 32from airbyte_ops_mcp.mcp._guidance import MCP_SERVER_INSTRUCTIONS 33from airbyte_ops_mcp.mcp.agent_message_bus import register_message_bus_tools 34from airbyte_ops_mcp.mcp.cloud_connector_versions import ( 35 register_cloud_connector_version_tools, 36) 37from airbyte_ops_mcp.mcp.connection_medic import register_connection_medic_tools 38from airbyte_ops_mcp.mcp.connection_state import register_connection_state_tools 39from airbyte_ops_mcp.mcp.connector_rollout import register_connector_rollout_tools 40from airbyte_ops_mcp.mcp.devin_reminders import register_devin_reminder_tools 41from airbyte_ops_mcp.mcp.devin_secret_request import register_devin_secret_request_tools 42from airbyte_ops_mcp.mcp.gcp_logs import register_gcp_logs_tools 43from airbyte_ops_mcp.mcp.github_actions import register_github_actions_tools 44from airbyte_ops_mcp.mcp.github_repo_ops import register_github_repo_ops_tools 45from airbyte_ops_mcp.mcp.human_in_the_loop import register_human_in_the_loop_tools 46from airbyte_ops_mcp.mcp.organization_payment_config import ( 47 register_organization_payment_config_tools, 48) 49from airbyte_ops_mcp.mcp.people_lookup import register_people_lookup_tools 50from airbyte_ops_mcp.mcp.prerelease import register_prerelease_tools 51from airbyte_ops_mcp.mcp.prod_db_queries import register_prod_db_query_tools 52from airbyte_ops_mcp.mcp.prompts import register_prompts 53from airbyte_ops_mcp.mcp.registry import register_registry_tools 54from airbyte_ops_mcp.mcp.regression_tests import register_regression_tests_tools 55from airbyte_ops_mcp.mcp.release_block import register_release_block_tools 56from airbyte_ops_mcp.mcp.session_feedback import register_session_feedback_tools 57from airbyte_ops_mcp.mcp.session_namer import register_session_namer_tools 58from airbyte_ops_mcp.mcp.slack_messaging import register_slack_messaging_tools 59from airbyte_ops_mcp.mcp.tier_lookup import register_tier_lookup_tools 60 61# Default HTTP server configuration 62DEFAULT_HTTP_HOST = "127.0.0.1" 63DEFAULT_HTTP_PORT = 8082 64 65 66def _normalize_bearer_token(value: str) -> str | None: 67 """Extract bearer token from Authorization header value. 68 69 Parses "Bearer <token>" format (case-insensitive prefix). 70 Returns None if the value doesn't have the Bearer prefix. 71 """ 72 if value.lower().startswith("bearer "): 73 token = value[7:].strip() 74 return token if token else None 75 return None 76 77 78# Create the MCP server with built-in server info resource 79app = mcp_server( 80 name=MCP_SERVER_NAME, 81 instructions=MCP_SERVER_INSTRUCTIONS, 82 package_name="airbyte-internal-ops", 83 advertised_properties={ 84 "docs_url": "https://github.com/airbytehq/airbyte-ops-mcp", 85 "release_history_url": "https://github.com/airbytehq/airbyte-ops-mcp/releases", 86 }, 87 server_config_args=[ 88 MCPServerConfigArg( 89 name=ServerConfigKey.BEARER_TOKEN, 90 http_header_key="Authorization", 91 env_var="AIRBYTE_CLOUD_BEARER_TOKEN", 92 normalize_fn=_normalize_bearer_token, 93 required=False, 94 sensitive=True, 95 ), 96 MCPServerConfigArg( 97 name=ServerConfigKey.CLIENT_ID, 98 http_header_key=HEADER_AIRBYTE_CLOUD_CLIENT_ID, 99 default=lambda: str(resolve_cloud_client_id()), 100 required=True, 101 sensitive=True, 102 ), 103 MCPServerConfigArg( 104 name=ServerConfigKey.CLIENT_SECRET, 105 http_header_key=HEADER_AIRBYTE_CLOUD_CLIENT_SECRET, 106 default=lambda: str(resolve_cloud_client_secret()), 107 required=True, 108 sensitive=True, 109 ), 110 ], 111 include_standard_tool_filters=True, 112) 113 114 115def register_server_assets(app: FastMCP) -> None: 116 """Register all server assets (tools, prompts, resources) with the FastMCP app. 117 118 This function registers assets for all domains: 119 - REPO: GitHub repository operations 120 - CLOUD: Cloud connector version management 121 - PROMPTS: Prompt templates for common workflows 122 - REGRESSION_TESTS: Connector regression tests (single-version and comparison) 123 - REGISTRY: Connector registry operations (read/write metadata from GCS) 124 - METADATA: Connector metadata operations (future) 125 - QA: Connector quality assurance (future) 126 - INSIGHTS: Connector analysis and insights (future) 127 128 Note: Server info resource is now built-in via mcp_server() helper. 129 130 Args: 131 app: FastMCP application instance 132 """ 133 register_github_repo_ops_tools(app) 134 register_github_actions_tools(app) 135 register_prerelease_tools(app) 136 register_cloud_connector_version_tools(app) 137 register_connector_rollout_tools(app) 138 register_prod_db_query_tools(app) 139 register_gcp_logs_tools(app) 140 register_prompts(app) 141 register_regression_tests_tools(app) 142 register_registry_tools(app) 143 register_connection_state_tools(app) 144 register_connection_medic_tools(app) 145 register_organization_payment_config_tools(app) 146 register_people_lookup_tools(app) 147 register_human_in_the_loop_tools(app) 148 register_devin_reminder_tools(app) 149 register_message_bus_tools(app) 150 register_session_feedback_tools(app) 151 register_session_namer_tools(app) 152 register_slack_messaging_tools(app) 153 register_devin_secret_request_tools(app) 154 register_tier_lookup_tools(app) 155 register_release_block_tools(app) 156 157 158register_server_assets(app) 159 160 161def _load_env() -> None: 162 """Load environment variables from .env file if present.""" 163 env_file = Path.cwd() / ".env" 164 if env_file.exists(): 165 load_dotenv(env_file) 166 print(f"Loaded environment from: {env_file}", flush=True, file=sys.stderr) 167 168 169def main() -> None: 170 """Main entry point for the Airbyte Admin MCP server (stdio mode). 171 172 This is the default entry point that runs the server in stdio mode, 173 suitable for direct MCP client connections. 174 """ 175 _load_env() 176 init_sentry_tracking() 177 178 print("=" * 60, flush=True, file=sys.stderr) 179 print("Starting Airbyte Admin MCP server (stdio mode).", file=sys.stderr) 180 try: 181 asyncio.run(app.run_stdio_async(show_banner=False)) 182 except KeyboardInterrupt: 183 print("Airbyte Admin MCP server interrupted by user.", file=sys.stderr) 184 185 print("Airbyte Admin MCP server stopped.", file=sys.stderr) 186 print("=" * 60, flush=True, file=sys.stderr) 187 188 189def _parse_port(port_str: str | None, default: int) -> int: 190 """Parse and validate a port number from string. 191 192 Args: 193 port_str: Port string from environment variable, or None if not set 194 default: Default port to use if port_str is None 195 196 Returns: 197 Validated port number 198 199 Raises: 200 ValueError: If port_str is not a valid integer or out of range 201 """ 202 if port_str is None: 203 return default 204 205 port_str = port_str.strip() 206 if not port_str.isdecimal(): 207 raise ValueError(f"MCP_HTTP_PORT must be a valid integer, got: {port_str!r}") 208 209 port = int(port_str) 210 if not 1 <= port <= 65535: 211 raise ValueError(f"MCP_HTTP_PORT must be between 1 and 65535, got: {port}") 212 213 return port 214 215 216def main_http() -> None: 217 """HTTP entry point for the Airbyte Admin MCP server. 218 219 This entry point runs the server in HTTP mode, suitable for containerized 220 deployments where the server needs to be accessible over HTTP. 221 222 Environment Variables: 223 MCP_HTTP_HOST: Host to bind to (default: 127.0.0.1) 224 MCP_HTTP_PORT: Port to listen on (default: 8082) 225 """ 226 _load_env() 227 init_sentry_tracking() 228 229 host = os.getenv("MCP_HTTP_HOST", DEFAULT_HTTP_HOST) 230 port = _parse_port(os.getenv("MCP_HTTP_PORT"), DEFAULT_HTTP_PORT) 231 232 print("=" * 60, flush=True, file=sys.stderr) 233 print( 234 f"Starting Airbyte Admin MCP server (HTTP mode) on {host}:{port}", 235 file=sys.stderr, 236 ) 237 try: 238 app.run(transport="http", host=host, port=port) 239 except KeyboardInterrupt: 240 print("Airbyte Admin MCP server interrupted by user.", file=sys.stderr) 241 242 print("Airbyte Admin MCP server stopped.", file=sys.stderr) 243 print("=" * 60, flush=True, file=sys.stderr) 244 245 246if __name__ == "__main__": 247 main()
116def register_server_assets(app: FastMCP) -> None: 117 """Register all server assets (tools, prompts, resources) with the FastMCP app. 118 119 This function registers assets for all domains: 120 - REPO: GitHub repository operations 121 - CLOUD: Cloud connector version management 122 - PROMPTS: Prompt templates for common workflows 123 - REGRESSION_TESTS: Connector regression tests (single-version and comparison) 124 - REGISTRY: Connector registry operations (read/write metadata from GCS) 125 - METADATA: Connector metadata operations (future) 126 - QA: Connector quality assurance (future) 127 - INSIGHTS: Connector analysis and insights (future) 128 129 Note: Server info resource is now built-in via mcp_server() helper. 130 131 Args: 132 app: FastMCP application instance 133 """ 134 register_github_repo_ops_tools(app) 135 register_github_actions_tools(app) 136 register_prerelease_tools(app) 137 register_cloud_connector_version_tools(app) 138 register_connector_rollout_tools(app) 139 register_prod_db_query_tools(app) 140 register_gcp_logs_tools(app) 141 register_prompts(app) 142 register_regression_tests_tools(app) 143 register_registry_tools(app) 144 register_connection_state_tools(app) 145 register_connection_medic_tools(app) 146 register_organization_payment_config_tools(app) 147 register_people_lookup_tools(app) 148 register_human_in_the_loop_tools(app) 149 register_devin_reminder_tools(app) 150 register_message_bus_tools(app) 151 register_session_feedback_tools(app) 152 register_session_namer_tools(app) 153 register_slack_messaging_tools(app) 154 register_devin_secret_request_tools(app) 155 register_tier_lookup_tools(app) 156 register_release_block_tools(app)
Register all server assets (tools, prompts, resources) with the FastMCP app.
This function registers assets for all domains:
- REPO: GitHub repository operations
- CLOUD: Cloud connector version management
- PROMPTS: Prompt templates for common workflows
- REGRESSION_TESTS: Connector regression tests (single-version and comparison)
- REGISTRY: Connector registry operations (read/write metadata from GCS)
- METADATA: Connector metadata operations (future)
- QA: Connector quality assurance (future)
- INSIGHTS: Connector analysis and insights (future)
Note: Server info resource is now built-in via mcp_server() helper.
Arguments:
- app: FastMCP application instance
170def main() -> None: 171 """Main entry point for the Airbyte Admin MCP server (stdio mode). 172 173 This is the default entry point that runs the server in stdio mode, 174 suitable for direct MCP client connections. 175 """ 176 _load_env() 177 init_sentry_tracking() 178 179 print("=" * 60, flush=True, file=sys.stderr) 180 print("Starting Airbyte Admin MCP server (stdio mode).", file=sys.stderr) 181 try: 182 asyncio.run(app.run_stdio_async(show_banner=False)) 183 except KeyboardInterrupt: 184 print("Airbyte Admin MCP server interrupted by user.", file=sys.stderr) 185 186 print("Airbyte Admin MCP server stopped.", file=sys.stderr) 187 print("=" * 60, flush=True, file=sys.stderr)
Main entry point for the Airbyte Admin MCP server (stdio mode).
This is the default entry point that runs the server in stdio mode, suitable for direct MCP client connections.
217def main_http() -> None: 218 """HTTP entry point for the Airbyte Admin MCP server. 219 220 This entry point runs the server in HTTP mode, suitable for containerized 221 deployments where the server needs to be accessible over HTTP. 222 223 Environment Variables: 224 MCP_HTTP_HOST: Host to bind to (default: 127.0.0.1) 225 MCP_HTTP_PORT: Port to listen on (default: 8082) 226 """ 227 _load_env() 228 init_sentry_tracking() 229 230 host = os.getenv("MCP_HTTP_HOST", DEFAULT_HTTP_HOST) 231 port = _parse_port(os.getenv("MCP_HTTP_PORT"), DEFAULT_HTTP_PORT) 232 233 print("=" * 60, flush=True, file=sys.stderr) 234 print( 235 f"Starting Airbyte Admin MCP server (HTTP mode) on {host}:{port}", 236 file=sys.stderr, 237 ) 238 try: 239 app.run(transport="http", host=host, port=port) 240 except KeyboardInterrupt: 241 print("Airbyte Admin MCP server interrupted by user.", file=sys.stderr) 242 243 print("Airbyte Admin MCP server stopped.", file=sys.stderr) 244 print("=" * 60, flush=True, file=sys.stderr)
HTTP entry point for the Airbyte Admin MCP server.
This entry point runs the server in HTTP mode, suitable for containerized deployments where the server needs to be accessible over HTTP.
Environment Variables:
MCP_HTTP_HOST: Host to bind to (default: 127.0.0.1) MCP_HTTP_PORT: Port to listen on (default: 8082)