diff --git a/faymcp/mcp_server.py b/faymcp/mcp_server.py index e3a4991..e808314 100644 --- a/faymcp/mcp_server.py +++ b/faymcp/mcp_server.py @@ -20,12 +20,14 @@ import logging import os import sys import json -from typing import Any, Dict, Tuple +from typing import Any, Dict, Tuple, List, Optional try: from mcp.server import Server from mcp.types import Tool, TextContent from mcp.server.sse import SseServerTransport + from faymcp import tool_registry + from faymcp import mcp_service except ImportError: print("缺少 mcp 库,请先安装:pip install mcp", file=sys.stderr) sys.exit(1) @@ -68,6 +70,9 @@ MSG_PATH = os.environ.get("FAY_MCP_MSG_PATH", "/messages") server = Server(SERVER_NAME) sse_transport = SseServerTransport(MSG_PATH) +# 聚合工具索引:namespaced_tool_name -> (server_id, tool_name) +_aggregated_index: Dict[str, Tuple[int, str]] = {} + def _text_content(text: str) -> TextContent: try: @@ -95,7 +100,13 @@ TOOLS: list[Tool] = [ @server.list_tools() async def list_tools() -> list[Tool]: - return TOOLS + # 本地广播工具 + Fay 当前在线 MCP 工具的聚合视图(namespaced) + aggregated = [] + try: + aggregated = _build_aggregated_tools() + except Exception as e: + log.warning(f"Failed to build aggregated tools: {e}") + return TOOLS + aggregated def _parse_arguments(arguments: Dict[str, Any]) -> Tuple[str, str, str]: @@ -105,6 +116,37 @@ def _parse_arguments(arguments: Dict[str, Any]) -> Tuple[str, str, str]: return text, audio_url, user +def _build_aggregated_tools() -> List[Tool]: + """ + 将 Fay 已连接的 MCP 工具聚合,对外暴露为 namespaced 名称: + : + """ + tools: List[Tool] = [] + _aggregated_index.clear() + + server_name_map = {s.get("id"): s.get("name", f"Server{s.get('id')}") for s in mcp_service.mcp_servers or []} + + for entry in tool_registry.get_enabled_tools(): + server_id = entry.get("server_id") + tool_name = entry.get("name") + if server_id is None or not tool_name: + continue + agg_name = f"{server_id}:{tool_name}" + desc = entry.get("description", "") + server_label = server_name_map.get(server_id, f"Server {server_id}") + agg_desc = f"{desc} [via {server_label}]" + input_schema = entry.get("inputSchema") or {} + tool = Tool( + name=agg_name, + description=agg_desc, + inputSchema=input_schema if isinstance(input_schema, dict) else {}, + ) + tools.append(tool) + _aggregated_index[agg_name] = (server_id, tool_name) + + return tools + + async def _send_broadcast(payload: Dict[str, Any]) -> Tuple[bool, str]: def _post() -> Tuple[bool, str]: body = json.dumps(payload, ensure_ascii=False).encode("utf-8") @@ -143,22 +185,68 @@ async def _send_broadcast(payload: Dict[str, Any]) -> Tuple[bool, str]: @server.call_tool() async def call_tool(name: str, arguments: Dict[str, Any]) -> list[TextContent]: - if name != "broadcast_message": + # 本地广播 + if name == "broadcast_message": + text, audio_url, user = _parse_arguments(arguments or {}) + if not text and not audio_url: + return [_text_content("Either 'text' or 'audio_url' must be provided.")] + + payload: Dict[str, Any] = {"user": user} + if text: + payload["text"] = text + if audio_url: + payload["audio"] = audio_url + + ok, message = await _send_broadcast(payload) + prefix = "success" if ok else "error" + return [_text_content(f"{prefix}: {message}")] + + # 聚合的远端 MCP 工具 + target = _aggregated_index.get(name) + if not target: return [_text_content(f"Unknown tool: {name}")] - text, audio_url, user = _parse_arguments(arguments or {}) - if not text and not audio_url: - return [_text_content("Either 'text' or 'audio_url' must be provided.")] + server_id, tool_name = target + try: + success, result = mcp_service.call_mcp_tool(server_id, tool_name, arguments or {}) + if not success: + return [_text_content(f"error: {result}")] + return _normalize_result(result) + except Exception as e: + return [_text_content(f"error: {type(e).__name__}: {e}")] - payload: Dict[str, Any] = {"user": user} - if text: - payload["text"] = text - if audio_url: - payload["audio"] = audio_url - ok, message = await _send_broadcast(payload) - prefix = "success" if ok else "error" - return [_text_content(f"{prefix}: {message}")] +def _normalize_result(result: Any) -> List[TextContent]: + """ + 将上游返回的任意对象转换为 MCP 文本内容列表。 + """ + # 如果已经是 TextContent 或列表,直接返回 + try: + from mcp.types import TextContent + if isinstance(result, TextContent): + return [result] + except Exception: + pass + + if isinstance(result, list): + contents: List[TextContent] = [] + for item in result: + try: + if hasattr(item, "type") and getattr(item, "type", "") == "text" and hasattr(item, "text"): + contents.append(item) + continue + except Exception: + pass + try: + if isinstance(item, dict) and item.get("type") == "text": + contents.append(TextContent(type="text", text=str(item.get("text", "")))) # type: ignore + continue + except Exception: + pass + contents.append(_text_content(str(item))) + return contents + + return [_text_content(str(result))] async def sse_endpoint(request):