airbyte_ops_mcp.mcp.server

Airbyte Admin MCP server implementation.

This module provides the main MCP server for Airbyte admin operations.

The server can run in two modes:

  • stdio mode (default): For direct MCP client connections via stdin/stdout
  • HTTP mode: For HTTP-based MCP connections, useful for containerized deployments
Environment Variables:

  • MCP_HTTP_HOST: Host to bind HTTP server to (default: 127.0.0.1)
  • MCP_HTTP_PORT: Port for HTTP server (default: 8082)

  1# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
  2"""Airbyte Admin MCP server implementation.
  3
  4This module provides the main MCP server for Airbyte admin operations.
  5
  6The server can run in two modes:
  7- **stdio mode** (default): For direct MCP client connections via stdin/stdout
  8- **HTTP mode**: For HTTP-based MCP connections, useful for containerized deployments
  9
 10Environment Variables:
 11    MCP_HTTP_HOST: Host to bind HTTP server to (default: 127.0.0.1)
 12    MCP_HTTP_PORT: Port for HTTP server (default: 8082)
 13"""
 14
 15import asyncio
 16import os
 17import sys
 18from pathlib import Path
 19
 20from airbyte.cloud.auth import resolve_cloud_client_id, resolve_cloud_client_secret
 21from dotenv import load_dotenv
 22from fastmcp import FastMCP
 23from fastmcp_extensions import MCPServerConfigArg, mcp_server
 24
 25from airbyte_ops_mcp._sentry import init_sentry_tracking
 26from airbyte_ops_mcp.constants import (
 27    HEADER_AIRBYTE_CLOUD_CLIENT_ID,
 28    HEADER_AIRBYTE_CLOUD_CLIENT_SECRET,
 29    MCP_SERVER_NAME,
 30    ServerConfigKey,
 31)
 32from airbyte_ops_mcp.mcp._guidance import MCP_SERVER_INSTRUCTIONS
 33from airbyte_ops_mcp.mcp.agent_message_bus import register_message_bus_tools
 34from airbyte_ops_mcp.mcp.cloud_connector_versions import (
 35    register_cloud_connector_version_tools,
 36)
 37from airbyte_ops_mcp.mcp.connection_medic import register_connection_medic_tools
 38from airbyte_ops_mcp.mcp.connection_state import register_connection_state_tools
 39from airbyte_ops_mcp.mcp.connector_rollout import register_connector_rollout_tools
 40from airbyte_ops_mcp.mcp.devin_reminders import register_devin_reminder_tools
 41from airbyte_ops_mcp.mcp.devin_secret_request import register_devin_secret_request_tools
 42from airbyte_ops_mcp.mcp.gcp_logs import register_gcp_logs_tools
 43from airbyte_ops_mcp.mcp.github_actions import register_github_actions_tools
 44from airbyte_ops_mcp.mcp.github_repo_ops import register_github_repo_ops_tools
 45from airbyte_ops_mcp.mcp.human_in_the_loop import register_human_in_the_loop_tools
 46from airbyte_ops_mcp.mcp.organization_payment_config import (
 47    register_organization_payment_config_tools,
 48)
 49from airbyte_ops_mcp.mcp.people_lookup import register_people_lookup_tools
 50from airbyte_ops_mcp.mcp.prerelease import register_prerelease_tools
 51from airbyte_ops_mcp.mcp.prod_db_queries import register_prod_db_query_tools
 52from airbyte_ops_mcp.mcp.prompts import register_prompts
 53from airbyte_ops_mcp.mcp.registry import register_registry_tools
 54from airbyte_ops_mcp.mcp.regression_tests import register_regression_tests_tools
 55from airbyte_ops_mcp.mcp.release_block import register_release_block_tools
 56from airbyte_ops_mcp.mcp.session_feedback import register_session_feedback_tools
 57from airbyte_ops_mcp.mcp.session_namer import register_session_namer_tools
 58from airbyte_ops_mcp.mcp.slack_messaging import register_slack_messaging_tools
 59from airbyte_ops_mcp.mcp.tier_lookup import register_tier_lookup_tools
 60
# Default HTTP server configuration.
# main_http() falls back to these when MCP_HTTP_HOST / MCP_HTTP_PORT are not
# set in the environment. The default host is the loopback address.
DEFAULT_HTTP_HOST = "127.0.0.1"
DEFAULT_HTTP_PORT = 8082
 64
 65
 66def _normalize_bearer_token(value: str) -> str | None:
 67    """Extract bearer token from Authorization header value.
 68
 69    Parses "Bearer <token>" format (case-insensitive prefix).
 70    Returns None if the value doesn't have the Bearer prefix.
 71    """
 72    if value.lower().startswith("bearer "):
 73        token = value[7:].strip()
 74        return token if token else None
 75    return None
 76
 77
