airbyte.mcp.server
Experimental MCP (Model Context Protocol) server for PyAirbyte connector management.
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Experimental MCP (Model Context Protocol) server for PyAirbyte connector management."""

import asyncio
import sys

from fastmcp import FastMCP

from airbyte.mcp._cloud_ops import register_cloud_ops_tools
from airbyte.mcp._connector_registry import register_connector_registry_tools
from airbyte.mcp._local_ops import register_local_ops_tools
from airbyte.mcp._util import initialize_secrets


# Import-time setup: secrets are initialized before the server object exists.
initialize_secrets()

# Single shared server instance; each register_* helper receives it in turn.
# The call order is kept as-is since their independence is not visible here.
app: FastMCP = FastMCP("airbyte-mcp")
register_connector_registry_tools(app)
register_local_ops_tools(app)
register_cloud_ops_tools(app)


def main() -> None:
    """Run the Airbyte MCP server over stdio and report its lifecycle on stderr.

    A keyboard interrupt is treated as a normal shutdown. Any other exception
    is reported and the process exits with status 1.
    """

    def _report(message: str) -> None:
        # Status text goes to stderr -- presumably because stdout is reserved
        # for the stdio transport's protocol traffic (confirm against FastMCP).
        print(message, file=sys.stderr)

    _report("Starting Airbyte MCP server.")
    try:
        stdio_session = app.run_stdio_async()
        asyncio.run(stdio_session)
    except KeyboardInterrupt:
        # Falls through to the "stopped" message below.
        _report("Airbyte MCP server interrupted by user.")
    except Exception as ex:
        _report(f"Error running Airbyte MCP server: {ex}")
        sys.exit(1)

    _report("Airbyte MCP server stopped.")


if __name__ == "__main__":
    main()
app: fastmcp.server.server.FastMCP =
FastMCP('airbyte-mcp')
def
main() -> None:
def main() -> None:
    """Run the Airbyte MCP server over stdio and report its lifecycle on stderr.

    A keyboard interrupt is treated as a normal shutdown. Any other exception
    is reported and the process exits with status 1.
    """

    def _report(message: str) -> None:
        # Status text goes to stderr -- presumably because stdout is reserved
        # for the stdio transport's protocol traffic (confirm against FastMCP).
        print(message, file=sys.stderr)

    _report("Starting Airbyte MCP server.")
    try:
        stdio_session = app.run_stdio_async()
        asyncio.run(stdio_session)
    except KeyboardInterrupt:
        # Falls through to the "stopped" message below.
        _report("Airbyte MCP server interrupted by user.")
    except Exception as ex:
        _report(f"Error running Airbyte MCP server: {ex}")
        sys.exit(1)

    _report("Airbyte MCP server stopped.")
Main entry point for the MCP server.