airbyte.mcp.server

Experimental MCP (Model Context Protocol) server for PyAirbyte connector management.

 1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 2"""Experimental MCP (Model Context Protocol) server for PyAirbyte connector management."""
 3
 4import asyncio
 5import sys
 6
 7from fastmcp import FastMCP
 8
 9from airbyte.mcp._cloud_ops import register_cloud_ops_tools
10from airbyte.mcp._connector_registry import register_connector_registry_tools
11from airbyte.mcp._local_ops import register_local_ops_tools
12from airbyte.mcp._util import initialize_secrets
13
14
15initialize_secrets()
16
17app: FastMCP = FastMCP("airbyte-mcp")
18register_connector_registry_tools(app)
19register_local_ops_tools(app)
20register_cloud_ops_tools(app)
21
22
23def main() -> None:
24    """Main entry point for the MCP server."""
25    print("Starting Airbyte MCP server.", file=sys.stderr)
26    try:
27        asyncio.run(app.run_stdio_async())
28    except KeyboardInterrupt:
29        print("Airbyte MCP server interrupted by user.", file=sys.stderr)
30    except Exception as ex:
31        print(f"Error running Airbyte MCP server: {ex}", file=sys.stderr)
32        sys.exit(1)
33
34    print("Airbyte MCP server stopped.", file=sys.stderr)
35
36
37if __name__ == "__main__":
38    main()
app: fastmcp.server.server.FastMCP = FastMCP('airbyte-mcp')
def main() -> None:
24def main() -> None:
25    """Main entry point for the MCP server."""
26    print("Starting Airbyte MCP server.", file=sys.stderr)
27    try:
28        asyncio.run(app.run_stdio_async())
29    except KeyboardInterrupt:
30        print("Airbyte MCP server interrupted by user.", file=sys.stderr)
31    except Exception as ex:
32        print(f"Error running Airbyte MCP server: {ex}", file=sys.stderr)
33        sys.exit(1)
34
35    print("Airbyte MCP server stopped.", file=sys.stderr)

Main entry point for the MCP server.