airbyte.mcp.server
MCP (Model Context Protocol) server for PyAirbyte connector management.
Supports two transport modes:
- stdio (default): For local MCP clients (Claude Desktop, etc.)
- HTTP: For hosted deployment. Start via
airbyte-mcp-httpentry point orpoe mcp-serve-http. WhenOIDC_CONFIG_URL,OIDC_CLIENT_ID, andOIDC_CLIENT_SECRETare all set, enables Keycloak OIDC authentication viaOIDCProxy.
1# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 2"""MCP (Model Context Protocol) server for PyAirbyte connector management. 3 4Supports two transport modes: 5 6- **stdio** (default): For local MCP clients (Claude Desktop, etc.) 7- **HTTP**: For hosted deployment. Start via `airbyte-mcp-http` entry point or 8 `poe mcp-serve-http`. When `OIDC_CONFIG_URL`, `OIDC_CLIENT_ID`, and 9 `OIDC_CLIENT_SECRET` are all set, enables Keycloak OIDC authentication 10 via `OIDCProxy`. 11""" 12 13from __future__ import annotations 14 15import asyncio 16import logging 17import os 18import sys 19from typing import TYPE_CHECKING 20 21from fastmcp.server.auth.oidc_proxy import OIDCProxy 22from fastmcp_extensions import mcp_server 23from starlette.responses import JSONResponse 24 25 26if TYPE_CHECKING: 27 from starlette.requests import Request 28 29from airbyte._util.meta import set_mcp_mode 30from airbyte.mcp._config import load_secrets_to_env_vars 31from airbyte.mcp._tool_utils import ( 32 AIRBYTE_EXCLUDE_MODULES_CONFIG_ARG, 33 AIRBYTE_INCLUDE_MODULES_CONFIG_ARG, 34 AIRBYTE_READONLY_MODE_CONFIG_ARG, 35 API_URL_CONFIG_ARG, 36 BEARER_TOKEN_CONFIG_ARG, 37 CLIENT_ID_CONFIG_ARG, 38 CLIENT_SECRET_CONFIG_ARG, 39 CONFIG_API_URL_CONFIG_ARG, 40 WORKSPACE_ID_CONFIG_ARG, 41 airbyte_module_filter, 42 airbyte_readonly_mode_filter, 43 airbyte_ui_support_filter, 44) 45from airbyte.mcp.cloud import register_cloud_tools 46from airbyte.mcp.interactive import register_interactive_tools 47from airbyte.mcp.local import register_local_tools 48from airbyte.mcp.prompts import register_prompts 49from airbyte.mcp.registry import register_registry_tools 50 51 52# ============================================================================= 53# Server Instructions 54# ============================================================================= 55# This text is provided to AI agents via the MCP protocol's "instructions" field. 56# It helps agents understand when to use this server's tools, especially when 57# tool search is enabled. For more context, see: 58# - FastMCP docs: https://gofastmcp.com/servers/overview 59# - Claude tool search: https://www.anthropic.com/news/tool-use-improvements 60# ============================================================================= 61 62MCP_SERVER_INSTRUCTIONS = """ 63PyAirbyte connector management and data integration server for discovering, 64deploying, and running Airbyte connectors. 65 66Use this server for: 67- Discovering connectors from the Airbyte registry (sources and destinations) 68- Deploying sources, destinations, and connections to Airbyte Cloud 69- Running cloud syncs and monitoring sync status 70- Managing custom connector definitions in Airbyte Cloud 71- Local connector execution for data extraction without cloud deployment 72- Listing and describing environment variables for connector configuration 73 74Operational modes: 75- Cloud operations: Deploy and manage connectors on Airbyte Cloud (requires 76 AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, AIRBYTE_CLOUD_WORKSPACE_ID) 77- Local operations: Run connectors locally for data extraction (requires 78 AIRBYTE_PROJECT_DIR for artifact storage) 79 80Safety features: 81- Safe mode (default): Restricts destructive operations to objects created in 82 the current session 83- Read-only mode: Disables all write operations for cloud resources 84""".strip() 85 86logger = logging.getLogger(__name__) 87 88OIDC_CONFIG_URL_ENV = "OIDC_CONFIG_URL" 89OIDC_CLIENT_ID_ENV = "OIDC_CLIENT_ID" 90OIDC_CLIENT_SECRET_ENV = "OIDC_CLIENT_SECRET" 91MCP_SERVER_URL_ENV = "MCP_SERVER_URL" 92 93DEFAULT_HTTP_HOST = "0.0.0.0" 94DEFAULT_HTTP_PORT = 8080 95 96 97def _create_oidc_auth() -> OIDCProxy | None: 98 """Create an `OIDCProxy` auth provider when OIDC env vars are configured. 99 100 When `OIDC_CONFIG_URL`, `OIDC_CLIENT_ID`, and `OIDC_CLIENT_SECRET` are all 101 set, returns an `OIDCProxy` that handles the Keycloak Authorization Code + 102 PKCE flow. Otherwise returns `None` (no auth, standard local behavior). 103 """ 104 config_url = os.getenv(OIDC_CONFIG_URL_ENV, "") 105 client_id = os.getenv(OIDC_CLIENT_ID_ENV, "") 106 client_secret = os.getenv(OIDC_CLIENT_SECRET_ENV, "") 107 108 if not config_url or not client_id or not client_secret: 109 return None 110 111 server_url = os.getenv( 112 MCP_SERVER_URL_ENV, 113 f"http://localhost:{DEFAULT_HTTP_PORT}", 114 ) 115 116 logger.info( 117 "OIDC auth enabled (issuer=%s, client_id=%s, base_url=%s)", 118 config_url, 119 client_id, 120 server_url, 121 ) 122 return OIDCProxy( 123 config_url=config_url, 124 client_id=client_id, 125 client_secret=client_secret, 126 base_url=server_url, 127 ) 128 129 130set_mcp_mode() 131load_secrets_to_env_vars() 132 133app = mcp_server( 134 name="airbyte-mcp", 135 package_name="airbyte", 136 instructions=MCP_SERVER_INSTRUCTIONS, 137 include_standard_tool_filters=True, 138 server_config_args=[ 139 AIRBYTE_READONLY_MODE_CONFIG_ARG, 140 AIRBYTE_EXCLUDE_MODULES_CONFIG_ARG, 141 AIRBYTE_INCLUDE_MODULES_CONFIG_ARG, 142 WORKSPACE_ID_CONFIG_ARG, 143 BEARER_TOKEN_CONFIG_ARG, 144 CLIENT_ID_CONFIG_ARG, 145 CLIENT_SECRET_CONFIG_ARG, 146 API_URL_CONFIG_ARG, 147 CONFIG_API_URL_CONFIG_ARG, 148 ], 149 tool_filters=[ 150 airbyte_readonly_mode_filter, 151 airbyte_module_filter, 152 airbyte_ui_support_filter, 153 ], 154 auth=_create_oidc_auth(), 155) 156"""The Airbyte MCP Server application instance.""" 157 158# Register tools from each module 159register_cloud_tools(app) 160register_local_tools(app) 161register_registry_tools(app) 162register_interactive_tools(app) 163register_prompts(app) 164 165 166@app.custom_route("/health", methods=["GET"]) 167async def health_check(request: Request) -> JSONResponse: # noqa: ARG001, RUF029 168 """Health check endpoint for load balancer probes.""" 169 return JSONResponse({"status": "ok"}) 170 171 172def main() -> None: 173 """@private Main entry point for the MCP server. 174 175 This function starts the FastMCP server to handle MCP requests. 176 177 It should not be called directly; instead, consult the MCP client documentation 178 for instructions on how to connect to the server. 179 """ 180 print("Starting Airbyte MCP server.", file=sys.stderr) 181 try: 182 asyncio.run(app.run_stdio_async()) 183 except KeyboardInterrupt: 184 print("Airbyte MCP server interrupted by user.", file=sys.stderr) 185 except Exception as ex: 186 print(f"Error running Airbyte MCP server: {ex}", file=sys.stderr) 187 sys.exit(1) 188 189 print("Airbyte MCP server stopped.", file=sys.stderr) 190 191 192if __name__ == "__main__": 193 main()
MCP_SERVER_INSTRUCTIONS =
'PyAirbyte connector management and data integration server for discovering,\ndeploying, and running Airbyte connectors.\n\nUse this server for:\n- Discovering connectors from the Airbyte registry (sources and destinations)\n- Deploying sources, destinations, and connections to Airbyte Cloud\n- Running cloud syncs and monitoring sync status\n- Managing custom connector definitions in Airbyte Cloud\n- Local connector execution for data extraction without cloud deployment\n- Listing and describing environment variables for connector configuration\n\nOperational modes:\n- Cloud operations: Deploy and manage connectors on Airbyte Cloud (requires\n AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, AIRBYTE_CLOUD_WORKSPACE_ID)\n- Local operations: Run connectors locally for data extraction (requires\n AIRBYTE_PROJECT_DIR for artifact storage)\n\nSafety features:\n- Safe mode (default): Restricts destructive operations to objects created in\n the current session\n- Read-only mode: Disables all write operations for cloud resources'
logger =
<Logger airbyte.mcp.server (INFO)>
OIDC_CONFIG_URL_ENV =
'OIDC_CONFIG_URL'
OIDC_CLIENT_ID_ENV =
'OIDC_CLIENT_ID'
OIDC_CLIENT_SECRET_ENV =
'OIDC_CLIENT_SECRET'
MCP_SERVER_URL_ENV =
'MCP_SERVER_URL'
DEFAULT_HTTP_HOST =
'0.0.0.0'
DEFAULT_HTTP_PORT =
8080
app =
FastMCP('airbyte-mcp')
The Airbyte MCP Server application instance.
@app.custom_route('/health', methods=['GET'])
async def
health_check(request: starlette.requests.Request) -> starlette.responses.JSONResponse:
167@app.custom_route("/health", methods=["GET"]) 168async def health_check(request: Request) -> JSONResponse: # noqa: ARG001, RUF029 169 """Health check endpoint for load balancer probes.""" 170 return JSONResponse({"status": "ok"})
Health check endpoint for load balancer probes.