airbyte_ops_mcp.mcp.server
Airbyte Admin MCP server implementation.
This module provides the main MCP server for Airbyte admin operations.
The server can run in two modes:
- stdio mode (default): For direct MCP client connections via stdin/stdout
- HTTP mode: For HTTP-based MCP connections, useful for containerized deployments
Environment Variables:
- MCP_HTTP_HOST: Host to bind the HTTP server to (default: 127.0.0.1)
- MCP_HTTP_PORT: Port for the HTTP server (default: 8082)
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
"""Airbyte Admin MCP server implementation.

This module provides the main MCP server for Airbyte admin operations.

The server can run in two modes:
- **stdio mode** (default): For direct MCP client connections via stdin/stdout
- **HTTP mode**: For HTTP-based MCP connections, useful for containerized deployments

Environment Variables:
    MCP_HTTP_HOST: Host to bind HTTP server to (default: 127.0.0.1)
    MCP_HTTP_PORT: Port for HTTP server (default: 8082)
"""

import asyncio
import os
import sys
from pathlib import Path

from airbyte.cloud.auth import resolve_cloud_client_id, resolve_cloud_client_secret
from dotenv import load_dotenv
from fastmcp import FastMCP
from fastmcp_extensions import MCPServerConfigArg, mcp_server

from airbyte_ops_mcp._sentry import init_sentry_tracking
from airbyte_ops_mcp.constants import (
    HEADER_AIRBYTE_CLOUD_CLIENT_ID,
    HEADER_AIRBYTE_CLOUD_CLIENT_SECRET,
    MCP_SERVER_NAME,
    ServerConfigKey,
)
from airbyte_ops_mcp.mcp._guidance import MCP_SERVER_INSTRUCTIONS
from airbyte_ops_mcp.mcp.agent_message_bus import register_message_bus_tools
from airbyte_ops_mcp.mcp.cloud_connector_versions import (
    register_cloud_connector_version_tools,
)
from airbyte_ops_mcp.mcp.connection_medic import register_connection_medic_tools
from airbyte_ops_mcp.mcp.connection_state import register_connection_state_tools
from airbyte_ops_mcp.mcp.connector_rollout import register_connector_rollout_tools
from airbyte_ops_mcp.mcp.devin_reminders import register_devin_reminder_tools
from airbyte_ops_mcp.mcp.devin_secret_request import register_devin_secret_request_tools
from airbyte_ops_mcp.mcp.gcp_logs import register_gcp_logs_tools
from airbyte_ops_mcp.mcp.github_actions import register_github_actions_tools
from airbyte_ops_mcp.mcp.github_repo_ops import register_github_repo_ops_tools
from airbyte_ops_mcp.mcp.human_in_the_loop import register_human_in_the_loop_tools
from airbyte_ops_mcp.mcp.organization_agentic_flag import (
    register_organization_agentic_flag_tools,
)
from airbyte_ops_mcp.mcp.organization_payment_config import (
    register_organization_payment_config_tools,
)
from airbyte_ops_mcp.mcp.people_lookup import register_people_lookup_tools
from airbyte_ops_mcp.mcp.prerelease import register_prerelease_tools
from airbyte_ops_mcp.mcp.prod_db_queries import register_prod_db_query_tools
from airbyte_ops_mcp.mcp.prompts import register_prompts
from airbyte_ops_mcp.mcp.registry import register_registry_tools
from airbyte_ops_mcp.mcp.regression_tests import register_regression_tests_tools
from airbyte_ops_mcp.mcp.release_block import register_release_block_tools
from airbyte_ops_mcp.mcp.session_feedback import register_session_feedback_tools
from airbyte_ops_mcp.mcp.session_namer import register_session_namer_tools
from airbyte_ops_mcp.mcp.slack_messaging import register_slack_messaging_tools
from airbyte_ops_mcp.mcp.tier_lookup import register_tier_lookup_tools

# Fallback bind address and port for HTTP mode when the environment is silent.
DEFAULT_HTTP_HOST = "127.0.0.1"
DEFAULT_HTTP_PORT = 8082


