airbyte_ops_mcp.mcp.server

Airbyte Admin MCP server implementation.

This module provides the main MCP server for Airbyte admin operations.

The server can run in two modes:

  • stdio mode (default): For direct MCP client connections via stdin/stdout
  • HTTP mode: For HTTP-based MCP connections. When OIDC_CONFIG_URL, OIDC_CLIENT_ID, and OIDC_CLIENT_SECRET are all set, enables Keycloak OIDC authentication via OIDCProxy.
HTTP mode environment variables:

MCP_SERVER_URL: Public base URL for the MCP server (also used for OIDC redirect callbacks). Defaults to http://localhost:8080. OIDC_CONFIG_URL: Keycloak OIDC discovery URL (enables auth when set) OIDC_CLIENT_ID: OAuth client ID for Keycloak OIDC_CLIENT_SECRET: OAuth client secret for Keycloak

  1# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
  2"""Airbyte Admin MCP server implementation.
  3
  4This module provides the main MCP server for Airbyte admin operations.
  5
  6The server can run in two modes:
  7- **stdio mode** (default): For direct MCP client connections via stdin/stdout
  8- **HTTP mode**: For HTTP-based MCP connections. When `OIDC_CONFIG_URL`,
  9  `OIDC_CLIENT_ID`, and `OIDC_CLIENT_SECRET` are all set, enables Keycloak
 10  OIDC authentication via `OIDCProxy`.
 11
 12HTTP mode environment variables:
 13    MCP_SERVER_URL: Public base URL for the MCP server (also used for OIDC
 14        redirect callbacks). Defaults to `http://localhost:8080`.
 15    OIDC_CONFIG_URL: Keycloak OIDC discovery URL (enables auth when set)
 16    OIDC_CLIENT_ID: OAuth client ID for Keycloak
 17    OIDC_CLIENT_SECRET: OAuth client secret for Keycloak
 18"""
 19
 20import asyncio
 21import logging
 22import os
 23import sys
 24from pathlib import Path
 25from urllib.parse import urlparse
 26
 27from airbyte.cloud.auth import resolve_cloud_client_id, resolve_cloud_client_secret
 28from dotenv import load_dotenv
 29from fastmcp import FastMCP
 30from fastmcp.server.auth.oidc_proxy import OIDCProxy
 31from fastmcp.server.dependencies import get_access_token
 32from fastmcp_extensions import (
 33    MCPServerConfigArg,
 34    ToolCallTelemetryMiddleware,
 35    mcp_server,
 36)
 37from starlette.requests import Request
 38from starlette.responses import JSONResponse
 39
 40from airbyte_ops_mcp._sentry import _SENTRY_DSN, init_sentry_tracking
 41from airbyte_ops_mcp.constants import (
 42    HEADER_AIRBYTE_CLOUD_CLIENT_ID,
 43    HEADER_AIRBYTE_CLOUD_CLIENT_SECRET,
 44    MCP_SERVER_NAME,
 45    ServerConfigKey,
 46)
 47from airbyte_ops_mcp.mcp._guidance import MCP_SERVER_INSTRUCTIONS
 48from airbyte_ops_mcp.mcp.agent_message_bus import register_message_bus_tools
 49from airbyte_ops_mcp.mcp.cloud_connector_versions import (
 50    register_cloud_connector_version_tools,
 51)
 52from airbyte_ops_mcp.mcp.connection_medic import register_connection_medic_tools
 53from airbyte_ops_mcp.mcp.connection_state import register_connection_state_tools
 54from airbyte_ops_mcp.mcp.connector_rollout import register_connector_rollout_tools
 55from airbyte_ops_mcp.mcp.devin_reminders import register_devin_reminder_tools
 56from airbyte_ops_mcp.mcp.devin_secret_request import register_devin_secret_request_tools
 57from airbyte_ops_mcp.mcp.gcp_logs import register_gcp_logs_tools
 58from airbyte_ops_mcp.mcp.github_actions import register_github_actions_tools
 59from airbyte_ops_mcp.mcp.github_repo_ops import register_github_repo_ops_tools
 60from airbyte_ops_mcp.mcp.human_in_the_loop import register_human_in_the_loop_tools
 61from airbyte_ops_mcp.mcp.organization_agentic_flag import (
 62    register_organization_agentic_flag_tools,
 63)
 64from airbyte_ops_mcp.mcp.organization_payment_config import (
 65    register_organization_payment_config_tools,
 66)
 67from airbyte_ops_mcp.mcp.people_lookup import register_people_lookup_tools
 68from airbyte_ops_mcp.mcp.prerelease import register_prerelease_tools
 69from airbyte_ops_mcp.mcp.prod_db_queries import register_prod_db_query_tools
 70from airbyte_ops_mcp.mcp.prompts import register_prompts
 71from airbyte_ops_mcp.mcp.registry import register_registry_tools
 72from airbyte_ops_mcp.mcp.regression_tests import register_regression_tests_tools
 73from airbyte_ops_mcp.mcp.release_block import register_release_block_tools
 74from airbyte_ops_mcp.mcp.session_feedback import register_session_feedback_tools
 75from airbyte_ops_mcp.mcp.session_namer import register_session_namer_tools
 76from airbyte_ops_mcp.mcp.slack_messaging import register_slack_messaging_tools
 77from airbyte_ops_mcp.mcp.tier_lookup import register_tier_lookup_tools
 78from airbyte_ops_mcp.telemetry import _DEFAULT_SEGMENT_WRITE_KEY
 79
 80logger = logging.getLogger(__name__)
 81
 82# Default HTTP server configuration
 83DEFAULT_HTTP_HOST = "0.0.0.0"
 84DEFAULT_HTTP_PORT = 8080
 85
 86# OIDC environment variable names
 87OIDC_CONFIG_URL_ENV = "OIDC_CONFIG_URL"
 88OIDC_CLIENT_ID_ENV = "OIDC_CLIENT_ID"
 89OIDC_CLIENT_SECRET_ENV = "OIDC_CLIENT_SECRET"
 90MCP_SERVER_URL_ENV = "MCP_SERVER_URL"
 91
 92
 93def _normalize_bearer_token(value: str) -> str | None:
 94    """Extract bearer token from Authorization header value.
 95
 96    Parses "Bearer <token>" format (case-insensitive prefix).
 97    Returns None if the value doesn't have the Bearer prefix.
 98    """
 99    if value.lower().startswith("bearer "):
