airbyte_ops_mcp.mcp.server

Airbyte Admin MCP server implementation.

This module provides the main MCP server for Airbyte admin operations.

The server can run in two modes:

  • stdio mode (default): For direct MCP client connections via stdin/stdout
  • HTTP mode: For HTTP-based MCP connections, useful for containerized deployments

Environment Variables:

  • MCP_HTTP_HOST: Host to bind HTTP server to (default: 127.0.0.1)
  • MCP_HTTP_PORT: Port for HTTP server (default: 8082)

  1# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
  2"""Airbyte Admin MCP server implementation.
  3
  4This module provides the main MCP server for Airbyte admin operations.
  5
  6The server can run in two modes:
  7- **stdio mode** (default): For direct MCP client connections via stdin/stdout
  8- **HTTP mode**: For HTTP-based MCP connections, useful for containerized deployments
  9
 10Environment Variables:
 11    MCP_HTTP_HOST: Host to bind HTTP server to (default: 127.0.0.1)
 12    MCP_HTTP_PORT: Port for HTTP server (default: 8082)
 13"""
 14
 15import asyncio
 16import os
 17import sys
 18from pathlib import Path
 19
 20from airbyte.cloud.auth import resolve_cloud_client_id, resolve_cloud_client_secret
 21from dotenv import load_dotenv
 22from fastmcp import FastMCP
 23from fastmcp_extensions import MCPServerConfigArg, mcp_server
 24
 25from airbyte_ops_mcp._sentry import init_sentry_tracking
 26from airbyte_ops_mcp.constants import (
 27    HEADER_AIRBYTE_CLOUD_CLIENT_ID,
 28    HEADER_AIRBYTE_CLOUD_CLIENT_SECRET,
 29    MCP_SERVER_NAME,
 30    ServerConfigKey,
 31)
 32from airbyte_ops_mcp.mcp._guidance import MCP_SERVER_INSTRUCTIONS
 33from airbyte_ops_mcp.mcp.agent_message_bus import register_message_bus_tools
 34from airbyte_ops_mcp.mcp.cloud_connector_versions import (
 35    register_cloud_connector_version_tools,
 36)
 37from airbyte_ops_mcp.mcp.connection_medic import register_connection_medic_tools
 38from airbyte_ops_mcp.mcp.connection_state import register_connection_state_tools
 39from airbyte_ops_mcp.mcp.connector_rollout import register_connector_rollout_tools
 40from airbyte_ops_mcp.mcp.devin_reminders import register_devin_reminder_tools
 41from airbyte_ops_mcp.mcp.devin_secret_request import register_devin_secret_request_tools
 42from airbyte_ops_mcp.mcp.gcp_logs import register_gcp_logs_tools
 43from airbyte_ops_mcp.mcp.github_actions import register_github_actions_tools
 44from airbyte_ops_mcp.mcp.github_repo_ops import register_github_repo_ops_tools
 45from airbyte_ops_mcp.mcp.human_in_the_loop import register_human_in_the_loop_tools
 46from airbyte_ops_mcp.mcp.organization_agentic_flag import (
 47    register_organization_agentic_flag_tools,
 48)
 49from airbyte_ops_mcp.mcp.organization_payment_config import (
 50    register_organization_payment_config_tools,
 51)
 52from airbyte_ops_mcp.mcp.people_lookup import register_people_lookup_tools
 53from airbyte_ops_mcp.mcp.prerelease import register_prerelease_tools
 54from airbyte_ops_mcp.mcp.prod_db_queries import register_prod_db_query_tools
 55from airbyte_ops_mcp.mcp.prompts import register_prompts
 56from airbyte_ops_mcp.mcp.registry import register_registry_tools
 57from airbyte_ops_mcp.mcp.regression_tests import register_regression_tests_tools
 58from airbyte_ops_mcp.mcp.release_block import register_release_block_tools
 59from airbyte_ops_mcp.mcp.session_feedback import register_session_feedback_tools
 60from airbyte_ops_mcp.mcp.session_namer import register_session_namer_tools
 61from airbyte_ops_mcp.mcp.slack_messaging import register_slack_messaging_tools
 62from airbyte_ops_mcp.mcp.tier_lookup import register_tier_lookup_tools
 63
 64# Default HTTP server configuration
 65DEFAULT_HTTP_HOST = "127.0.0.1"
 66DEFAULT_HTTP_PORT = 8082
 67
 68
 69def _normalize_bearer_token(value: str) -> str | None:
 70    """Extract bearer token from Authorization header value.
 71
 72    Parses "Bearer <token>" format (case-insensitive prefix).
 73    Returns None if the value doesn't have the Bearer prefix.
 74    """
 75    if value.lower().startswith("bearer "):
 76        token = value[7:].strip()
 77        return token if token else None
 78    return None
 79
 80
 81# Create the MCP server with built-in server info resource
 82app = mcp_server(
 83    name=MCP_SERVER_NAME,
 84    instructions=MCP_SERVER_INSTRUCTIONS,
 85    package_name="airbyte-internal-ops",
 86    advertised_properties={
 87        "docs_url": "https://github.com/airbytehq/airbyte-ops-mcp",
 88        "release_history_url": "https://github.com/airbytehq/airbyte-ops-mcp/releases",
 89    },
 90    server_config_args=[
 91        MCPServerConfigArg(
 92            name=ServerConfigKey.BEARER_TOKEN,
 93            http_header_key="Authorization",
 94            env_var="AIRBYTE_CLOUD_BEARER_TOKEN",
 95            normalize_fn=_normalize_bearer_token,
 96            required=False,
 97            sensitive=True,
 98        ),
 99        MCPServerConfigArg(
100            name=ServerConfigKey.CLIENT_ID,
101            http_header_key=HEADER_AIRBYTE_CLOUD_CLIENT_ID,
102            default=lambda: str(resolve_cloud_client_id()),
103            required=True,
104            sensitive=True,
105        ),
106        MCPServerConfigArg(
107            name=ServerConfigKey.CLIENT_SECRET,
108            http_header_key=HEADER_AIRBYTE_CLOUD_CLIENT_SECRET,
109            default=lambda: str(resolve_cloud_client_secret()),
110            required=True,
111            sensitive=True,
112        ),
113    ],
114    include_standard_tool_filters=True,
115)
116
117
def register_server_assets(app: FastMCP) -> None:
    """Attach every tool and prompt registrar used by this server to ``app``.

    Each ``register_*`` helper imported by this module is called exactly once
    with ``app``, in the fixed order listed below. The helpers cover, among
    others, GitHub repository and Actions operations, cloud connector
    versions and rollouts, prompts, regression tests, and the connector
    registry.

    Note: Server info resource is now built-in via mcp_server() helper, so it
    is not registered here.

    Args:
        app: FastMCP application instance that receives the registrations.
    """
    registrars = (
        register_github_repo_ops_tools,
        register_github_actions_tools,
        register_prerelease_tools,
        register_cloud_connector_version_tools,
        register_connector_rollout_tools,
        register_prod_db_query_tools,
        register_gcp_logs_tools,
        register_prompts,
        register_regression_tests_tools,
        register_registry_tools,
        register_connection_state_tools,
        register_connection_medic_tools,
        register_organization_agentic_flag_tools,
        register_organization_payment_config_tools,
        register_people_lookup_tools,
        register_human_in_the_loop_tools,
        register_devin_reminder_tools,
        register_message_bus_tools,
        register_session_feedback_tools,
        register_session_namer_tools,
        register_slack_messaging_tools,
        register_devin_secret_request_tools,
        register_tier_lookup_tools,
        register_release_block_tools,
    )
    # Order is preserved deliberately: registrars run first to last.
    for register in registrars:
        register(app)
