airbyte.mcp.server

Experimental MCP (Model Context Protocol) server for PyAirbyte connector management.

 1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 2"""Experimental MCP (Model Context Protocol) server for PyAirbyte connector management."""
 3
 4import asyncio
 5import sys
 6
 7from fastmcp import FastMCP
 8
 9from airbyte._util.meta import set_mcp_mode
10from airbyte.mcp._util import initialize_secrets
11from airbyte.mcp.cloud_ops import register_cloud_ops_tools
12from airbyte.mcp.connector_registry import register_connector_registry_tools
13from airbyte.mcp.local_ops import register_local_ops_tools
14
15
16set_mcp_mode()
17initialize_secrets()
18
19app: FastMCP = FastMCP("airbyte-mcp")
20"""The Airbyte MCP Server application instance."""
21
22register_connector_registry_tools(app)
23register_local_ops_tools(app)
24register_cloud_ops_tools(app)
25
26
27def main() -> None:
28    """@private Main entry point for the MCP server.
29
30    This function starts the FastMCP server to handle MCP requests.
31
32    It should not be called directly; instead, consult the MCP client documentation
33    for instructions on how to connect to the server.
34    """
35    print("Starting Airbyte MCP server.", file=sys.stderr)
36    try:
37        asyncio.run(app.run_stdio_async())
38    except KeyboardInterrupt:
39        print("Airbyte MCP server interrupted by user.", file=sys.stderr)
40    except Exception as ex:
41        print(f"Error running Airbyte MCP server: {ex}", file=sys.stderr)
42        sys.exit(1)
43
44    print("Airbyte MCP server stopped.", file=sys.stderr)
45
46
47if __name__ == "__main__":
48    main()
app: fastmcp.server.server.FastMCP = FastMCP('airbyte-mcp')

The Airbyte MCP Server application instance.