def _normalize_bearer_token(value: str) -> str | None:
    """Pull the token out of a ``Bearer <token>`` Authorization value.

    The ``Bearer `` prefix is matched case-insensitively and the remainder is
    stripped of surrounding whitespace.

    Args:
        value: Raw header (or configured) value to inspect.

    Returns:
        The token text, or None when the prefix is missing or nothing but
        whitespace follows it.
    """
    # NOTE(review): a value without the "Bearer " prefix yields None; confirm
    # whether values sourced from AIRBYTE_CLOUD_BEARER_TOKEN also pass through
    # this function and are expected to carry the prefix.
    prefix = "bearer "
    if not value.lower().startswith(prefix):
        return None

    token = value[len(prefix) :].strip()
    return token or None


# Build the MCP server. Three config args are declared: the bearer token is
# optional and read from the Authorization header or AIRBYTE_CLOUD_BEARER_TOKEN;
# client id and secret are required and default to lazily resolved values.
app = mcp_server(
    name=MCP_SERVER_NAME,
    instructions=MCP_SERVER_INSTRUCTIONS,
    package_name="airbyte-internal-ops",
    advertised_properties={
        "docs_url": "https://github.com/airbytehq/airbyte-ops-mcp",
        "release_history_url": "https://github.com/airbytehq/airbyte-ops-mcp/releases",
    },
    server_config_args=[
        MCPServerConfigArg(
            name=ServerConfigKey.BEARER_TOKEN,
            http_header_key="Authorization",
            env_var="AIRBYTE_CLOUD_BEARER_TOKEN",
            normalize_fn=_normalize_bearer_token,
            required=False,
            sensitive=True,
        ),
        MCPServerConfigArg(
            name=ServerConfigKey.CLIENT_ID,
            http_header_key=HEADER_AIRBYTE_CLOUD_CLIENT_ID,
            # Wrapped in a lambda so resolution happens when the default is
            # requested rather than at import time.
            default=lambda: str(resolve_cloud_client_id()),
            required=True,
            sensitive=True,
        ),
        MCPServerConfigArg(
            name=ServerConfigKey.CLIENT_SECRET,
            http_header_key=HEADER_AIRBYTE_CLOUD_CLIENT_SECRET,
            default=lambda: str(resolve_cloud_client_secret()),
            required=True,
            sensitive=True,
        ),
    ],
    include_standard_tool_filters=True,
)


def register_server_assets(app: FastMCP) -> None:
    """Attach every tool and prompt registrar to the given FastMCP app.

    Documented domains:
    - REPO: GitHub repository operations
    - CLOUD: Cloud connector version management
    - PROMPTS: Prompt templates for common workflows
    - REGRESSION_TESTS: Connector regression tests (single-version and comparison)
    - REGISTRY: Connector registry operations (read/write metadata from GCS)
    - METADATA: Connector metadata operations (future)
    - QA: Connector quality assurance (future)
    - INSIGHTS: Connector analysis and insights (future)

    Note: Server info resource is now built-in via mcp_server() helper.

    Args:
        app: FastMCP application instance
    """
    # Registration order matches the original call sequence exactly.
    registrars = (
        register_github_repo_ops_tools,
        register_github_actions_tools,
        register_prerelease_tools,
        register_cloud_connector_version_tools,
        register_connector_rollout_tools,
        register_prod_db_query_tools,
        register_gcp_logs_tools,
        register_prompts,
        register_regression_tests_tools,
        register_registry_tools,
        register_connection_state_tools,
        register_connection_medic_tools,
        register_organization_agentic_flag_tools,
        register_organization_payment_config_tools,
        register_people_lookup_tools,
        register_human_in_the_loop_tools,
        register_devin_reminder_tools,
        register_message_bus_tools,
        register_session_feedback_tools,
        register_session_namer_tools,
        register_slack_messaging_tools,
        register_devin_secret_request_tools,
        register_tier_lookup_tools,
        register_release_block_tools,
    )
    for register in registrars:
        register(app)


# Import-time side effect: the module-level app is fully populated on import.
register_server_assets(app)