# Create the MCP server with built-in server info resource.
# This call executes at import time, so importing the module builds `app`.
app = mcp_server(
    name=MCP_SERVER_NAME,
    instructions=MCP_SERVER_INSTRUCTIONS,
    package_name="airbyte-internal-ops",
    advertised_properties={
        "docs_url": "https://github.com/airbytehq/airbyte-ops-mcp",
        "release_history_url": "https://github.com/airbytehq/airbyte-ops-mcp/releases",
    },
    server_config_args=[
        # Optional bearer token, declared with both an HTTP header source
        # ("Authorization") and an environment-variable source. The
        # normalizer returns None for values lacking a "Bearer " prefix.
        # NOTE(review): confirm whether normalize_fn is also applied to the
        # env-var value; a raw token stored there would normalize to None.
        MCPServerConfigArg(
            name=ServerConfigKey.BEARER_TOKEN,
            http_header_key="Authorization",
            env_var="AIRBYTE_CLOUD_BEARER_TOKEN",
            normalize_fn=_normalize_bearer_token,
            required=False,
            sensitive=True,
        ),
        # Required client ID. The default is a callable, so
        # resolve_cloud_client_id() is not invoked while this module is being
        # imported; str() converts whatever it returns to a plain string.
        MCPServerConfigArg(
            name=ServerConfigKey.CLIENT_ID,
            http_header_key=HEADER_AIRBYTE_CLOUD_CLIENT_ID,
            default=lambda: str(resolve_cloud_client_id()),
            required=True,
            sensitive=True,
        ),
        # Required client secret, declared the same way as the client ID:
        # header source plus a lazily evaluated default.
        MCPServerConfigArg(
            name=ServerConfigKey.CLIENT_SECRET,
            http_header_key=HEADER_AIRBYTE_CLOUD_CLIENT_SECRET,
            default=lambda: str(resolve_cloud_client_secret()),
            required=True,
            sensitive=True,
        ),
    ],
    include_standard_tool_filters=True,
)
113
114
def register_server_assets(app: FastMCP) -> None:
    """Attach every tool, prompt, and resource registrar to a FastMCP app.

    Each ``register_*`` helper imported by this module is called exactly once
    with ``app``, in the fixed order listed in the function body.

    Domains covered include:
    - REPO: GitHub repository operations
    - CLOUD: Cloud connector version management
    - PROMPTS: Prompt templates for common workflows
    - REGRESSION_TESTS: Connector regression tests (single-version and comparison)
    - REGISTRY: Connector registry operations (read/write metadata from GCS)
    - METADATA: Connector metadata operations (future)
    - QA: Connector quality assurance (future)
    - INSIGHTS: Connector analysis and insights (future)

    Note: The server info resource is not registered here; it is built in via
    the mcp_server() helper.

    Args:
        app: FastMCP application instance that receives the registrations.
    """
    # Order is significant only in that it mirrors the original call sequence.
    registrars = (
        register_github_repo_ops_tools,
        register_github_actions_tools,
        register_prerelease_tools,
        register_cloud_connector_version_tools,
        register_connector_rollout_tools,
        register_prod_db_query_tools,
        register_gcp_logs_tools,
        register_prompts,
        register_regression_tests_tools,
        register_registry_tools,
        register_connection_state_tools,
        register_connection_medic_tools,
        register_organization_payment_config_tools,
        register_people_lookup_tools,
        register_human_in_the_loop_tools,
        register_devin_reminder_tools,
        register_message_bus_tools,
        register_session_feedback_tools,
        register_session_namer_tools,
        register_slack_messaging_tools,
        register_devin_secret_request_tools,
        register_tier_lookup_tools,
        register_release_block_tools,
    )
    for register in registrars:
        register(app)
156
157
# Populate the module-level `app` at import time; both main() and main_http()
# run this same instance.
register_server_assets(app)
159
160
def _load_env() -> None:
    """Load a ``.env`` file from the current working directory, if one exists.

    When the file is found it is handed to ``load_dotenv`` and a notice naming
    the file is written to stderr. When it is absent, nothing happens.
    """
    dotenv_path = Path.cwd() / ".env"
    if not dotenv_path.exists():
        return

    load_dotenv(dotenv_path)
    print(f"Loaded environment from: {dotenv_path}", flush=True, file=sys.stderr)
