airbyte.mcp.server

Experimental MCP (Model Context Protocol) server for PyAirbyte connector management.

 1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 2"""Experimental MCP (Model Context Protocol) server for PyAirbyte connector management."""
 3
 4import asyncio
 5import sys
 6
 7from fastmcp import FastMCP
 8
 9from airbyte._util.meta import set_mcp_mode
10from airbyte.mcp._util import initialize_secrets
11from airbyte.mcp.cloud_ops import register_cloud_ops_tools
12from airbyte.mcp.connector_registry import register_connector_registry_tools
13from airbyte.mcp.local_ops import register_local_ops_tools
14from airbyte.mcp.prompts import register_prompts
15
16
17set_mcp_mode()
18initialize_secrets()
19
20app: FastMCP = FastMCP("airbyte-mcp")
21"""The Airbyte MCP Server application instance."""
22
23register_connector_registry_tools(app)
24register_local_ops_tools(app)
25register_cloud_ops_tools(app)
26register_prompts(app)
27
28
29def main() -> None:
30    """@private Main entry point for the MCP server.
31
32    This function starts the FastMCP server to handle MCP requests.
33
34    It should not be called directly; instead, consult the MCP client documentation
35    for instructions on how to connect to the server.
36    """
37    print("Starting Airbyte MCP server.", file=sys.stderr)
38    try:
39        asyncio.run(app.run_stdio_async())
40    except KeyboardInterrupt:
41        print("Airbyte MCP server interrupted by user.", file=sys.stderr)
42    except Exception as ex:
43        print(f"Error running Airbyte MCP server: {ex}", file=sys.stderr)
44        sys.exit(1)
45
46    print("Airbyte MCP server stopped.", file=sys.stderr)
47
48
49if __name__ == "__main__":
50    main()
app: fastmcp.server.server.FastMCP = FastMCP('airbyte-mcp')

The Airbyte MCP Server application instance.