airbyte_ops_mcp.mcp.server

Airbyte Admin MCP server implementation.

This module provides the main MCP server for Airbyte admin operations.

The server can run in two modes:

  • stdio mode (default): For direct MCP client connections via stdin/stdout
  • HTTP mode: For HTTP-based MCP connections, useful for containerized deployments
Environment Variables:

  • MCP_HTTP_HOST: Host to bind HTTP server to (default: 127.0.0.1)
  • MCP_HTTP_PORT: Port for HTTP server (default: 8082)

  1# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
  2"""Airbyte Admin MCP server implementation.
  3
  4This module provides the main MCP server for Airbyte admin operations.
  5
  6The server can run in two modes:
  7- **stdio mode** (default): For direct MCP client connections via stdin/stdout
  8- **HTTP mode**: For HTTP-based MCP connections, useful for containerized deployments
  9
 10Environment Variables:
 11    MCP_HTTP_HOST: Host to bind HTTP server to (default: 127.0.0.1)
 12    MCP_HTTP_PORT: Port for HTTP server (default: 8082)
 13"""
 14
 15import asyncio
 16import os
 17import sys
 18from pathlib import Path
 19
 20from airbyte.cloud.auth import resolve_cloud_client_id, resolve_cloud_client_secret
 21from dotenv import load_dotenv
 22from fastmcp import FastMCP
 23from fastmcp_extensions import MCPServerConfigArg, mcp_server
 24
 25from airbyte_ops_mcp._sentry import init_sentry_tracking
 26from airbyte_ops_mcp.constants import (
 27    HEADER_AIRBYTE_CLOUD_CLIENT_ID,
 28    HEADER_AIRBYTE_CLOUD_CLIENT_SECRET,
 29    MCP_SERVER_NAME,
 30    ServerConfigKey,
 31)
 32from airbyte_ops_mcp.mcp._guidance import MCP_SERVER_INSTRUCTIONS
 33from airbyte_ops_mcp.mcp.agent_message_bus import register_message_bus_tools
 34from airbyte_ops_mcp.mcp.cloud_connector_versions import (
 35    register_cloud_connector_version_tools,
 36)
 37from airbyte_ops_mcp.mcp.connection_medic import register_connection_medic_tools
 38from airbyte_ops_mcp.mcp.connection_state import register_connection_state_tools
 39from airbyte_ops_mcp.mcp.connector_rollout import register_connector_rollout_tools
 40from airbyte_ops_mcp.mcp.devin_reminders import register_devin_reminder_tools
 41from airbyte_ops_mcp.mcp.devin_secret_request import register_devin_secret_request_tools
 42from airbyte_ops_mcp.mcp.gcp_logs import register_gcp_logs_tools
 43from airbyte_ops_mcp.mcp.github_actions import register_github_actions_tools
 44from airbyte_ops_mcp.mcp.github_repo_ops import register_github_repo_ops_tools
 45from airbyte_ops_mcp.mcp.human_in_the_loop import register_human_in_the_loop_tools
 46from airbyte_ops_mcp.mcp.people_lookup import register_people_lookup_tools
 47from airbyte_ops_mcp.mcp.prerelease import register_prerelease_tools
 48from airbyte_ops_mcp.mcp.prod_db_queries import register_prod_db_query_tools
 49from airbyte_ops_mcp.mcp.prompts import register_prompts
 50from airbyte_ops_mcp.mcp.registry import register_registry_tools
 51from airbyte_ops_mcp.mcp.regression_tests import register_regression_tests_tools
 52from airbyte_ops_mcp.mcp.session_feedback import register_session_feedback_tools
 53from airbyte_ops_mcp.mcp.session_namer import register_session_namer_tools
 54from airbyte_ops_mcp.mcp.slack_messaging import register_slack_messaging_tools
 55from airbyte_ops_mcp.mcp.tier_lookup import register_tier_lookup_tools
 56
 57# Default HTTP server configuration
 58DEFAULT_HTTP_HOST = "127.0.0.1"
 59DEFAULT_HTTP_PORT = 8082
 60
 61
 62def _normalize_bearer_token(value: str) -> str | None:
 63    """Extract bearer token from Authorization header value.
 64
 65    Parses "Bearer <token>" format (case-insensitive prefix).
 66    Returns None if the value doesn't have the Bearer prefix.
 67    """
 68    if value.lower().startswith("bearer "):
 69        token = value[7:].strip()
 70        return token if token else None
 71    return None
 72
 73
 74# Create the MCP server with built-in server info resource
 75app = mcp_server(
 76    name=MCP_SERVER_NAME,
 77    instructions=MCP_SERVER_INSTRUCTIONS,
 78    package_name="airbyte-internal-ops",
 79    advertised_properties={
 80        "docs_url": "https://github.com/airbytehq/airbyte-ops-mcp",
 81        "release_history_url": "https://github.com/airbytehq/airbyte-ops-mcp/releases",
 82    },
 83    server_config_args=[
 84        MCPServerConfigArg(
 85            name=ServerConfigKey.BEARER_TOKEN,
 86            http_header_key="Authorization",
 87            env_var="AIRBYTE_CLOUD_BEARER_TOKEN",
 88            normalize_fn=_normalize_bearer_token,
 89            required=False,
 90            sensitive=True,
 91        ),
 92        MCPServerConfigArg(
 93            name=ServerConfigKey.CLIENT_ID,
 94            http_header_key=HEADER_AIRBYTE_CLOUD_CLIENT_ID,
 95            default=lambda: str(resolve_cloud_client_id()),
 96            required=True,
 97            sensitive=True,
 98        ),
 99        MCPServerConfigArg(
100            name=ServerConfigKey.CLIENT_SECRET,
101            http_header_key=HEADER_AIRBYTE_CLOUD_CLIENT_SECRET,
102            default=lambda: str(resolve_cloud_client_secret()),
103            required=True,
104            sensitive=True,
105        ),
106    ],
107    include_standard_tool_filters=True,
108)
109
110
def register_server_assets(app: FastMCP) -> None:
    """Register all server assets (tools and prompts) with the FastMCP app.

    Each ``register_*`` helper imported at module level is called exactly once
    with ``app``, in the order listed in the body.

    Note: Server info resource is now built-in via mcp_server() helper, so it
    is not registered here.

    Args:
        app: FastMCP application instance
    """
    # Kept as an ordered tuple so adding a module is a one-line change and the
    # registration order stays explicit.
    registrars = (
        register_github_repo_ops_tools,
        register_github_actions_tools,
        register_prerelease_tools,
        register_cloud_connector_version_tools,
        register_connector_rollout_tools,
        register_prod_db_query_tools,
        register_gcp_logs_tools,
        register_prompts,
        register_regression_tests_tools,
        register_registry_tools,
        register_connection_state_tools,
        register_connection_medic_tools,
        register_people_lookup_tools,
        register_human_in_the_loop_tools,
        register_devin_reminder_tools,
        register_message_bus_tools,
        register_session_feedback_tools,
        register_session_namer_tools,
        register_slack_messaging_tools,
        register_devin_secret_request_tools,
        register_tier_lookup_tools,
    )
    for register in registrars:
        register(app)