160
161
162register_server_assets(app)
163
164
def _load_env() -> None:
    """Load environment variables from a ``.env`` file in the working directory.

    Only a regular file is loaded. A directory that happens to be named
    ``.env`` (a common virtualenv location) is not a dotenv file, so it is
    skipped instead of being handed to ``load_dotenv`` and reported as loaded.
    When a file is loaded, a notice naming it is written to stderr.
    """
    env_file = Path.cwd() / ".env"
    # is_file() rather than exists(): exists() is also true for directories.
    if not env_file.is_file():
        return

    load_dotenv(env_file)
    print(f"Loaded environment from: {env_file}", flush=True, file=sys.stderr)
171
172
def main() -> None:
    """Run the Airbyte Admin MCP server over stdio (the default entry point).

    Calls ``_load_env()`` and ``init_sentry_tracking()`` first, then serves
    over stdio, which suits direct MCP client connections. All status lines
    are written to stderr. A ``KeyboardInterrupt`` is reported as a user
    interruption rather than propagated.
    """
    _load_env()
    init_sentry_tracking()

    divider = "=" * 60
    print(divider, flush=True, file=sys.stderr)
    print("Starting Airbyte Admin MCP server (stdio mode).", file=sys.stderr)

    try:
        stdio_session = app.run_stdio_async(show_banner=False)
        asyncio.run(stdio_session)
    except KeyboardInterrupt:
        # Ctrl-C is an expected way to stop the server; report it quietly.
        print("Airbyte Admin MCP server interrupted by user.", file=sys.stderr)

    print("Airbyte Admin MCP server stopped.", file=sys.stderr)
    print(divider, flush=True, file=sys.stderr)