167
168
def main() -> None:
    """Run the Airbyte Admin MCP server over stdio (the default entry point).

    Loads the optional ``.env`` file, initializes Sentry tracking, then runs
    the module-level ``app`` in stdio mode until it exits. All status output
    goes to stderr. A KeyboardInterrupt is caught and reported rather than
    propagated.
    """
    _load_env()
    init_sentry_tracking()

    separator = "=" * 60

    print(separator, flush=True, file=sys.stderr)
    print("Starting Airbyte Admin MCP server (stdio mode).", file=sys.stderr)

    try:
        asyncio.run(app.run_stdio_async(show_banner=False))
    except KeyboardInterrupt:
        print("Airbyte Admin MCP server interrupted by user.", file=sys.stderr)

    print("Airbyte Admin MCP server stopped.", file=sys.stderr)
    print(separator, flush=True, file=sys.stderr)
187
188
189def _parse_port(port_str: str | None, default: int) -> int:
190    """Parse and validate a port number from string.
191
192    Args:
193        port_str: Port string from environment variable, or None if not set
194        default: Default port to use if port_str is None
195
196    Returns:
197        Validated port number
198
199    Raises:
200        ValueError: If port_str is not a valid integer or out of range
201    """
202    if port_str is None:
203        return default
204
205    port_str = port_str.strip()
206    if not port_str.isdecimal():
207        raise ValueError(f"MCP_HTTP_PORT must be a valid integer, got: {port_str!r}")
208
209    port = int(port_str)
210    if not 1 <= port <= 65535:
211        raise ValueError(f"MCP_HTTP_PORT must be between 1 and 65535, got: {port}")
212
213    return port
214
215
def main_http() -> None:
    """Run the Airbyte Admin MCP server over HTTP.

    Intended for containerized deployments where the server has to be
    reachable over HTTP. The optional ``.env`` file is loaded and Sentry
    tracking is initialized before the bind address is read from the
    environment. All status output goes to stderr, and a KeyboardInterrupt is
    caught and reported rather than propagated.

    Environment Variables:
        MCP_HTTP_HOST: Host to bind to (default: 127.0.0.1)
        MCP_HTTP_PORT: Port to listen on (default: 8082)

    Raises:
        ValueError: Propagated from ``_parse_port`` when MCP_HTTP_PORT is set
            to something that is not a valid port number.
    """
    _load_env()
    init_sentry_tracking()

    bind_host = os.environ.get("MCP_HTTP_HOST", DEFAULT_HTTP_HOST)
    bind_port = _parse_port(os.environ.get("MCP_HTTP_PORT"), DEFAULT_HTTP_PORT)

    separator = "=" * 60
    startup_notice = (
        f"Starting Airbyte Admin MCP server (HTTP mode) on {bind_host}:{bind_port}"
    )

    print(separator, flush=True, file=sys.stderr)
    print(startup_notice, file=sys.stderr)

    try:
        app.run(transport="http", host=bind_host, port=bind_port)
    except KeyboardInterrupt:
        print("Airbyte Admin MCP server interrupted by user.", file=sys.stderr)

    print("Airbyte Admin MCP server stopped.", file=sys.stderr)
    print(separator, flush=True, file=sys.stderr)
244
245
# Executing this module as a script starts the stdio-mode server; HTTP mode is
# only reachable through main_http().
if __name__ == "__main__":
    main()
DEFAULT_HTTP_HOST = '127.0.0.1'
DEFAULT_HTTP_PORT = 8082
app = FastMCP('airbyte-internal-ops')
def register_server_assets(app: fastmcp.server.server.FastMCP) -> None:
def register_server_assets(app: FastMCP) -> None:
    """Attach every tool, prompt, and resource registrar to a FastMCP app.

    Each ``register_*`` helper imported by this module is called exactly once
    with ``app``, in the fixed order listed in the function body.

    Domains covered include:
    - REPO: GitHub repository operations
    - CLOUD: Cloud connector version management
    - PROMPTS: Prompt templates for common workflows
    - REGRESSION_TESTS: Connector regression tests (single-version and comparison)
    - REGISTRY: Connector registry operations (read/write metadata from GCS)
    - METADATA: Connector metadata operations (future)
    - QA: Connector quality assurance (future)
    - INSIGHTS: Connector analysis and insights (future)

    Note: The server info resource is not registered here; it is built in via
    the mcp_server() helper.

    Args:
        app: FastMCP application instance that receives the registrations.
    """
    # Order is significant only in that it mirrors the original call sequence.
    registrars = (
        register_github_repo_ops_tools,
        register_github_actions_tools,
        register_prerelease_tools,
        register_cloud_connector_version_tools,
        register_connector_rollout_tools,
        register_prod_db_query_tools,
        register_gcp_logs_tools,
        register_prompts,
        register_regression_tests_tools,
        register_registry_tools,
        register_connection_state_tools,
        register_connection_medic_tools,
        register_organization_payment_config_tools,
        register_people_lookup_tools,
        register_human_in_the_loop_tools,
        register_devin_reminder_tools,
        register_message_bus_tools,
        register_session_feedback_tools,
        register_session_namer_tools,
        register_slack_messaging_tools,
        register_devin_secret_request_tools,
        register_tier_lookup_tools,
        register_release_block_tools,
    )
    for register in registrars:
        register(app)