150
151
152register_server_assets(app)
153
154
def _load_env() -> None:
    """Load environment variables from a ``.env`` file in the working directory.

    Does nothing when the current working directory has no regular file named
    ``.env``. When one is found it is passed to ``load_dotenv`` and its path
    is reported on stderr.
    """
    env_file = Path.cwd() / ".env"
    # is_file() rather than exists(): a directory named ".env" is not loadable.
    if env_file.is_file():
        load_dotenv(env_file)
        print(f"Loaded environment from: {env_file}", flush=True, file=sys.stderr)
161
162
def main() -> None:
    """Main entry point for the Airbyte Admin MCP server (stdio mode).

    Loads ``.env`` if present, initialises Sentry tracking, then runs the
    server over stdio until it exits. A KeyboardInterrupt is reported and
    treated as a normal shutdown; any other exception propagates.

    All status output goes to stderr.
    """
    _load_env()
    init_sentry_tracking()

    print("=" * 60, flush=True, file=sys.stderr)
    print("Starting Airbyte Admin MCP server (stdio mode).", file=sys.stderr)
    try:
        asyncio.run(app.run_stdio_async(show_banner=False))
    except KeyboardInterrupt:
        print("Airbyte Admin MCP server interrupted by user.", file=sys.stderr)

    print("Airbyte Admin MCP server stopped.", file=sys.stderr)
    print("=" * 60, flush=True, file=sys.stderr)