191
192
193def _parse_port(port_str: str | None, default: int) -> int:
194    """Parse and validate a port number from string.
195
196    Args:
197        port_str: Port string from environment variable, or None if not set
198        default: Default port to use if port_str is None
199
200    Returns:
201        Validated port number
202
203    Raises:
204        ValueError: If port_str is not a valid integer or out of range
205    """
206    if port_str is None:
207        return default
208
209    port_str = port_str.strip()
210    if not port_str.isdecimal():
211        raise ValueError(f"MCP_HTTP_PORT must be a valid integer, got: {port_str!r}")
212
213    port = int(port_str)
214    if not 1 <= port <= 65535:
215        raise ValueError(f"MCP_HTTP_PORT must be between 1 and 65535, got: {port}")
216
217    return port
218
219
def main_http() -> None:
    """Run the Airbyte Admin MCP server over HTTP.

    Intended for containerized deployments where the server has to be
    reachable over HTTP. Calls ``_load_env()`` and ``init_sentry_tracking()``
    first, then resolves the bind address from the environment. All status
    lines are written to stderr. A ``KeyboardInterrupt`` is reported as a
    user interruption rather than propagated.

    Environment Variables:
        MCP_HTTP_HOST: Host to bind to (default: 127.0.0.1)
        MCP_HTTP_PORT: Port to listen on (default: 8082)

    Raises:
        ValueError: If ``MCP_HTTP_PORT`` is set but fails ``_parse_port``
            validation.
    """
    _load_env()
    init_sentry_tracking()

    host = os.getenv("MCP_HTTP_HOST", DEFAULT_HTTP_HOST)
    raw_port = os.getenv("MCP_HTTP_PORT")
    port = _parse_port(raw_port, DEFAULT_HTTP_PORT)

    divider = "=" * 60
    print(divider, flush=True, file=sys.stderr)
    print(
        f"Starting Airbyte Admin MCP server (HTTP mode) on {host}:{port}",
        file=sys.stderr,
    )

    try:
        app.run(transport="http", host=host, port=port)
    except KeyboardInterrupt:
        # Ctrl-C is an expected way to stop the server; report it quietly.
        print("Airbyte Admin MCP server interrupted by user.", file=sys.stderr)

    print("Airbyte Admin MCP server stopped.", file=sys.stderr)
    print(divider, flush=True, file=sys.stderr)
248
249
250if __name__ == "__main__":
251    main()
DEFAULT_HTTP_HOST = '127.0.0.1'
DEFAULT_HTTP_PORT = 8082
app = FastMCP('airbyte-internal-ops')
def register_server_assets(app: fastmcp.server.server.FastMCP) -> None:
def register_server_assets(app: FastMCP) -> None:
    """Attach every tool and prompt registrar used by this server to ``app``.

    Each ``register_*`` helper imported by this module is called exactly once
    with ``app``, in the fixed order listed below. The helpers cover, among
    others, GitHub repository and Actions operations, cloud connector
    versions and rollouts, prompts, regression tests, and the connector
    registry.

    Note: Server info resource is now built-in via mcp_server() helper, so it
    is not registered here.

    Args:
        app: FastMCP application instance that receives the registrations.
    """
    registrars = (
        register_github_repo_ops_tools,
        register_github_actions_tools,
        register_prerelease_tools,
        register_cloud_connector_version_tools,
        register_connector_rollout_tools,
        register_prod_db_query_tools,
        register_gcp_logs_tools,
        register_prompts,
        register_regression_tests_tools,
        register_registry_tools,
        register_connection_state_tools,
        register_connection_medic_tools,
        register_organization_agentic_flag_tools,
        register_organization_payment_config_tools,
        register_people_lookup_tools,
        register_human_in_the_loop_tools,
        register_devin_reminder_tools,
        register_message_bus_tools,
        register_session_feedback_tools,
        register_session_namer_tools,
        register_slack_messaging_tools,
        register_devin_secret_request_tools,
        register_tier_lookup_tools,
        register_release_block_tools,
    )
    # Order is preserved deliberately: registrars run first to last.
    for register in registrars:
        register(app)