def _load_env() -> None:
    """Load variables from ``.env`` in the current working directory, if any."""
    dotenv_path = Path.cwd() / ".env"
    if not dotenv_path.exists():
        return

    load_dotenv(dotenv_path)
    print(f"Loaded environment from: {dotenv_path}", flush=True, file=sys.stderr)


def main() -> None:
    """Run the Airbyte Admin MCP server over stdio (the default entry point).

    Loads the optional ``.env`` file, calls ``init_sentry_tracking()``, and then
    serves until the stdio loop ends or the user interrupts it. All status
    output goes to stderr.
    """
    _load_env()
    init_sentry_tracking()

    banner = "=" * 60
    print(banner, flush=True, file=sys.stderr)
    print("Starting Airbyte Admin MCP server (stdio mode).", file=sys.stderr)
    try:
        asyncio.run(app.run_stdio_async(show_banner=False))
    except KeyboardInterrupt:
        # Ctrl-C is a normal way to stop; report it and fall through.
        print("Airbyte Admin MCP server interrupted by user.", file=sys.stderr)

    print("Airbyte Admin MCP server stopped.", file=sys.stderr)
    print(banner, flush=True, file=sys.stderr)


def _parse_port(port_str: str | None, default: int) -> int:
    """Turn an environment-supplied port string into a validated port number.

    Args:
        port_str: Port string from environment variable, or None if not set
        default: Port returned when port_str is None

    Returns:
        The parsed port, or ``default`` when no string was supplied.

    Raises:
        ValueError: If the stripped string is not all decimal digits, or the
            resulting number falls outside 1..65535.
    """
    if port_str is None:
        return default

    candidate = port_str.strip()
    if not candidate.isdecimal():
        raise ValueError(f"MCP_HTTP_PORT must be a valid integer, got: {candidate!r}")

    port = int(candidate)
    if port < 1 or port > 65535:
        raise ValueError(f"MCP_HTTP_PORT must be between 1 and 65535, got: {port}")

    return port


def main_http() -> None:
    """Run the Airbyte Admin MCP server over HTTP.

    Intended for containerized deployments where the server needs to be
    accessible over HTTP. All status output goes to stderr.

    Environment Variables:
        MCP_HTTP_HOST: Host to bind to (default: 127.0.0.1)
        MCP_HTTP_PORT: Port to listen on (default: 8082)

    Raises:
        ValueError: If MCP_HTTP_PORT is set but is not a valid port number.
    """
    _load_env()
    init_sentry_tracking()

    host = os.getenv("MCP_HTTP_HOST", DEFAULT_HTTP_HOST)
    port_env = os.getenv("MCP_HTTP_PORT")
    port = _parse_port(port_env, DEFAULT_HTTP_PORT)

    banner = "=" * 60
    print(banner, flush=True, file=sys.stderr)
    print(
        f"Starting Airbyte Admin MCP server (HTTP mode) on {host}:{port}",
        file=sys.stderr,
    )
    try:
        app.run(transport="http", host=host, port=port)
    except KeyboardInterrupt:
        # Ctrl-C is a normal way to stop; report it and fall through.
        print("Airbyte Admin MCP server interrupted by user.", file=sys.stderr)

    print("Airbyte Admin MCP server stopped.", file=sys.stderr)
    print(banner, flush=True, file=sys.stderr)


if __name__ == "__main__":
    main()