181
182
183def _parse_port(port_str: str | None, default: int) -> int:
184    """Parse and validate a port number from string.
185
186    Args:
187        port_str: Port string from environment variable, or None if not set
188        default: Default port to use if port_str is None
189
190    Returns:
191        Validated port number
192
193    Raises:
194        ValueError: If port_str is not a valid integer or out of range
195    """
196    if port_str is None:
197        return default
198
199    port_str = port_str.strip()
200    if not port_str.isdecimal():
201        raise ValueError(f"MCP_HTTP_PORT must be a valid integer, got: {port_str!r}")
202
203    port = int(port_str)
204    if not 1 <= port <= 65535:
205        raise ValueError(f"MCP_HTTP_PORT must be between 1 and 65535, got: {port}")
206
207    return port
208
209
def main_http() -> None:
    """HTTP entry point for the Airbyte Admin MCP server.

    This entry point runs the server in HTTP mode, suitable for containerized
    deployments where the server needs to be accessible over HTTP. It loads
    ``.env`` if present and initialises Sentry tracking before the host and
    port are read, so values from ``.env`` are honoured.

    Environment Variables:
        MCP_HTTP_HOST: Host to bind to (default: 127.0.0.1)
        MCP_HTTP_PORT: Port to listen on (default: 8082)

    Raises:
        ValueError: If MCP_HTTP_PORT is set but is not an integer in 1-65535.
    """
    _load_env()
    init_sentry_tracking()

    host = os.getenv("MCP_HTTP_HOST", DEFAULT_HTTP_HOST)
    port = _parse_port(os.getenv("MCP_HTTP_PORT"), DEFAULT_HTTP_PORT)

    print("=" * 60, flush=True, file=sys.stderr)
    print(
        f"Starting Airbyte Admin MCP server (HTTP mode) on {host}:{port}",
        file=sys.stderr,
    )
    try:
        app.run(transport="http", host=host, port=port)
    except KeyboardInterrupt:
        print("Airbyte Admin MCP server interrupted by user.", file=sys.stderr)

    print("Airbyte Admin MCP server stopped.", file=sys.stderr)
    print("=" * 60, flush=True, file=sys.stderr)
238
239
# Running this module directly starts the server in stdio mode.
if __name__ == "__main__":
    main()
# Module-level attributes as shown in the API listing.
# Default host and port used by main_http() when the environment sets neither.
DEFAULT_HTTP_HOST = '127.0.0.1'
DEFAULT_HTTP_PORT = 8082
# NOTE(review): presumably the rendered repr of the server object that the
# source listing builds with mcp_server(...), not a literal constructor call
# taken from the module — confirm.
app = FastMCP('airbyte-internal-ops')
def register_server_assets(app: fastmcp.server.server.FastMCP) -> None:
def register_server_assets(app: FastMCP) -> None:
    """Register all server assets (tools and prompts) with the FastMCP app.

    Each ``register_*`` helper imported at module level is called exactly once
    with ``app``, in the order listed in the body.

    Note: Server info resource is now built-in via mcp_server() helper, so it
    is not registered here.

    Args:
        app: FastMCP application instance
    """
    # Kept as an ordered tuple so adding a module is a one-line change and the
    # registration order stays explicit.
    registrars = (
        register_github_repo_ops_tools,
        register_github_actions_tools,
        register_prerelease_tools,
        register_cloud_connector_version_tools,
        register_connector_rollout_tools,
        register_prod_db_query_tools,
        register_gcp_logs_tools,
        register_prompts,
        register_regression_tests_tools,
        register_registry_tools,
        register_connection_state_tools,
        register_connection_medic_tools,
        register_people_lookup_tools,
        register_human_in_the_loop_tools,
        register_devin_reminder_tools,
        register_message_bus_tools,
        register_session_feedback_tools,
        register_session_namer_tools,
        register_slack_messaging_tools,
        register_devin_secret_request_tools,
        register_tier_lookup_tools,
    )
    for register in registrars:
        register(app)