Register all server assets (tools, prompts, resources) with the FastMCP app.

This function registers assets for all domains:

  • REPO: GitHub repository operations
  • CLOUD: Cloud connector version management
  • PROMPTS: Prompt templates for common workflows
  • REGRESSION_TESTS: Connector regression tests (single-version and comparison)
  • REGISTRY: Connector registry operations (read/write metadata from GCS)
  • METADATA: Connector metadata operations (future)
  • QA: Connector quality assurance (future)
  • INSIGHTS: Connector analysis and insights (future)

Note: Server info resource is now built-in via mcp_server() helper.

Arguments:
  • app: FastMCP application instance
def main() -> None:
def main() -> None:
    """Run the Airbyte Admin MCP server over stdio (the default entry point).

    Calls ``_load_env()`` and ``init_sentry_tracking()`` first, then serves
    over stdio, which suits direct MCP client connections. All status lines
    are written to stderr. A ``KeyboardInterrupt`` is reported as a user
    interruption rather than propagated.
    """
    _load_env()
    init_sentry_tracking()

    divider = "=" * 60
    print(divider, flush=True, file=sys.stderr)
    print("Starting Airbyte Admin MCP server (stdio mode).", file=sys.stderr)

    try:
        stdio_session = app.run_stdio_async(show_banner=False)
        asyncio.run(stdio_session)
    except KeyboardInterrupt:
        # Ctrl-C is an expected way to stop the server; report it quietly.
        print("Airbyte Admin MCP server interrupted by user.", file=sys.stderr)

    print("Airbyte Admin MCP server stopped.", file=sys.stderr)
    print(divider, flush=True, file=sys.stderr)

Main entry point for the Airbyte Admin MCP server (stdio mode).

This is the default entry point that runs the server in stdio mode, suitable for direct MCP client connections.

def main_http() -> None:
def main_http() -> None:
    """Run the Airbyte Admin MCP server over HTTP.

    Intended for containerized deployments where the server has to be
    reachable over HTTP. Calls ``_load_env()`` and ``init_sentry_tracking()``
    first, then resolves the bind address from the environment. All status
    lines are written to stderr. A ``KeyboardInterrupt`` is reported as a
    user interruption rather than propagated.

    Environment Variables:
        MCP_HTTP_HOST: Host to bind to (default: 127.0.0.1)
        MCP_HTTP_PORT: Port to listen on (default: 8082)

    Raises:
        ValueError: If ``MCP_HTTP_PORT`` is set but fails ``_parse_port``
            validation.
    """
    _load_env()
    init_sentry_tracking()

    host = os.getenv("MCP_HTTP_HOST", DEFAULT_HTTP_HOST)
    raw_port = os.getenv("MCP_HTTP_PORT")
    port = _parse_port(raw_port, DEFAULT_HTTP_PORT)

    divider = "=" * 60
    print(divider, flush=True, file=sys.stderr)
    print(
        f"Starting Airbyte Admin MCP server (HTTP mode) on {host}:{port}",
        file=sys.stderr,
    )

    try:
        app.run(transport="http", host=host, port=port)
    except KeyboardInterrupt:
        # Ctrl-C is an expected way to stop the server; report it quietly.
        print("Airbyte Admin MCP server interrupted by user.", file=sys.stderr)

    print("Airbyte Admin MCP server stopped.", file=sys.stderr)
    print(divider, flush=True, file=sys.stderr)

HTTP entry point for the Airbyte Admin MCP server.

This entry point runs the server in HTTP mode, suitable for containerized deployments where the server needs to be accessible over HTTP.

Environment Variables:

  • MCP_HTTP_HOST: Host to bind to (default: 127.0.0.1)
  • MCP_HTTP_PORT: Port to listen on (default: 8082)