airbyte.mcp.server
Experimental MCP (Model Context Protocol) server for PyAirbyte connector management.
1# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 2"""Experimental MCP (Model Context Protocol) server for PyAirbyte connector management.""" 3 4import asyncio 5import sys 6 7from fastmcp import FastMCP 8 9from airbyte._util.meta import set_mcp_mode 10from airbyte.mcp._util import initialize_secrets 11from airbyte.mcp.cloud_ops import register_cloud_ops_tools 12from airbyte.mcp.connector_registry import register_connector_registry_tools 13from airbyte.mcp.local_ops import register_local_ops_tools 14from airbyte.mcp.prompts import register_prompts 15 16 17set_mcp_mode() 18initialize_secrets() 19 20app: FastMCP = FastMCP("airbyte-mcp") 21"""The Airbyte MCP Server application instance.""" 22 23register_connector_registry_tools(app) 24register_local_ops_tools(app) 25register_cloud_ops_tools(app) 26register_prompts(app) 27 28 29def main() -> None: 30 """@private Main entry point for the MCP server. 31 32 This function starts the FastMCP server to handle MCP requests. 33 34 It should not be called directly; instead, consult the MCP client documentation 35 for instructions on how to connect to the server. 36 """ 37 print("Starting Airbyte MCP server.", file=sys.stderr) 38 try: 39 asyncio.run(app.run_stdio_async()) 40 except KeyboardInterrupt: 41 print("Airbyte MCP server interrupted by user.", file=sys.stderr) 42 except Exception as ex: 43 print(f"Error running Airbyte MCP server: {ex}", file=sys.stderr) 44 sys.exit(1) 45 46 print("Airbyte MCP server stopped.", file=sys.stderr) 47 48 49if __name__ == "__main__": 50 main()
app: fastmcp.server.server.FastMCP =
FastMCP('airbyte-mcp')
The Airbyte MCP Server application instance.