Register all server assets (tools, prompts, resources) with the FastMCP app.

This function registers assets for all domains:

  • REPO: GitHub repository operations
  • CLOUD: Cloud connector version management
  • PROMPTS: Prompt templates for common workflows
  • REGRESSION_TESTS: Connector regression tests (single-version and comparison)
  • REGISTRY: Connector registry operations (read/write metadata from GCS)
  • METADATA: Connector metadata operations (future)
  • QA: Connector quality assurance (future)
  • INSIGHTS: Connector analysis and insights (future)

Note: Server info resource is now built-in via mcp_server() helper.

Arguments:
  • app: FastMCP application instance
def main() -> None:
def main() -> None:
    """Main entry point for the Airbyte Admin MCP server (stdio mode).

    Loads ``.env`` if present, initialises Sentry tracking, then runs the
    server over stdio until it exits. A KeyboardInterrupt is reported and
    treated as a normal shutdown; any other exception propagates.

    All status output goes to stderr.
    """
    _load_env()
    init_sentry_tracking()

    print("=" * 60, flush=True, file=sys.stderr)
    print("Starting Airbyte Admin MCP server (stdio mode).", file=sys.stderr)
    try:
        asyncio.run(app.run_stdio_async(show_banner=False))
    except KeyboardInterrupt:
        print("Airbyte Admin MCP server interrupted by user.", file=sys.stderr)

    print("Airbyte Admin MCP server stopped.", file=sys.stderr)
    print("=" * 60, flush=True, file=sys.stderr)

Main entry point for the Airbyte Admin MCP server (stdio mode).

This is the default entry point that runs the server in stdio mode, suitable for direct MCP client connections.

def main_http() -> None:
def main_http() -> None:
    """HTTP entry point for the Airbyte Admin MCP server.

    This entry point runs the server in HTTP mode, suitable for containerized
    deployments where the server needs to be accessible over HTTP. It loads
    ``.env`` if present and initialises Sentry tracking before the host and
    port are read, so values from ``.env`` are honoured.

    Environment Variables:
        MCP_HTTP_HOST: Host to bind to (default: 127.0.0.1)
        MCP_HTTP_PORT: Port to listen on (default: 8082)

    Raises:
        ValueError: If MCP_HTTP_PORT is set but is not an integer in 1-65535.
    """
    _load_env()
    init_sentry_tracking()

    host = os.getenv("MCP_HTTP_HOST", DEFAULT_HTTP_HOST)
    port = _parse_port(os.getenv("MCP_HTTP_PORT"), DEFAULT_HTTP_PORT)

    print("=" * 60, flush=True, file=sys.stderr)
    print(
        f"Starting Airbyte Admin MCP server (HTTP mode) on {host}:{port}",
        file=sys.stderr,
    )
    try:
        app.run(transport="http", host=host, port=port)
    except KeyboardInterrupt:
        print("Airbyte Admin MCP server interrupted by user.", file=sys.stderr)

    print("Airbyte Admin MCP server stopped.", file=sys.stderr)
    print("=" * 60, flush=True, file=sys.stderr)

HTTP entry point for the Airbyte Admin MCP server.

This entry point runs the server in HTTP mode, suitable for containerized deployments where the server needs to be accessible over HTTP.

Environment Variables:

  • MCP_HTTP_HOST: Host to bind to (default: 127.0.0.1)
  • MCP_HTTP_PORT: Port to listen on (default: 8082)