100        token = value[7:].strip()
101        return token if token else None
102    return None
103
104
105def _resolve_oidc_bearer_token() -> str:
106    """Resolve the upstream bearer token from OIDC auth if available.
107
108    When the server uses OIDCProxy (Keycloak/Okta), the user's upstream
109    access token is stored by FastMCP after the OAuth flow completes.
110    This function retrieves it so Cloud API tools can use the user's
111    identity for delegated access.
112
113    Returns empty string when no OIDC session is active (e.g. stdio mode).
114    """
115    access_token = get_access_token()
116    if access_token and access_token.token:
117        return access_token.token
118    return ""
119
120
121def _create_oidc_auth() -> OIDCProxy | None:
122    """Create an `OIDCProxy` auth provider when OIDC env vars are configured.
123
124    When `OIDC_CONFIG_URL`, `OIDC_CLIENT_ID`, and `OIDC_CLIENT_SECRET` are all
125    set, returns an `OIDCProxy` that handles the Keycloak Authorization Code +
126    PKCE flow for browser-based MCP clients. When any is empty, returns `None`
127    (no OIDC auth — the server falls back to header-based credential resolution).
128    """
129    config_url = os.getenv(OIDC_CONFIG_URL_ENV, "")
130    client_id = os.getenv(OIDC_CLIENT_ID_ENV, "")
131    client_secret = os.getenv(OIDC_CLIENT_SECRET_ENV, "")
132
133    if not config_url or not client_id or not client_secret:
134        return None
135
136    server_url = os.getenv(
137        MCP_SERVER_URL_ENV,
138        f"http://localhost:{DEFAULT_HTTP_PORT}",
139    )
140
141    logger.info(
142        "OIDC auth enabled (issuer=%s, client_id=%s, base_url=%s)",
143        config_url,
144        client_id,
145        server_url,
146    )
147    return OIDCProxy(
148        config_url=config_url,
149        client_id=client_id,
150        client_secret=client_secret,
151        base_url=server_url,
152    )
153
154
155# Create the MCP server with built-in server info resource
156app = mcp_server(
157    name=MCP_SERVER_NAME,
158    instructions=MCP_SERVER_INSTRUCTIONS,
159    package_name="airbyte-internal-ops",
160    advertised_properties={
161        "docs_url": "https://github.com/airbytehq/airbyte-ops-mcp",
162        "release_history_url": "https://github.com/airbytehq/airbyte-ops-mcp/releases",
163    },
164    server_config_args=[
165        MCPServerConfigArg(
166            name=ServerConfigKey.BEARER_TOKEN,
167            http_header_key="Authorization",
168            env_var="AIRBYTE_CLOUD_BEARER_TOKEN",
169            normalize_fn=_normalize_bearer_token,
170            default=_resolve_oidc_bearer_token,
171            required=False,
172            sensitive=True,
173        ),
174        MCPServerConfigArg(
175            name=ServerConfigKey.CLIENT_ID,
176            http_header_key=HEADER_AIRBYTE_CLOUD_CLIENT_ID,
177            default=lambda: str(resolve_cloud_client_id()),
178            required=True,
179            sensitive=True,
180        ),
181        MCPServerConfigArg(
182            name=ServerConfigKey.CLIENT_SECRET,
183            http_header_key=HEADER_AIRBYTE_CLOUD_CLIENT_SECRET,
184            default=lambda: str(resolve_cloud_client_secret()),
185            required=True,
186            sensitive=True,
187        ),
188    ],
189    include_standard_tool_filters=True,
190    auth=_create_oidc_auth(),
191)
192
193
194def register_server_assets(app: FastMCP) -> None:
195    """Register all server assets (tools, prompts, resources) with the FastMCP app.
196
197    This function registers assets for all domains:
198    - REPO: GitHub repository operations
199    - CLOUD: Cloud connector version management
200    - PROMPTS: Prompt templates for common workflows
201    - REGRESSION_TESTS: Connector regression tests (single-version and comparison)
202    - REGISTRY: Connector registry operations (read/write metadata from GCS)
203    - METADATA: Connector metadata operations (future)
204    - QA: Connector quality assurance (future)
205    - INSIGHTS: Connector analysis and insights (future)
206
207    Tools annotated with `requires_client_filesystem=True` are automatically
208    hidden when `MCP_NO_CLIENT_FILESYSTEM=1` via the standard tool filter.
209
210    Note: Server info resource is now built-in via `mcp_server()` helper.
211
212    Args:
213        app: FastMCP application instance
214    """
215    register_github_repo_ops_tools(app)
216    register_github_actions_tools(app)
217    register_prerelease_tools(app)
218    register_cloud_connector_version_tools(app)
219    register_connector_rollout_tools(app)
220    register_prod_db_query_tools(app)
221    register_gcp_logs_tools(app)
222    register_prompts(app)
223    register_regression_tests_tools(app)
224    register_registry_tools(app)
225    register_connection_state_tools(app)
226    register_connection_medic_tools(app)
227    register_organization_agentic_flag_tools(app)
228    register_organization_payment_config_tools(app)
229    register_people_lookup_tools(app)
230    register_human_in_the_loop_tools(app)
231    register_devin_reminder_tools(app)
232    register_message_bus_tools(app)
233    register_session_feedback_tools(app)
234    register_session_namer_tools(app)
235    register_slack_messaging_tools(app)
236    register_devin_secret_request_tools(app)
237    register_tier_lookup_tools(app)
238    register_release_block_tools(app)
239
240
241register_server_assets(app)
242app.add_middleware(
243    ToolCallTelemetryMiddleware(
244        package_name="airbyte-internal-ops",
245        sentry_dsn=_SENTRY_DSN,
246        segment_write_key=_DEFAULT_SEGMENT_WRITE_KEY,
247    )
248)
249
250
251@app.custom_route("/health", methods=["GET"])
252async def health_check(request: Request) -> JSONResponse:
253    """Health check endpoint for Cloud Run liveness/readiness probes."""
254    return JSONResponse({"status": "ok"})
255
256
257def _load_env() -> None:
258    """Load environment variables from .env file if present."""
259    env_file = Path.cwd() / ".env"
260    if env_file.exists():
261        load_dotenv(env_file)
262        print(f"Loaded environment from: {env_file}", flush=True, file=sys.stderr)
263
264
265def main() -> None:
266    """Main entry point for the Airbyte Admin MCP server (stdio mode).
267
268    This is the default entry point that runs the server in stdio mode,
269    suitable for direct MCP client connections.
270    """
271    _load_env()
272    init_sentry_tracking()
273
274    print("=" * 60, flush=True, file=sys.stderr)
275    print("Starting Airbyte Admin MCP server (stdio mode).", file=sys.stderr)
276    try:
277        asyncio.run(app.run_stdio_async(show_banner=False))
278    except KeyboardInterrupt:
279        print("Airbyte Admin MCP server interrupted by user.", file=sys.stderr)
280
281    print("Airbyte Admin MCP server stopped.", file=sys.stderr)
282    print("=" * 60, flush=True, file=sys.stderr)
283
284
285def main_http() -> None:
286    """HTTP entry point for the Airbyte Admin MCP server.
287
288    Runs the server in HTTP mode. When OIDC env vars are configured,
289    Keycloak authentication is enabled automatically.
290    """
291    _load_env()
292    init_sentry_tracking()
293
294    host = DEFAULT_HTTP_HOST
295    port = DEFAULT_HTTP_PORT
296
297    # When deployed behind a path-stripping LB (MCP_SERVER_URL has a path
298    # component like /ops-mcp), serve the MCP endpoint at root so the
299    # public URL is just the base path. Otherwise keep the FastMCP default.
300    server_url = os.getenv(
301        MCP_SERVER_URL_ENV,
302        f"http://localhost:{DEFAULT_HTTP_PORT}",
303    )
304    mcp_path = "/" if urlparse(server_url).path.strip("/") else "/mcp"
305
306    print("=" * 60, flush=True, file=sys.stderr)
307    print(
308        f"Starting Airbyte Admin MCP server (HTTP mode) on {host}:{port}"
309        f" (mcp_path={mcp_path!r})",
310        file=sys.stderr,
311    )
312    try:
313        app.run(
314            transport="streamable-http",
315            host=host,
316            port=port,
317            path=mcp_path,
318            stateless_http=True,
319        )
320    except KeyboardInterrupt:
321        print("Airbyte Admin MCP server interrupted by user.", file=sys.stderr)
322
323    print("Airbyte Admin MCP server stopped.", file=sys.stderr)
324    print("=" * 60, flush=True, file=sys.stderr)
325
326
327if __name__ == "__main__":
328    main()
logger = <Logger airbyte_ops_mcp.mcp.server (WARNING)>
DEFAULT_HTTP_HOST = '0.0.0.0'
DEFAULT_HTTP_PORT = 8080
OIDC_CONFIG_URL_ENV = 'OIDC_CONFIG_URL'
OIDC_CLIENT_ID_ENV = 'OIDC_CLIENT_ID'
OIDC_CLIENT_SECRET_ENV = 'OIDC_CLIENT_SECRET'
MCP_SERVER_URL_ENV = 'MCP_SERVER_URL'
app = FastMCP('airbyte-internal-ops')
def register_server_assets(app: fastmcp.server.server.FastMCP) -> None:
195def register_server_assets(app: FastMCP) -> None:
196    """Register all server assets (tools, prompts, resources) with the FastMCP app.
197
198    This function registers assets for all domains:
199    - REPO: GitHub repository operations
200    - CLOUD: Cloud connector version management
201    - PROMPTS: Prompt templates for common workflows
202    - REGRESSION_TESTS: Connector regression tests (single-version and comparison)
203    - REGISTRY: Connector registry operations (read/write metadata from GCS)
204    - METADATA: Connector metadata operations (future)
205    - QA: Connector quality assurance (future)
206    - INSIGHTS: Connector analysis and insights (future)
207
208    Tools annotated with `requires_client_filesystem=True` are automatically
209    hidden when `MCP_NO_CLIENT_FILESYSTEM=1` via the standard tool filter.
210
211    Note: Server info resource is now built-in via `mcp_server()` helper.
212
213    Args:
214        app: FastMCP application instance
215    """
216    register_github_repo_ops_tools(app)
217    register_github_actions_tools(app)
218    register_prerelease_tools(app)
219    register_cloud_connector_version_tools(app)
220    register_connector_rollout_tools(app)
221    register_prod_db_query_tools(app)
222    register_gcp_logs_tools(app)
223    register_prompts(app)
224    register_regression_tests_tools(app)
225    register_registry_tools(app)
226    register_connection_state_tools(app)
227    register_connection_medic_tools(app)
228    register_organization_agentic_flag_tools(app)
229    register_organization_payment_config_tools(app)
230    register_people_lookup_tools(app)
231    register_human_in_the_loop_tools(app)
232    register_devin_reminder_tools(app)
233    register_message_bus_tools(app)
234    register_session_feedback_tools(app)
235    register_session_namer_tools(app)
236    register_slack_messaging_tools(app)
237    register_devin_secret_request_tools(app)
238    register_tier_lookup_tools(app)
239    register_release_block_tools(app)

