airbyte.mcp.server
Experimental MCP (Model Context Protocol) server for PyAirbyte connector management.
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Experimental MCP (Model Context Protocol) server for PyAirbyte connector management."""

import asyncio
import sys

from fastmcp import FastMCP

from airbyte._util.meta import set_mcp_mode
from airbyte.mcp._util import initialize_secrets
from airbyte.mcp.cloud_ops import register_cloud_ops_tools
from airbyte.mcp.connector_registry import register_connector_registry_tools
from airbyte.mcp.local_ops import register_local_ops_tools


# Import-time setup: both calls run before the application object is created.
set_mcp_mode()
initialize_secrets()

app: FastMCP = FastMCP("airbyte-mcp")
"""The Airbyte MCP Server application instance."""

# Attach each tool group to the shared application instance.
register_connector_registry_tools(app)
register_local_ops_tools(app)
register_cloud_ops_tools(app)


def main() -> None:
    """@private Main entry point for the MCP server.

    Runs the FastMCP application over stdio inside a fresh asyncio event loop
    and reports lifecycle events on stderr.

    A `KeyboardInterrupt` is reported and treated as a normal shutdown. Any
    other `Exception` is reported and the process exits with status 1.

    This function is not meant to be called directly; see the MCP client
    documentation for how to connect to the server.
    """

    def report(message: str) -> None:
        # Status output goes to stderr (presumably because stdout carries the
        # stdio transport traffic -- confirm against the FastMCP docs).
        print(message, file=sys.stderr)

    report("Starting Airbyte MCP server.")
    try:
        asyncio.run(app.run_stdio_async())
    except KeyboardInterrupt:
        report("Airbyte MCP server interrupted by user.")
    except Exception as ex:
        report(f"Error running Airbyte MCP server: {ex}")
        sys.exit(1)

    # Reached after a clean return or a user interrupt, never after an error exit.
    report("Airbyte MCP server stopped.")


if __name__ == "__main__":
    main()
app: fastmcp.server.server.FastMCP =
FastMCP('airbyte-mcp')
The Airbyte MCP Server application instance.