def register_server_assets(app: FastMCP) -> None:
    """Attach every tool and prompt registrar to the given FastMCP app.

    Documented domains:
    - REPO: GitHub repository operations
    - CLOUD: Cloud connector version management
    - PROMPTS: Prompt templates for common workflows
    - REGRESSION_TESTS: Connector regression tests (single-version and comparison)
    - REGISTRY: Connector registry operations (read/write metadata from GCS)
    - METADATA: Connector metadata operations (future)
    - QA: Connector quality assurance (future)
    - INSIGHTS: Connector analysis and insights (future)

    Note: Server info resource is now built-in via mcp_server() helper.

    Args:
        app: FastMCP application instance
    """
    # Registration order matches the original call sequence exactly.
    registrars = (
        register_github_repo_ops_tools,
        register_github_actions_tools,
        register_prerelease_tools,
        register_cloud_connector_version_tools,
        register_connector_rollout_tools,
        register_prod_db_query_tools,
        register_gcp_logs_tools,
        register_prompts,
        register_regression_tests_tools,
        register_registry_tools,
        register_connection_state_tools,
        register_connection_medic_tools,
        register_organization_agentic_flag_tools,
        register_organization_payment_config_tools,
        register_people_lookup_tools,
        register_human_in_the_loop_tools,
        register_devin_reminder_tools,
        register_message_bus_tools,
        register_session_feedback_tools,
        register_session_namer_tools,
        register_slack_messaging_tools,
        register_devin_secret_request_tools,
        register_tier_lookup_tools,
        register_release_block_tools,
    )
    for register in registrars:
        register(app)
Register all server assets (tools, prompts, resources) with the FastMCP app.
This function registers assets for all domains:
- REPO: GitHub repository operations
- CLOUD: Cloud connector version management
- PROMPTS: Prompt templates for common workflows
- REGRESSION_TESTS: Connector regression tests (single-version and comparison)
- REGISTRY: Connector registry operations (read/write metadata from GCS)
- METADATA: Connector metadata operations (future)
- QA: Connector quality assurance (future)
- INSIGHTS: Connector analysis and insights (future)
Note: Server info resource is now built-in via mcp_server() helper.
Arguments:
- app: FastMCP application instance
def main() -> None:
    """Run the Airbyte Admin MCP server over stdio (the default entry point).

    Calls ``_load_env()`` and ``init_sentry_tracking()``, then serves until the
    stdio loop ends or the user interrupts it. All status output goes to
    stderr.
    """
    _load_env()
    init_sentry_tracking()

    banner = "=" * 60
    print(banner, flush=True, file=sys.stderr)
    print("Starting Airbyte Admin MCP server (stdio mode).", file=sys.stderr)
    try:
        asyncio.run(app.run_stdio_async(show_banner=False))
    except KeyboardInterrupt:
        # Ctrl-C is a normal way to stop; report it and fall through.
        print("Airbyte Admin MCP server interrupted by user.", file=sys.stderr)

    print("Airbyte Admin MCP server stopped.", file=sys.stderr)
    print(banner, flush=True, file=sys.stderr)
Main entry point for the Airbyte Admin MCP server (stdio mode).
This is the default entry point that runs the server in stdio mode, suitable for direct MCP client connections.
def main_http() -> None:
    """Run the Airbyte Admin MCP server over HTTP.

    Intended for containerized deployments where the server needs to be
    accessible over HTTP. All status output goes to stderr.

    Environment Variables:
        MCP_HTTP_HOST: Host to bind to (default: 127.0.0.1)
        MCP_HTTP_PORT: Port to listen on (default: 8082)
    """
    _load_env()
    init_sentry_tracking()

    host = os.getenv("MCP_HTTP_HOST", DEFAULT_HTTP_HOST)
    port_env = os.getenv("MCP_HTTP_PORT")
    port = _parse_port(port_env, DEFAULT_HTTP_PORT)

    banner = "=" * 60
    print(banner, flush=True, file=sys.stderr)
    print(
        f"Starting Airbyte Admin MCP server (HTTP mode) on {host}:{port}",
        file=sys.stderr,
    )
    try:
        app.run(transport="http", host=host, port=port)
    except KeyboardInterrupt:
        # Ctrl-C is a normal way to stop; report it and fall through.
        print("Airbyte Admin MCP server interrupted by user.", file=sys.stderr)

    print("Airbyte Admin MCP server stopped.", file=sys.stderr)
    print(banner, flush=True, file=sys.stderr)
HTTP entry point for the Airbyte Admin MCP server.
This entry point runs the server in HTTP mode, suitable for containerized deployments where the server needs to be accessible over HTTP.
Environment Variables:
- MCP_HTTP_HOST: Host to bind to (default: 127.0.0.1)
- MCP_HTTP_PORT: Port to listen on (default: 8082)