Register all server assets (tools, prompts, resources) with the FastMCP app.

This function registers assets for all domains:

  • REPO: GitHub repository operations
  • CLOUD: Cloud connector version management
  • PROMPTS: Prompt templates for common workflows
  • REGRESSION_TESTS: Connector regression tests (single-version and comparison)
  • REGISTRY: Connector registry operations (read/write metadata from GCS)
  • METADATA: Connector metadata operations (future)
  • QA: Connector quality assurance (future)
  • INSIGHTS: Connector analysis and insights (future)

Tools annotated with requires_client_filesystem=True are automatically hidden when MCP_NO_CLIENT_FILESYSTEM=1 via the standard tool filter.

Note: Server info resource is now built-in via mcp_server() helper.

Arguments:
  • app: FastMCP application instance
@app.custom_route('/health', methods=['GET'])
async def health_check(request: starlette.requests.Request) -> starlette.responses.JSONResponse:
252@app.custom_route("/health", methods=["GET"])
253async def health_check(request: Request) -> JSONResponse:
254    """Health check endpoint for Cloud Run liveness/readiness probes."""
255    return JSONResponse({"status": "ok"})

Health check endpoint for Cloud Run liveness/readiness probes.

def main() -> None:
266def main() -> None:
267    """Main entry point for the Airbyte Admin MCP server (stdio mode).
268
269    This is the default entry point that runs the server in stdio mode,
270    suitable for direct MCP client connections.
271    """
272    _load_env()
273    init_sentry_tracking()
274
275    print("=" * 60, flush=True, file=sys.stderr)
276    print("Starting Airbyte Admin MCP server (stdio mode).", file=sys.stderr)
277    try:
278        asyncio.run(app.run_stdio_async(show_banner=False))
279    except KeyboardInterrupt:
280        print("Airbyte Admin MCP server interrupted by user.", file=sys.stderr)
281
282    print("Airbyte Admin MCP server stopped.", file=sys.stderr)
283    print("=" * 60, flush=True, file=sys.stderr)