Register all server assets (tools, prompts, resources) with the FastMCP app.

This function registers assets for all domains:

  • REPO: GitHub repository operations
  • CLOUD: Cloud connector version management
  • PROMPTS: Prompt templates for common workflows
  • REGRESSION_TESTS: Connector regression tests (single-version and comparison)
  • REGISTRY: Connector registry operations (read/write metadata from GCS)
  • METADATA: Connector metadata operations (future)
  • QA: Connector quality assurance (future)
  • INSIGHTS: Connector analysis and insights (future)

Note: Server info resource is now built-in via mcp_server() helper.

Arguments:
  • app: FastMCP application instance
def main() -> None:
def main() -> None:
    """Run the Airbyte Admin MCP server over stdio (the default entry point).

    Loads the optional ``.env`` file, initializes Sentry tracking, then runs
    the module-level ``app`` in stdio mode until it exits. All status output
    goes to stderr. A KeyboardInterrupt is caught and reported rather than
    propagated.
    """
    _load_env()
    init_sentry_tracking()

    separator = "=" * 60

    print(separator, flush=True, file=sys.stderr)
    print("Starting Airbyte Admin MCP server (stdio mode).", file=sys.stderr)

    try:
        asyncio.run(app.run_stdio_async(show_banner=False))
    except KeyboardInterrupt:
        print("Airbyte Admin MCP server interrupted by user.", file=sys.stderr)

    print("Airbyte Admin MCP server stopped.", file=sys.stderr)
    print(separator, flush=True, file=sys.stderr)

Main entry point for the Airbyte Admin MCP server (stdio mode).

This is the default entry point that runs the server in stdio mode, suitable for direct MCP client connections.

def main_http() -> None:
def main_http() -> None:
    """Run the Airbyte Admin MCP server over HTTP.

    Intended for containerized deployments where the server has to be
    reachable over HTTP. The optional ``.env`` file is loaded and Sentry
    tracking is initialized before the bind address is read from the
    environment. All status output goes to stderr, and a KeyboardInterrupt is
    caught and reported rather than propagated.

    Environment Variables:
        MCP_HTTP_HOST: Host to bind to (default: 127.0.0.1)
        MCP_HTTP_PORT: Port to listen on (default: 8082)

    Raises:
        ValueError: Propagated from ``_parse_port`` when MCP_HTTP_PORT is set
            to something that is not a valid port number.
    """
    _load_env()
    init_sentry_tracking()

    bind_host = os.environ.get("MCP_HTTP_HOST", DEFAULT_HTTP_HOST)
    bind_port = _parse_port(os.environ.get("MCP_HTTP_PORT"), DEFAULT_HTTP_PORT)

    separator = "=" * 60
    startup_notice = (
        f"Starting Airbyte Admin MCP server (HTTP mode) on {bind_host}:{bind_port}"
    )

    print(separator, flush=True, file=sys.stderr)
    print(startup_notice, file=sys.stderr)

    try:
        app.run(transport="http", host=bind_host, port=bind_port)
    except KeyboardInterrupt:
        print("Airbyte Admin MCP server interrupted by user.", file=sys.stderr)

    print("Airbyte Admin MCP server stopped.", file=sys.stderr)
    print(separator, flush=True, file=sys.stderr)

HTTP entry point for the Airbyte Admin MCP server.

This entry point runs the server in HTTP mode, suitable for containerized deployments where the server needs to be accessible over HTTP.

Environment Variables:

  • MCP_HTTP_HOST: Host to bind to (default: 127.0.0.1)
  • MCP_HTTP_PORT: Port to listen on (default: 8082)