Update mcp_server.py

对外暴露聚合mcp 工具。
This commit is contained in:
guo zebin
2026-01-06 20:16:04 +08:00
parent 27e31431a0
commit 1c1d3fff91

View File

@@ -20,12 +20,14 @@ import logging
import os
import sys
import json
from typing import Any, Dict, Tuple
from typing import Any, Dict, Tuple, List, Optional
try:
from mcp.server import Server
from mcp.types import Tool, TextContent
from mcp.server.sse import SseServerTransport
from faymcp import tool_registry
from faymcp import mcp_service
except ImportError:
print("缺少 mcp 库请先安装pip install mcp", file=sys.stderr)
sys.exit(1)
@@ -68,6 +70,9 @@ MSG_PATH = os.environ.get("FAY_MCP_MSG_PATH", "/messages")
server = Server(SERVER_NAME)
sse_transport = SseServerTransport(MSG_PATH)
# 聚合工具索引namespaced_tool_name -> (server_id, tool_name)
_aggregated_index: Dict[str, Tuple[int, str]] = {}
def _text_content(text: str) -> TextContent:
try:
@@ -95,7 +100,13 @@ TOOLS: list[Tool] = [
@server.list_tools()
async def list_tools() -> list[Tool]:
return TOOLS
# 本地广播工具 + Fay 当前在线 MCP 工具的聚合视图namespaced
aggregated = []
try:
aggregated = _build_aggregated_tools()
except Exception as e:
log.warning(f"Failed to build aggregated tools: {e}")
return TOOLS + aggregated
def _parse_arguments(arguments: Dict[str, Any]) -> Tuple[str, str, str]:
@@ -105,6 +116,37 @@ def _parse_arguments(arguments: Dict[str, Any]) -> Tuple[str, str, str]:
return text, audio_url, user
def _build_aggregated_tools() -> List[Tool]:
"""
将 Fay 已连接的 MCP 工具聚合,对外暴露为 namespaced 名称:
<server_id>:<tool_name>
"""
tools: List[Tool] = []
_aggregated_index.clear()
server_name_map = {s.get("id"): s.get("name", f"Server{s.get('id')}") for s in mcp_service.mcp_servers or []}
for entry in tool_registry.get_enabled_tools():
server_id = entry.get("server_id")
tool_name = entry.get("name")
if server_id is None or not tool_name:
continue
agg_name = f"{server_id}:{tool_name}"
desc = entry.get("description", "")
server_label = server_name_map.get(server_id, f"Server {server_id}")
agg_desc = f"{desc} [via {server_label}]"
input_schema = entry.get("inputSchema") or {}
tool = Tool(
name=agg_name,
description=agg_desc,
inputSchema=input_schema if isinstance(input_schema, dict) else {},
)
tools.append(tool)
_aggregated_index[agg_name] = (server_id, tool_name)
return tools
async def _send_broadcast(payload: Dict[str, Any]) -> Tuple[bool, str]:
def _post() -> Tuple[bool, str]:
body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
@@ -143,22 +185,68 @@ async def _send_broadcast(payload: Dict[str, Any]) -> Tuple[bool, str]:
@server.call_tool()
async def call_tool(name: str, arguments: Dict[str, Any]) -> list[TextContent]:
if name != "broadcast_message":
# 本地广播
if name == "broadcast_message":
text, audio_url, user = _parse_arguments(arguments or {})
if not text and not audio_url:
return [_text_content("Either 'text' or 'audio_url' must be provided.")]
payload: Dict[str, Any] = {"user": user}
if text:
payload["text"] = text
if audio_url:
payload["audio"] = audio_url
ok, message = await _send_broadcast(payload)
prefix = "success" if ok else "error"
return [_text_content(f"{prefix}: {message}")]
# 聚合的远端 MCP 工具
target = _aggregated_index.get(name)
if not target:
return [_text_content(f"Unknown tool: {name}")]
text, audio_url, user = _parse_arguments(arguments or {})
if not text and not audio_url:
return [_text_content("Either 'text' or 'audio_url' must be provided.")]
server_id, tool_name = target
try:
success, result = mcp_service.call_mcp_tool(server_id, tool_name, arguments or {})
if not success:
return [_text_content(f"error: {result}")]
return _normalize_result(result)
except Exception as e:
return [_text_content(f"error: {type(e).__name__}: {e}")]
payload: Dict[str, Any] = {"user": user}
if text:
payload["text"] = text
if audio_url:
payload["audio"] = audio_url
ok, message = await _send_broadcast(payload)
prefix = "success" if ok else "error"
return [_text_content(f"{prefix}: {message}")]
def _normalize_result(result: Any) -> List[TextContent]:
"""
将上游返回的任意对象转换为 MCP 文本内容列表。
"""
# 如果已经是 TextContent 或列表,直接返回
try:
from mcp.types import TextContent
if isinstance(result, TextContent):
return [result]
except Exception:
pass
if isinstance(result, list):
contents: List[TextContent] = []
for item in result:
try:
if hasattr(item, "type") and getattr(item, "type", "") == "text" and hasattr(item, "text"):
contents.append(item)
continue
except Exception:
pass
try:
if isinstance(item, dict) and item.get("type") == "text":
contents.append(TextContent(type="text", text=str(item.get("text", "")))) # type: ignore
continue
except Exception:
pass
contents.append(_text_content(str(item)))
return contents
return [_text_content(str(result))]
async def sse_endpoint(request):