Main entry point for the Airbyte Admin MCP server (stdio mode).

This is the default entry point that runs the server in stdio mode, suitable for direct MCP client connections.

def main_http() -> None:
286def main_http() -> None:
287    """HTTP entry point for the Airbyte Admin MCP server.
288
289    Runs the server in HTTP mode. When OIDC env vars are configured,
290    Keycloak authentication is enabled automatically.
291    """
292    _load_env()
293    init_sentry_tracking()
294
295    host = DEFAULT_HTTP_HOST
296    port = DEFAULT_HTTP_PORT
297
298    # When deployed behind a path-stripping LB (MCP_SERVER_URL has a path
299    # component like /ops-mcp), serve the MCP endpoint at root so the
300    # public URL is just the base path. Otherwise keep the FastMCP default.
301    server_url = os.getenv(
302        MCP_SERVER_URL_ENV,
303        f"http://localhost:{DEFAULT_HTTP_PORT}",
304    )
305    mcp_path = "/" if urlparse(server_url).path.strip("/") else "/mcp"
306
307    print("=" * 60, flush=True, file=sys.stderr)
308    print(
309        f"Starting Airbyte Admin MCP server (HTTP mode) on {host}:{port}"
310        f" (mcp_path={mcp_path!r})",
311        file=sys.stderr,
312    )
313    try:
314        app.run(
315            transport="streamable-http",
316            host=host,
317            port=port,
318            path=mcp_path,
319            stateless_http=True,
320        )
321    except KeyboardInterrupt:
322        print("Airbyte Admin MCP server interrupted by user.", file=sys.stderr)
323
324    print("Airbyte Admin MCP server stopped.", file=sys.stderr)
325    print("=" * 60, flush=True, file=sys.stderr)

HTTP entry point for the Airbyte Admin MCP server.

Runs the server in HTTP mode. When OIDC env vars are configured, Keycloak authentication is enabled automatically.