Natural evolution

1. Remove the original built-in knowledge-base logic;
2. Add the knowledge-base MCP server;
3. Add the Windows window-screenshot MCP server;
4. The chat window now supports Markdown rendering;
5. The chat window now supports thumbnail display.
guo zebin
2025-12-10 21:21:43 +08:00
parent 177b016882
commit a24b8c9a42
16 changed files with 3441 additions and 1997 deletions


@@ -1,393 +1,385 @@
#核心启动模块
import time
import re
import pyaudio
import socket
import requests
from core.interact import Interact
from core.recorder import Recorder
from scheduler.thread_manager import MyThread
from utils import util, config_util, stream_util
from core.wsa_server import MyServer
from core import wsa_server
from core import socket_bridge_service
# from llm.nlp_cognitive_stream import save_agent_memory
# 全局变量声明
feiFei = None
recorderListener = None
__running = False
deviceSocketServer = None
DeviceInputListenerDict = {}
ngrok = None
socket_service_instance = None
# 延迟导入fay_core
def get_fay_core():
from core import fay_core
return fay_core
#启动状态
def is_running():
return __running
#录制麦克风音频输入并传给aliyun
class RecorderListener(Recorder):
def __init__(self, device, fei):
self.__device = device
self.__FORMAT = pyaudio.paInt16
self.__running = False
self.username = 'User'
# 这两个参数会在 get_stream 中根据实际设备更新
self.channels = None
self.sample_rate = None
super().__init__(fei)
def on_speaking(self, text):
if len(text) > 1:
interact = Interact("mic", 1, {'user': 'User', 'msg': text})
util.printInfo(3, "语音", '{}'.format(interact.data["msg"]), time.time())
feiFei.on_interact(interact)
def get_stream(self):
try:
while True:
config_util.load_config()
record = config_util.config['source']['record']
if record['enabled']:
break
time.sleep(0.1)
self.paudio = pyaudio.PyAudio()
# 获取默认输入设备的信息
default_device = self.paudio.get_default_input_device_info()
self.channels = min(int(default_device.get('maxInputChannels', 1)), 2) # 最多使用2个通道
self.sample_rate = int(default_device.get('defaultSampleRate', 16000))
util.printInfo(1, "系统", f"默认麦克风信息 - 采样率: {self.sample_rate}Hz, 通道数: {self.channels}")
# 使用系统默认麦克风
self.stream = self.paudio.open(
format=self.__FORMAT,
channels=self.channels,
rate=self.sample_rate,
input=True,
frames_per_buffer=1024
)
self.__running = True
MyThread(target=self.__pyaudio_clear).start()
except Exception as e:
util.log(1, f"打开麦克风时出错: {str(e)}")
util.printInfo(1, self.username, "请检查录音设备是否有误,再重新启动!")
time.sleep(10)
return self.stream
def __pyaudio_clear(self):
try:
while self.__running:
time.sleep(30)
except Exception as e:
util.log(1, f"音频清理线程出错: {str(e)}")
finally:
if hasattr(self, 'stream') and self.stream:
try:
self.stream.stop_stream()
self.stream.close()
except Exception as e:
util.log(1, f"关闭音频流时出错: {str(e)}")
def stop(self):
super().stop()
self.__running = False
time.sleep(0.1)#给清理线程一点处理时间
try:
while self.is_reading:#是为了确保停止的时候麦克风没有刚好在读取音频的
time.sleep(0.1)
if self.stream is not None:
self.stream.stop_stream()
self.stream.close()
self.paudio.terminate()
except Exception as e:
print(e)
util.log(1, "请检查设备是否有误,再重新启动!")
def is_remote(self):
return False
#Edit by xszyou on 20230113:录制远程设备音频输入并传给aliyun
class DeviceInputListener(Recorder):
def __init__(self, deviceConnector, fei):
super().__init__(fei)
self.__running = True
self.streamCache = None
self.thread = MyThread(target=self.run)
self.thread.start() #启动远程音频输入设备监听线程
self.username = 'User'
self.isOutput = True
self.deviceConnector = deviceConnector
def run(self):
#初始化音频流缓存并循环接收远程设备音频数据
self.streamCache = stream_util.StreamCache(1024*1024*20)
addr = None
while self.__running:
try:
data = b""
while self.deviceConnector:
data = self.deviceConnector.recv(2048)
if b"<username>" in data:
data_str = data.decode("utf-8")
match = re.search(r"<username>(.*?)</username>", data_str)
if match:
self.username = match.group(1)
else:
self.streamCache.write(data)
if b"<output>" in data:
data_str = data.decode("utf-8")
match = re.search(r"<output>(.*?)<output>", data_str)
if match:
self.isOutput = (match.group(1) == "True")
else:
self.streamCache.write(data)
if not b"<username>" in data and not b"<output>" in data:
self.streamCache.write(data)
time.sleep(0.005)
self.streamCache.clear()
except Exception as err:
pass
time.sleep(1)
def on_speaking(self, text):
global feiFei
if len(text) > 1:
interact = Interact("socket", 1, {"user": self.username, "msg": text, "socket": self.deviceConnector})
util.printInfo(3, "(" + self.username + ")远程音频输入", '{}'.format(interact.data["msg"]), time.time())
feiFei.on_interact(interact)
#recorder会等待stream不为空才开始录音
def get_stream(self):
while not self.deviceConnector:
time.sleep(1)
pass
return self.streamCache
def stop(self):
super().stop()
self.__running = False
def is_remote(self):
return True
#检查远程音频连接状态
def device_socket_keep_alive():
global DeviceInputListenerDict
while __running:
delkey = None
for key, value in DeviceInputListenerDict.items():
try:
value.deviceConnector.send(b'\xf0\xf1\xf2\xf3\xf4\xf5\xf6\xf7\xf8')#发送心跳包
if wsa_server.get_web_instance().is_connected(value.username):
wsa_server.get_web_instance().add_cmd({"remote_audio_connect": True, "Username" : value.username})
except Exception as serr:
util.printInfo(1, value.username, "远程音频输入输出设备已经断开:{}".format(key))
value.stop()
delkey = key
break
if delkey:
value = DeviceInputListenerDict.pop(delkey)
if wsa_server.get_web_instance().is_connected(value.username):
wsa_server.get_web_instance().add_cmd({"remote_audio_connect": False, "Username" : value.username})
time.sleep(10)
#远程音频连接
def accept_audio_device_output_connect():
global deviceSocketServer
global __running
global DeviceInputListenerDict
deviceSocketServer = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
deviceSocketServer.bind(("0.0.0.0",10001))
deviceSocketServer.listen(1)
MyThread(target = device_socket_keep_alive).start() # 开启心跳包检测
addr = None
while __running:
try:
deviceConnector,addr = deviceSocketServer.accept() #接受TCP连接并返回新的套接字与IP地址
deviceInputListener = DeviceInputListener(deviceConnector, feiFei) # 设备音频输入输出麦克风
deviceInputListener.start()
#把DeviceInputListenner对象记录下来
peername = str(deviceConnector.getpeername()[0]) + ":" + str(deviceConnector.getpeername()[1])
DeviceInputListenerDict[peername] = deviceInputListener
util.log(1,"远程音频{}输入输出设备连接上:{}".format(len(DeviceInputListenerDict), addr))
except Exception as e:
pass
#数字人端请求获取最新的自动播报消息,若自动播报服务关闭会自动退出自动播报
def start_auto_play_service(): #TODO 评估一下有无优化的空间
if config_util.config['source'].get('automatic_player_url') is None or config_util.config['source'].get('automatic_player_status') is None:
return
url = f"{config_util.config['source']['automatic_player_url']}/get_auto_play_item"
user = "User" #TODO 临时固死了
is_auto_server_error = False
while __running:
if config_util.config['source']['wake_word_enabled'] and config_util.config['source']['wake_word_type'] == 'common' and recorderListener.wakeup_matched == True:
time.sleep(0.01)
continue
if is_auto_server_error:
util.printInfo(1, user, '60s后重连自动播报服务器')
time.sleep(60)
# 请求自动播报服务器
with get_fay_core().auto_play_lock:
if config_util.config['source']['automatic_player_status'] and config_util.config['source']['automatic_player_url'] is not None and get_fay_core().can_auto_play == True and (config_util.config["interact"]["playSound"] or wsa_server.get_instance().is_connected(user)):
get_fay_core().can_auto_play = False
post_data = {"user": user}
try:
response = requests.post(url, json=post_data, timeout=5)
if response.status_code == 200:
is_auto_server_error = False
data = response.json()
audio_url = data.get('audio')
if not audio_url or audio_url.strip()[0:4] != "http":
audio_url = None
response_text = data.get('text')
if audio_url is None and (response_text is None or '' == response_text.strip()):
continue
timestamp = data.get('timestamp')
interact = Interact("auto_play", 2, {'user': user, 'text': response_text, 'audio': audio_url})
util.printInfo(1, user, '自动播报:{}{}'.format(response_text, audio_url), time.time())
feiFei.on_interact(interact)
else:
is_auto_server_error = True
get_fay_core().can_auto_play = True
util.printInfo(1, user, '请求自动播报服务器失败,错误代码是:{}'.format(response.status_code))
except requests.exceptions.RequestException as e:
is_auto_server_error = True
get_fay_core().can_auto_play = True
util.printInfo(1, user, '请求自动播报服务器失败,错误信息是:{}'.format(e))
time.sleep(0.01)
#停止服务
def stop():
global feiFei
global recorderListener
global __running
global DeviceInputListenerDict
global ngrok
global socket_service_instance
global deviceSocketServer
util.log(1, '正在关闭服务...')
__running = False
# 断开所有MCP服务连接
util.log(1, '正在断开所有MCP服务连接...')
try:
from faymcp import mcp_service
mcp_service.disconnect_all_mcp_servers()
util.log(1, '所有MCP服务连接已断开')
except Exception as e:
util.log(1, f'断开MCP服务连接失败: {str(e)}')
# 保存代理记忆(仅在未使用仿生记忆时)
if not config_util.config["memory"].get("use_bionic_memory", False):
util.log(1, '正在保存代理记忆...')
try:
from llm.nlp_cognitive_stream import save_agent_memory
save_agent_memory()
util.log(1, '代理记忆保存成功')
except Exception as e:
util.log(1, f'保存代理记忆失败: {str(e)}')
if recorderListener is not None:
util.log(1, '正在关闭录音服务...')
recorderListener.stop()
time.sleep(0.1)
util.log(1, '正在关闭远程音频输入输出服务...')
try:
if len(DeviceInputListenerDict) > 0:
for key in list(DeviceInputListenerDict.keys()):
value = DeviceInputListenerDict.pop(key)
value.stop()
deviceSocketServer.close()
if socket_service_instance is not None:
socket_service_instance.stop_server()
socket_service_instance = None
except:
pass
util.log(1, '正在关闭核心服务...')
feiFei.stop()
util.log(1, '服务已关闭!')
#开启服务
def start():
global feiFei
global recorderListener
global __running
global socket_service_instance
util.log(1, '开启服务...')
__running = True
#读取配置
util.log(1, '读取配置...')
config_util.load_config()
#开启核心服务
util.log(1, '开启核心服务...')
feiFei = get_fay_core().FeiFei()
feiFei.start()
#根据配置决定是否初始化认知记忆系统
if not config_util.config["memory"].get("use_bionic_memory", False):
util.log(1, '初始化定时保存记忆及反思的任务...')
from llm.nlp_cognitive_stream import init_memory_scheduler
init_memory_scheduler()
#初始化知识库(两个模块共用)
util.log(1, '初始化本地知识库...')
if config_util.config["memory"].get("use_bionic_memory", False):
from llm.nlp_bionicmemory_stream import init_knowledge_base
else:
from llm.nlp_cognitive_stream import init_knowledge_base
init_knowledge_base()
#开启录音服务
record = config_util.config['source']['record']
if record['enabled']:
util.log(1, '开启录音服务...')
recorderListener = RecorderListener('device', feiFei) # 监听麦克风
recorderListener.start()
#启动声音沟通接口服务
util.log(1,'启动声音沟通接口服务...')
deviceSocketThread = MyThread(target=accept_audio_device_output_connect)
deviceSocketThread.start()
socket_service_instance = socket_bridge_service.new_instance()
socket_bridge_service_Thread = MyThread(target=socket_service_instance.start_service)
socket_bridge_service_Thread.start()
#启动自动播报服务
util.log(1,'启动自动播报服务...')
MyThread(target=start_auto_play_service).start()
util.log(1, '服务启动完成!')
if __name__ == '__main__':
ws_server: MyServer = None
feiFei: get_fay_core().FeiFei = None
recorderListener: Recorder = None
start()
#核心启动模块
import time
import re
import pyaudio
import socket
import requests
from core.interact import Interact
from core.recorder import Recorder
from scheduler.thread_manager import MyThread
from utils import util, config_util, stream_util
from core.wsa_server import MyServer
from core import wsa_server
from core import socket_bridge_service
# from llm.nlp_cognitive_stream import save_agent_memory
# 全局变量声明
feiFei = None
recorderListener = None
__running = False
deviceSocketServer = None
DeviceInputListenerDict = {}
ngrok = None
socket_service_instance = None
# 延迟导入fay_core
def get_fay_core():
from core import fay_core
return fay_core
#启动状态
def is_running():
return __running
#录制麦克风音频输入并传给aliyun
class RecorderListener(Recorder):
def __init__(self, device, fei):
self.__device = device
self.__FORMAT = pyaudio.paInt16
self.__running = False
self.username = 'User'
# 这两个参数会在 get_stream 中根据实际设备更新
self.channels = None
self.sample_rate = None
super().__init__(fei)
def on_speaking(self, text):
if len(text) > 1:
interact = Interact("mic", 1, {'user': 'User', 'msg': text})
util.printInfo(3, "语音", '{}'.format(interact.data["msg"]), time.time())
feiFei.on_interact(interact)
def get_stream(self):
try:
while True:
config_util.load_config()
record = config_util.config['source']['record']
if record['enabled']:
break
time.sleep(0.1)
self.paudio = pyaudio.PyAudio()
# 获取默认输入设备的信息
default_device = self.paudio.get_default_input_device_info()
self.channels = min(int(default_device.get('maxInputChannels', 1)), 2) # 最多使用2个通道
self.sample_rate = int(default_device.get('defaultSampleRate', 16000))
util.printInfo(1, "系统", f"默认麦克风信息 - 采样率: {self.sample_rate}Hz, 通道数: {self.channels}")
# 使用系统默认麦克风
self.stream = self.paudio.open(
format=self.__FORMAT,
channels=self.channels,
rate=self.sample_rate,
input=True,
frames_per_buffer=1024
)
self.__running = True
MyThread(target=self.__pyaudio_clear).start()
except Exception as e:
util.log(1, f"打开麦克风时出错: {str(e)}")
util.printInfo(1, self.username, "请检查录音设备是否有误,再重新启动!")
time.sleep(10)
return self.stream
def __pyaudio_clear(self):
try:
while self.__running:
time.sleep(30)
except Exception as e:
util.log(1, f"音频清理线程出错: {str(e)}")
finally:
if hasattr(self, 'stream') and self.stream:
try:
self.stream.stop_stream()
self.stream.close()
except Exception as e:
util.log(1, f"关闭音频流时出错: {str(e)}")
def stop(self):
super().stop()
self.__running = False
time.sleep(0.1)#给清理线程一点处理时间
try:
while self.is_reading:#是为了确保停止的时候麦克风没有刚好在读取音频的
time.sleep(0.1)
if self.stream is not None:
self.stream.stop_stream()
self.stream.close()
self.paudio.terminate()
except Exception as e:
print(e)
util.log(1, "请检查设备是否有误,再重新启动!")
def is_remote(self):
return False
#Edit by xszyou on 20230113:录制远程设备音频输入并传给aliyun
class DeviceInputListener(Recorder):
def __init__(self, deviceConnector, fei):
super().__init__(fei)
self.__running = True
self.streamCache = None
self.thread = MyThread(target=self.run)
self.thread.start() #启动远程音频输入设备监听线程
self.username = 'User'
self.isOutput = True
self.deviceConnector = deviceConnector
def run(self):
#初始化音频流缓存并循环接收远程设备音频数据
self.streamCache = stream_util.StreamCache(1024*1024*20)
addr = None
while self.__running:
try:
data = b""
while self.deviceConnector:
data = self.deviceConnector.recv(2048)
if b"<username>" in data:
data_str = data.decode("utf-8")
match = re.search(r"<username>(.*?)</username>", data_str)
if match:
self.username = match.group(1)
else:
self.streamCache.write(data)
if b"<output>" in data:
data_str = data.decode("utf-8")
match = re.search(r"<output>(.*?)<output>", data_str)
if match:
self.isOutput = (match.group(1) == "True")
else:
self.streamCache.write(data)
if not b"<username>" in data and not b"<output>" in data:
self.streamCache.write(data)
time.sleep(0.005)
self.streamCache.clear()
except Exception as err:
pass
time.sleep(1)
def on_speaking(self, text):
global feiFei
if len(text) > 1:
interact = Interact("socket", 1, {"user": self.username, "msg": text, "socket": self.deviceConnector})
util.printInfo(3, "(" + self.username + ")远程音频输入", '{}'.format(interact.data["msg"]), time.time())
feiFei.on_interact(interact)
#recorder会等待stream不为空才开始录音
def get_stream(self):
while not self.deviceConnector:
time.sleep(1)
pass
return self.streamCache
def stop(self):
super().stop()
self.__running = False
def is_remote(self):
return True
#检查远程音频连接状态
def device_socket_keep_alive():
global DeviceInputListenerDict
while __running:
delkey = None
for key, value in DeviceInputListenerDict.items():
try:
value.deviceConnector.send(b'\xf0\xf1\xf2\xf3\xf4\xf5\xf6\xf7\xf8')#发送心跳包
if wsa_server.get_web_instance().is_connected(value.username):
wsa_server.get_web_instance().add_cmd({"remote_audio_connect": True, "Username" : value.username})
except Exception as serr:
util.printInfo(1, value.username, "远程音频输入输出设备已经断开:{}".format(key))
value.stop()
delkey = key
break
if delkey:
value = DeviceInputListenerDict.pop(delkey)
if wsa_server.get_web_instance().is_connected(value.username):
wsa_server.get_web_instance().add_cmd({"remote_audio_connect": False, "Username" : value.username})
time.sleep(10)
#远程音频连接
def accept_audio_device_output_connect():
global deviceSocketServer
global __running
global DeviceInputListenerDict
deviceSocketServer = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
deviceSocketServer.bind(("0.0.0.0",10001))
deviceSocketServer.listen(1)
MyThread(target = device_socket_keep_alive).start() # 开启心跳包检测
addr = None
while __running:
try:
deviceConnector,addr = deviceSocketServer.accept() #接受TCP连接并返回新的套接字与IP地址
deviceInputListener = DeviceInputListener(deviceConnector, feiFei) # 设备音频输入输出麦克风
deviceInputListener.start()
#把DeviceInputListenner对象记录下来
peername = str(deviceConnector.getpeername()[0]) + ":" + str(deviceConnector.getpeername()[1])
DeviceInputListenerDict[peername] = deviceInputListener
util.log(1,"远程音频{}输入输出设备连接上:{}".format(len(DeviceInputListenerDict), addr))
except Exception as e:
pass
#数字人端请求获取最新的自动播报消息,若自动播报服务关闭会自动退出自动播报
def start_auto_play_service(): #TODO 评估一下有无优化的空间
if config_util.config['source'].get('automatic_player_url') is None or config_util.config['source'].get('automatic_player_status') is None:
return
url = f"{config_util.config['source']['automatic_player_url']}/get_auto_play_item"
user = "User" #TODO 临时固死了
is_auto_server_error = False
while __running:
if config_util.config['source']['wake_word_enabled'] and config_util.config['source']['wake_word_type'] == 'common' and recorderListener.wakeup_matched == True:
time.sleep(0.01)
continue
if is_auto_server_error:
util.printInfo(1, user, '60s后重连自动播报服务器')
time.sleep(60)
# 请求自动播报服务器
with get_fay_core().auto_play_lock:
if config_util.config['source']['automatic_player_status'] and config_util.config['source']['automatic_player_url'] is not None and get_fay_core().can_auto_play == True and (config_util.config["interact"]["playSound"] or wsa_server.get_instance().is_connected(user)):
get_fay_core().can_auto_play = False
post_data = {"user": user}
try:
response = requests.post(url, json=post_data, timeout=5)
if response.status_code == 200:
is_auto_server_error = False
data = response.json()
audio_url = data.get('audio')
if not audio_url or audio_url.strip()[0:4] != "http":
audio_url = None
response_text = data.get('text')
if audio_url is None and (response_text is None or '' == response_text.strip()):
continue
timestamp = data.get('timestamp')
interact = Interact("auto_play", 2, {'user': user, 'text': response_text, 'audio': audio_url})
util.printInfo(1, user, '自动播报:{}{}'.format(response_text, audio_url), time.time())
feiFei.on_interact(interact)
else:
is_auto_server_error = True
get_fay_core().can_auto_play = True
util.printInfo(1, user, '请求自动播报服务器失败,错误代码是:{}'.format(response.status_code))
except requests.exceptions.RequestException as e:
is_auto_server_error = True
get_fay_core().can_auto_play = True
util.printInfo(1, user, '请求自动播报服务器失败,错误信息是:{}'.format(e))
time.sleep(0.01)
#停止服务
def stop():
global feiFei
global recorderListener
global __running
global DeviceInputListenerDict
global ngrok
global socket_service_instance
global deviceSocketServer
util.log(1, '正在关闭服务...')
__running = False
# 断开所有MCP服务连接
util.log(1, '正在断开所有MCP服务连接...')
try:
from faymcp import mcp_service
mcp_service.disconnect_all_mcp_servers()
util.log(1, '所有MCP服务连接已断开')
except Exception as e:
util.log(1, f'断开MCP服务连接失败: {str(e)}')
# 保存代理记忆(仅在未使用仿生记忆时)
if not config_util.config["memory"].get("use_bionic_memory", False):
util.log(1, '正在保存代理记忆...')
try:
from llm.nlp_cognitive_stream import save_agent_memory
save_agent_memory()
util.log(1, '代理记忆保存成功')
except Exception as e:
util.log(1, f'保存代理记忆失败: {str(e)}')
if recorderListener is not None:
util.log(1, '正在关闭录音服务...')
recorderListener.stop()
time.sleep(0.1)
util.log(1, '正在关闭远程音频输入输出服务...')
try:
if len(DeviceInputListenerDict) > 0:
for key in list(DeviceInputListenerDict.keys()):
value = DeviceInputListenerDict.pop(key)
value.stop()
deviceSocketServer.close()
if socket_service_instance is not None:
socket_service_instance.stop_server()
socket_service_instance = None
except:
pass
util.log(1, '正在关闭核心服务...')
feiFei.stop()
util.log(1, '服务已关闭!')
#开启服务
def start():
global feiFei
global recorderListener
global __running
global socket_service_instance
util.log(1, '开启服务...')
__running = True
#读取配置
util.log(1, '读取配置...')
config_util.load_config()
#开启核心服务
util.log(1, '开启核心服务...')
feiFei = get_fay_core().FeiFei()
feiFei.start()
#根据配置决定是否初始化认知记忆系统
if not config_util.config["memory"].get("use_bionic_memory", False):
util.log(1, '初始化定时保存记忆及反思的任务...')
from llm.nlp_cognitive_stream import init_memory_scheduler
init_memory_scheduler()
#开启录音服务
record = config_util.config['source']['record']
if record['enabled']:
util.log(1, '开启录音服务...')
recorderListener = RecorderListener('device', feiFei) # 监听麦克风
recorderListener.start()
#启动声音沟通接口服务
util.log(1,'启动声音沟通接口服务...')
deviceSocketThread = MyThread(target=accept_audio_device_output_connect)
deviceSocketThread.start()
socket_service_instance = socket_bridge_service.new_instance()
socket_bridge_service_Thread = MyThread(target=socket_service_instance.start_service)
socket_bridge_service_Thread.start()
#启动自动播报服务
util.log(1,'启动自动播报服务...')
MyThread(target=start_auto_play_service).start()
util.log(1, '服务启动完成!')
if __name__ == '__main__':
ws_server: MyServer = None
feiFei: get_fay_core().FeiFei = None
recorderListener: Recorder = None
start()


@@ -1,46 +1,81 @@
[
{
"id": 1,
"name": "tools",
"ip": "",
"connection_time": "2025-11-11 11:44:56",
"key": "",
"transport": "stdio",
"command": "python",
"args": [
"test/mcp_stdio_example.py"
],
"cwd": "",
"env": {}
},
{
"id": 2,
"name": "Fay日程管理",
"ip": "",
"connection_time": "2025-11-11 11:44:59",
"key": "",
"transport": "stdio",
"command": "python",
"args": [
"server.py"
],
"cwd": "mcp_servers/schedule_manager",
"env": {}
},
{
"id": 3,
"name": "logseq",
"ip": "",
"connection_time": "2025-10-21 11:07:20",
"key": "",
"transport": "stdio",
"command": "python",
"args": [
"server.py"
],
"cwd": "mcp_servers/logseq",
"env": {
"LOGSEQ_GRAPH_DIR": "D:/iCloudDrive/iCloud~com~logseq~logseq/第二大脑"
}
}
[
{
"id": 1,
"name": "tools",
"ip": "",
"connection_time": "2025-12-10 21:16:35",
"key": "",
"transport": "stdio",
"command": "python",
"args": [
"test/mcp_stdio_example.py"
],
"cwd": "",
"env": {}
},
{
"id": 2,
"name": "Fay日程管理",
"ip": "",
"connection_time": "2025-12-10 21:16:38",
"key": "",
"transport": "stdio",
"command": "python",
"args": [
"server.py"
],
"cwd": "mcp_servers/schedule_manager",
"env": {}
},
{
"id": 3,
"name": "logseq",
"ip": "",
"connection_time": "2025-12-10 21:16:39",
"key": "",
"transport": "stdio",
"command": "python",
"args": [
"server.py"
],
"cwd": "mcp_servers/logseq",
"env": {
"LOGSEQ_GRAPH_DIR": "D:/iCloudDrive/iCloud~com~logseq~logseq/第二大脑"
}
},
{
"id": 4,
"name": "yueshen rag",
"ip": "",
"connection_time": "2025-12-10 21:16:44",
"key": "",
"transport": "stdio",
"command": "C:\\Users\\Lenovo\\anaconda3\\envs\\rag\\python.exe",
"args": [
"mcp_servers/yueshen_rag/server.py"
],
"cwd": "",
"env": {
"YUESHEN_AUTO_INGEST": "1",
"YUESHEN_AUTO_INTERVAL": "300",
"YUESHEN_AUTO_RESET_ON_START": "0",
"YUESHEN_EMBED_API_KEY": "sk-izmvqrzyhjghzyghiofqfpusxprmfljntxzggkcovtneqpas",
"YUESHEN_EMBED_BASE_URL": "https://api.siliconflow.cn/v1",
"YUESHEN_EMBED_MODEL": "Qwen/Qwen3-Embedding-8B"
}
},
{
"id": 5,
"name": "window capture",
"ip": "",
"connection_time": "2025-12-10 21:16:45",
"key": "",
"transport": "stdio",
"command": "python",
"args": [
"mcp_servers/window_capture/server.py"
],
"cwd": "",
"env": {}
}
]


@@ -1,21 +1,44 @@
{
"5": {
"now": true,
"add": false,
"upper": false,
"echo": false,
"ping": false
},
"1": {
"add": true,
"upper": false,
"echo": false,
"ping": false,
"now": true
},
"2": {
"('meta', None)": true,
"('nextCursor', None)": true
},
"3": {}
}
{
"5": {
"now": true,
"add": false,
"upper": false,
"echo": false,
"ping": false,
"ingest_yueshen": false,
"yueshen_stats": false
},
"1": {
"add": true,
"upper": false,
"echo": false,
"ping": false,
"now": true,
"display_media": true
},
"2": {
"('meta', None)": true,
"('nextCursor', None)": true,
"browser_snapshot": false
},
"3": {
"get_file_info": false,
"list_allowed_directories": true,
"edit_file": false,
"list_directory": false,
"directory_tree": false,
"create_directory": false,
"read_text_file": false,
"read_multiple_files": false,
"read_media_file": false,
"read_file": false,
"move_file": false,
"list_directory_with_sizes": false,
"search_files": false,
"write_file": false
},
"4": {},
"6": {
"list_windows": true
}
}


@@ -883,6 +883,63 @@ def api_start_genagents():
util.log(1, f"启动决策分析页面时出错: {str(e)}")
return jsonify({'success': False, 'message': f'启动决策分析页面时出错: {str(e)}'}), 500
# 获取本地图片(用于在网页中显示本地图片)
@__app.route('/api/local-image')
def api_local_image():
try:
file_path = request.args.get('path', '')
if not file_path:
return jsonify({'error': '缺少文件路径参数'}), 400
# 检查文件是否存在
if not os.path.exists(file_path):
return jsonify({'error': f'文件不存在: {file_path}'}), 404
# 检查是否为图片文件
valid_extensions = ('.png', '.jpg', '.jpeg', '.gif', '.bmp', '.webp')
if not file_path.lower().endswith(valid_extensions):
return jsonify({'error': '不是有效的图片文件'}), 400
# 返回图片文件
return send_file(file_path)
except Exception as e:
return jsonify({'error': f'获取图片时出错: {str(e)}'}), 500
# 打开图片文件(使用系统默认程序)
@__app.route('/api/open-image', methods=['POST'])
def api_open_image():
try:
data = request.get_json()
if not data or 'path' not in data:
return jsonify({'success': False, 'message': '缺少文件路径参数'}), 400
file_path = data['path']
# 检查文件是否存在
if not os.path.exists(file_path):
return jsonify({'success': False, 'message': f'文件不存在: {file_path}'}), 404
# 检查是否为图片文件
valid_extensions = ('.png', '.jpg', '.jpeg', '.gif', '.bmp', '.webp')
if not file_path.lower().endswith(valid_extensions):
return jsonify({'success': False, 'message': '不是有效的图片文件'}), 400
# 使用系统默认程序打开图片
import subprocess
import platform
system = platform.system()
if system == 'Windows':
os.startfile(file_path)
elif system == 'Darwin': # macOS
subprocess.run(['open', file_path])
else: # Linux
subprocess.run(['xdg-open', file_path])
return jsonify({'success': True, 'message': '已打开图片'}), 200
except Exception as e:
return jsonify({'success': False, 'message': f'打开图片时出错: {str(e)}'}), 500
def run():
class NullLogHandler:
def write(self, *args, **kwargs):


@@ -591,6 +591,181 @@ html {
}
.message-text {
white-space: pre-wrap;
word-wrap: break-word;
}
/* Markdown 样式 */
.markdown-body {
line-height: 1.6;
font-size: 14px;
}
.markdown-body p {
margin: 0 0 8px 0;
}
.markdown-body p:last-child {
margin-bottom: 0;
}
.markdown-body h1, .markdown-body h2, .markdown-body h3,
.markdown-body h4, .markdown-body h5, .markdown-body h6 {
margin: 12px 0 8px 0;
font-weight: 600;
line-height: 1.4;
}
.markdown-body h1 { font-size: 1.5em; }
.markdown-body h2 { font-size: 1.3em; }
.markdown-body h3 { font-size: 1.15em; }
.markdown-body h4 { font-size: 1em; }
.markdown-body ul, .markdown-body ol {
margin: 8px 0;
padding-left: 20px;
}
.markdown-body li {
margin: 4px 0;
}
.markdown-body code {
background-color: rgba(175, 184, 193, 0.2);
padding: 2px 6px;
border-radius: 4px;
font-family: 'Consolas', 'Monaco', monospace;
font-size: 0.9em;
}
.markdown-body pre {
background-color: #f6f8fa;
padding: 12px;
border-radius: 6px;
overflow-x: auto;
margin: 8px 0;
}
.markdown-body pre code {
background-color: transparent;
padding: 0;
font-size: 0.85em;
line-height: 1.5;
}
.markdown-body blockquote {
margin: 8px 0;
padding: 8px 12px;
border-left: 4px solid #dfe2e5;
color: #6a737d;
background-color: #f6f8fa;
}
.markdown-body blockquote p {
margin: 0;
}
.markdown-body a {
color: #0366d6;
text-decoration: none;
}
.markdown-body a:hover {
text-decoration: underline;
}
.markdown-body table {
border-collapse: collapse;
margin: 8px 0;
width: 100%;
}
.markdown-body th, .markdown-body td {
border: 1px solid #dfe2e5;
padding: 6px 12px;
text-align: left;
}
.markdown-body th {
background-color: #f6f8fa;
font-weight: 600;
}
.markdown-body hr {
border: none;
border-top: 1px solid #dfe2e5;
margin: 12px 0;
}
.markdown-body strong {
font-weight: 600;
}
.markdown-body em {
font-style: italic;
}
/* 图片缩略图样式 */
.image-thumbnail-container {
display: inline-block;
position: relative;
cursor: pointer;
margin: 4px 2px;
border-radius: 6px;
overflow: hidden;
box-shadow: 0 2px 8px rgba(0, 0, 0, 0.15);
transition: transform 0.2s ease, box-shadow 0.2s ease;
max-width: 200px;
vertical-align: middle;
}
.image-thumbnail-container:hover {
transform: scale(1.02);
box-shadow: 0 4px 12px rgba(0, 0, 0, 0.25);
}
.message-image-thumbnail {
display: block;
max-width: 200px;
max-height: 150px;
object-fit: cover;
border-radius: 6px;
}
.image-zoom-hint {
position: absolute;
bottom: 0;
left: 0;
right: 0;
background: rgba(0, 0, 0, 0.6);
color: #fff;
font-size: 11px;
text-align: center;
padding: 3px 0;
opacity: 0;
transition: opacity 0.2s ease;
}
.image-thumbnail-container:hover .image-zoom-hint {
opacity: 1;
}
.image-path-text {
display: inline-block;
padding: 4px 8px;
background-color: #f0f4ff;
border: 1px solid #d0d8e8;
border-radius: 4px;
font-size: 12px;
color: #617bab;
word-break: break-all;
}
/* prestart 内容中的图片缩略图 */
.prestart-content-inline .image-thumbnail-container {
max-width: 150px;
}
.prestart-content-inline .message-image-thumbnail {
max-width: 150px;
max-height: 100px;
}


@@ -1,4 +1,30 @@
// fayApp.js
// 全局函数:打开图片文件
window.openImageFile = function(encodedPath) {
const filePath = decodeURIComponent(encodedPath);
const baseUrl = window.location.protocol + '//' + window.location.hostname + ':' + window.location.port;
fetch(`${baseUrl}/api/open-image`, {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({ path: filePath })
})
.then(response => response.json())
.then(data => {
if (!data.success) {
console.error('打开图片失败:', data.message);
alert('打开图片失败: ' + data.message);
}
})
.catch(error => {
console.error('请求失败:', error);
alert('打开图片时发生错误');
});
};
class FayInterface {
constructor(baseWsUrl, baseApiUrl, vueInstance) {
this.baseWsUrl = baseWsUrl;
@@ -634,12 +660,14 @@ unadoptText(id) {
let mainContent = content;
let prestartContent = '';
// 解析 prestart 标签
const prestartRegex = /<prestart>([\s\S]*?)<\/prestart>/i;
// 解析 prestart 标签 - 使用贪婪匹配确保匹配到最后一个 </prestart>
// 同时支持多个 prestart 标签的情况
const prestartRegex = /<prestart>([\s\S]*)<\/prestart>/i;
const prestartMatch = mainContent.match(prestartRegex);
if (prestartMatch && prestartMatch[1]) {
prestartContent = this.trimThinkLines(prestartMatch[1]);
mainContent = mainContent.replace(prestartRegex, '');
// 移除所有 prestart 标签及其内容
mainContent = mainContent.replace(/<prestart>[\s\S]*<\/prestart>/gi, '');
}
// 先尝试匹配完整的 think 标签
@@ -693,7 +721,64 @@ unadoptText(id) {
const message = this.messages[index];
this.$set(message, 'prestartExpanded', !message.prestartExpanded);
},
// 检测并转换图片路径为缩略图
convertImagePaths(content) {
if (!content) return content;
// 匹配常见图片路径格式:
// Windows: D:\path\to\image.png 或 D:/path/to/image.png
// Unix: /path/to/image.png
// 支持的图片格式: png, jpg, jpeg, gif, bmp, webp
const imagePathRegex = /([A-Za-z]:[\\\/][^\s<>"']+\.(png|jpg|jpeg|gif|bmp|webp)|\/[^\s<>"']+\.(png|jpg|jpeg|gif|bmp|webp))/gi;
const baseUrl = window.location.protocol + '//' + window.location.hostname + ':' + window.location.port;
return content.replace(imagePathRegex, (match) => {
// 对原始路径进行编码
const encodedPath = encodeURIComponent(match);
// 通过后端 API 获取图片(解决浏览器安全限制)
const imgSrc = `${baseUrl}/api/local-image?path=${encodedPath}`;
// 用于显示的安全路径
const displayPath = match.replace(/\\/g, '/').replace(/'/g, '&#39;').replace(/"/g, '&quot;');
return `<span class="image-thumbnail-container" onclick="window.openImageFile('${encodedPath}')">
<img src="${imgSrc}" class="message-image-thumbnail" alt="图片" onerror="this.parentElement.innerHTML='<span class=\\'image-path-text\\'>${displayPath}</span>'" />
<span class="image-zoom-hint">点击查看</span>
</span>`;
});
},
// 渲染 Markdown 内容
renderMarkdown(content) {
if (!content) return '';
try {
// 配置 marked 选项
if (typeof marked !== 'undefined') {
marked.setOptions({
breaks: true, // 支持换行
gfm: true, // 支持 GitHub 风格的 Markdown
});
// 预处理:确保 ** 和 * 标记能正确解析
// 处理中文加粗:**文字** 后面可能有空格或其他字符
let processed = content;
// 手动处理加粗语法 **text**
processed = processed.replace(/\*\*([^*\n]+)\*\*/g, '<strong>$1</strong>');
// 手动处理斜体语法 *text*(避免与加粗冲突)
processed = processed.replace(/(?<!\*)\*([^*\n]+)\*(?!\*)/g, '<em>$1</em>');
// 对剩余内容使用 marked 解析
let result = marked.parse(processed);
// 转换图片路径为缩略图
result = this.convertImagePaths(result);
return result;
}
} catch (e) {
console.error('Markdown rendering error:', e);
}
// 如果 marked 不可用,返回简单处理的内容
let result = content.replace(/\n/g, '<br>');
result = this.convertImagePaths(result);
return result;
},
// 检查MCP服务器状态
checkMcpStatus() {
const mcpUrl = `http://${this.hostname}:5010/api/mcp/servers`;


@@ -10,6 +10,7 @@
<script src="{{ url_for('static',filename='js/vue.js') }}"></script>
<script src="{{ url_for('static',filename='js/element-ui.js') }}"></script>
<link rel="stylesheet" href="{{ url_for('static',filename='css/element/theme-chalk.css') }}" />
<script src="https://cdn.jsdelivr.net/npm/marked/marked.min.js"></script>
<script src="{{ url_for('static',filename='js/index.js') }}" defer></script>
<script src="{{ url_for('static',filename='js/script.js') }}" defer></script>
</head>
@@ -51,12 +52,12 @@
<span v-if="item.thinkLoading" class="think-spinner"></span>
</div>
<div v-if="(parseThinkContent(item.content).thinkContent || item.thinkLoading) && item.thinkExpanded" class="think-content-inline">[[parseThinkContent(item.content).thinkContent]]</div>
<div class="message-text">[[parseThinkContent(item.content).mainContent]]</div>
<div class="message-text markdown-body" v-html="renderMarkdown(parseThinkContent(item.content).mainContent)"></div>
<div v-if="parseThinkContent(item.content).prestartContent" class="prestart-toggle" @click="togglePrestart(index)">
<span class="prestart-arrow" :class="{'expanded': item.prestartExpanded}"></span>
<span class="prestart-label">预启动工具</span>
</div>
<div v-if="parseThinkContent(item.content).prestartContent && item.prestartExpanded" class="prestart-content-inline">[[parseThinkContent(item.content).prestartContent]]</div>
<div v-if="parseThinkContent(item.content).prestartContent && item.prestartExpanded" class="prestart-content-inline" v-html="convertImagePaths(parseThinkContent(item.content).prestartContent)"></div>
</div>
<div class="message-time">
<span class="what-time">[[item.timetext]]</span>

Binary file not shown.

File diff suppressed because it is too large


@@ -491,7 +491,6 @@ def _build_planner_messages(state: AgentState) -> List[SystemMessage | HumanMess
conversation = state.get("messages", []) or []
history = state.get("tool_results", []) or []
memory_context = context.get("memory_context", "")
knowledge_context = context.get("knowledge_context", "")
observation = context.get("observation", "")
prestart_context = context.get("prestart_context", "")
@@ -520,9 +519,6 @@ def _build_planner_messages(state: AgentState) -> List[SystemMessage | HumanMess
**关联记忆**
{memory_context or '(无相关记忆)'}
**关联知识**
{knowledge_context or '(无相关知识)'}
{prestart_section}
**可用工具**
{tools_text}
@@ -540,9 +536,6 @@ def _build_planner_messages(state: AgentState) -> List[SystemMessage | HumanMess
{{"action": "finish_text"}}"""
).strip()
print("***********************************************************************")
print(user_block)
print("****************************************************************")
return [
SystemMessage(content="你负责规划下一步行动,请严格输出合法 JSON。"),
HumanMessage(content=user_block),
@@ -553,7 +546,6 @@ def _build_final_messages(state: AgentState) -> List[SystemMessage | HumanMessag
context = state.get("context", {}) or {}
system_prompt = context.get("system_prompt", "")
request = state.get("request", "")
knowledge_context = context.get("knowledge_context", "")
memory_context = context.get("memory_context", "")
observation = context.get("observation", "")
prestart_context = context.get("prestart_context", "")
@@ -579,9 +571,6 @@ def _build_final_messages(state: AgentState) -> List[SystemMessage | HumanMessag
**关联记忆**
{memory_context or '(无相关记忆)'}
**关联知识**
{knowledge_context or '(无相关知识)'}
{prestart_section}
**其他观察**
{observation or '(无补充)'}
@@ -593,9 +582,6 @@ def _build_final_messages(state: AgentState) -> List[SystemMessage | HumanMessag
{conversation_block}"""
).strip()
print("***********************************************************************")
print(user_block)
print("****************************************************************")
return [
SystemMessage(content="你是最终回复的口播助手,请用中文自然表达。"),
HumanMessage(content=user_block),
@@ -1464,22 +1450,6 @@ def question(content, username, observation=None):
except Exception as exc:
util.log(1, f"获取相关记忆时出错: {exc}")
knowledge_context = ""
try:
knowledge_base = get_knowledge_base()
if knowledge_base:
knowledge_results = search_knowledge_base(content, knowledge_base, max_results=3)
if knowledge_results:
parts = ["**本地知识库相关信息**"]
for result in knowledge_results:
parts.append(f"来源文件:{result['file_name']}")
parts.append(result["content"])
parts.append("")
knowledge_context = "\n".join(parts).strip()
util.log(1, f"找到 {len(knowledge_results)} 条相关知识库信息")
except Exception as exc:
util.log(1, f"搜索知识库时出错: {exc}")
prestart_context = ""
try:
prestart_context = _run_prestart_tools(content)
@@ -1703,7 +1673,6 @@ def question(content, username, observation=None):
"max_steps": 30,
"context": {
"system_prompt": system_prompt,
"knowledge_context": knowledge_context,
"observation": observation,
"memory_context": memory_context,
"prestart_context": prestart_context,
@@ -1838,7 +1807,6 @@ def question(content, username, observation=None):
"planner_preview": None,
"context": {
"system_prompt": system_prompt,
"knowledge_context": knowledge_context,
"observation": observation,
"memory_context": memory_context,
"prestart_context": prestart_context,


@@ -0,0 +1,34 @@
# Window Capture MCP Server
An MCP server that takes screenshots on Windows by window title (or handle). It provides two tools:
- `list_windows`: list the current top-level windows, with optional keyword filtering.
- `capture_window`: capture a window by title keyword or handle; returns a PNG and also saves it locally.
## Setup
1. Enter this directory: `cd mcp_servers/window_capture`
2. Install the dependencies: `pip install -r requirements.txt` (requires Pillow; Windows only)
Default save directory: `cache_data/window_captures` (relative to the repository root; can be overridden per call via `save_dir`).
## Running
```bash
python mcp_servers/window_capture/server.py
```
Or add a record on Fay's MCP management page:
- transport: `stdio`
- command: `python`
- args: `["mcp_servers/window_capture/server.py"]`
- cwd: the repository root, or leave it empty
## Tool parameters
- `list_windows`
  - `keyword` (optional): title keyword; case-insensitive fuzzy match.
  - `include_hidden` (optional): include hidden/minimized windows; default false.
  - `limit` (optional): maximum number of windows to return; default 20, 0 means no limit.
- `capture_window`
  - `window` (required): window title keyword, or a window handle (decimal or 0x hexadecimal).
  - `include_hidden` (optional): allow capturing hidden/minimized windows; default false.
  - `save_dir` (optional): custom save directory.
Returned content:
- A text summary (a JSON string containing the window info and the save path). The screenshot file is saved under `cache_data/window_captures` or the custom `save_dir` (see the client sketch below).
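For quick manual testing outside Fay, the server can be driven with a minimal stdio client. This is only a sketch, assuming the official `mcp` Python SDK client API; the "Notepad" keyword and the launch path are illustrative:
```python
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

async def main() -> None:
    # Launch the server over stdio (run this from the repository root).
    params = StdioServerParameters(
        command="python",
        args=["mcp_servers/window_capture/server.py"],
    )
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            # Enumerate windows whose title contains the (illustrative) keyword.
            listing = await session.call_tool("list_windows", {"keyword": "Notepad"})
            print(listing.content[0].text)
            # Capture the matching window; the tool replies with the saved PNG path.
            shot = await session.call_tool("capture_window", {"window": "Notepad"})
            print("saved to:", shot.content[0].text)

asyncio.run(main())
```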


@@ -0,0 +1 @@
Pillow>=9.5.0


@@ -0,0 +1,363 @@
#!/usr/bin/env python3
"""
Window Capture MCP server.
Tools:
- list_windows: enumerate top-level windows with optional keyword filtering.
- capture_window: take a PNG screenshot of a specific window by title keyword or handle.
Only Windows is supported because the capture path relies on Win32 APIs and Pillow's ImageGrab.
"""
import asyncio
import ctypes
from ctypes import wintypes
import json
import os
import sys
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple
try:
from PIL import ImageGrab
except ImportError:
print("Pillow not installed. Please run: pip install Pillow", file=sys.stderr)
sys.exit(1)
try:
from mcp.server import Server
from mcp.types import Tool, TextContent
import mcp.server.stdio
except ImportError:
print("MCP library not installed. Please run: pip install mcp", file=sys.stderr)
sys.exit(1)
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
DEFAULT_SAVE_DIR = os.path.join(PROJECT_ROOT, "cache_data", "window_captures")
os.makedirs(DEFAULT_SAVE_DIR, exist_ok=True)
if os.name != "nt":
print("window_capture MCP server currently supports Windows only.", file=sys.stderr)
server = Server("window_capture")
user32 = ctypes.windll.user32
SW_RESTORE = 9
try:
user32.SetProcessDPIAware()
except Exception:
pass
@dataclass
class WindowInfo:
handle: int
title: str
cls: str
rect: Tuple[int, int, int, int]
visible: bool
minimized: bool
def to_dict(self) -> Dict[str, Any]:
left, top, right, bottom = self.rect
return {
"title": self.title,
"class": self.cls,
"handle": self.handle,
"handle_hex": hex(self.handle),
"visible": self.visible,
"minimized": self.minimized,
"rect": {"left": left, "top": top, "right": right, "bottom": bottom},
"size": {"width": max(0, right - left), "height": max(0, bottom - top)},
}
class WindowCaptureError(Exception):
pass
def _text_content(text: str):
try:
return TextContent(type="text", text=text)
except Exception:
return {"type": "text", "text": text}
def _sanitize_filename(name: str) -> str:
cleaned = "".join(ch for ch in name if ch.isalnum() or ch in (" ", "_", "-"))
cleaned = cleaned.strip().replace(" ", "_")
return cleaned or "window"
def _enum_windows(keyword: Optional[str] = None, include_hidden: bool = False, limit: int = 30) -> List[WindowInfo]:
if os.name != "nt":
raise WindowCaptureError("Window enumeration is only supported on Windows.")
results: List[WindowInfo] = []
keyword_l = keyword.lower() if keyword else None
def callback(hwnd, _lparam):
if not user32.IsWindow(hwnd):
return True
if not include_hidden and not user32.IsWindowVisible(hwnd):
return True
length = user32.GetWindowTextLengthW(hwnd)
if length == 0:
return True
title_buf = ctypes.create_unicode_buffer(length + 1)
user32.GetWindowTextW(hwnd, title_buf, length + 1)
title = title_buf.value.strip()
if not title:
return True
if keyword_l and keyword_l not in title.lower():
return True
class_buf = ctypes.create_unicode_buffer(256)
user32.GetClassNameW(hwnd, class_buf, 255)
rect = wintypes.RECT()
if not user32.GetWindowRect(hwnd, ctypes.byref(rect)):
return True
info = WindowInfo(
handle=int(hwnd),
title=title,
cls=class_buf.value,
rect=(rect.left, rect.top, rect.right, rect.bottom),
visible=bool(user32.IsWindowVisible(hwnd)),
minimized=bool(user32.IsIconic(hwnd)),
)
results.append(info)
if limit > 0 and len(results) >= limit:
return False
return True
enum_proc = ctypes.WINFUNCTYPE(ctypes.c_bool, wintypes.HWND, wintypes.LPARAM)
user32.EnumWindows(enum_proc(callback), 0)
results.sort(key=lambda w: w.title.lower())
return results
def _parse_handle(value: str) -> Optional[int]:
text = value.strip().lower()
if text.startswith("0x"):
try:
return int(text, 16)
except ValueError:
return None
if text.isdigit():
try:
return int(text)
except ValueError:
return None
return None
def _resolve_window(query: str, include_hidden: bool = False) -> WindowInfo:
if not query or not str(query).strip():
raise WindowCaptureError("Window identifier is required.")
handle_candidate = _parse_handle(str(query))
if handle_candidate is not None:
windows = _enum_windows(None, include_hidden=include_hidden, limit=0)
for win in windows:
if win.handle == handle_candidate:
return win
raise WindowCaptureError(f"Window handle {handle_candidate} not found.")
matches = _enum_windows(query, include_hidden=include_hidden, limit=50)
if not matches:
raise WindowCaptureError(f"No window matched keyword '{query}'.")
exact = [w for w in matches if w.title.lower() == query.lower()]
if len(exact) == 1:
return exact[0]
if len(matches) == 1:
return matches[0]
names = "; ".join(w.title for w in matches[:6])
raise WindowCaptureError(f"Multiple windows matched. Please be more specific. Candidates: {names}")
def _get_foreground_window() -> int:
"""获取当前前台窗口句柄"""
try:
return user32.GetForegroundWindow()
except Exception:
return 0
def _activate_window(hwnd: int) -> bool:
"""激活指定窗口,返回是否成功"""
try:
# 如果窗口最小化,先恢复
if user32.IsIconic(hwnd):
user32.ShowWindow(hwnd, SW_RESTORE)
time.sleep(0.1)
# 尝试多种方式激活窗口
# 方法1: 使用 SetForegroundWindow
result = user32.SetForegroundWindow(hwnd)
if not result:
# 方法2: 使用 keybd_event 模拟 Alt 键来允许切换前台
ALT_KEY = 0x12
KEYEVENTF_EXTENDEDKEY = 0x0001
KEYEVENTF_KEYUP = 0x0002
user32.keybd_event(ALT_KEY, 0, KEYEVENTF_EXTENDEDKEY, 0)
user32.SetForegroundWindow(hwnd)
user32.keybd_event(ALT_KEY, 0, KEYEVENTF_EXTENDEDKEY | KEYEVENTF_KEYUP, 0)
time.sleep(0.3) # 等待窗口完全显示
return True
except Exception:
return False
def _get_window_rect(hwnd: int) -> Tuple[int, int, int, int]:
"""获取窗口的最新坐标"""
rect = wintypes.RECT()
if user32.GetWindowRect(hwnd, ctypes.byref(rect)):
return (rect.left, rect.top, rect.right, rect.bottom)
return (0, 0, 0, 0)
def capture_window(query: str, save_dir: Optional[str] = None, include_hidden: bool = False):
if os.name != "nt":
raise WindowCaptureError("Window capture is only supported on Windows.")
window = _resolve_window(query, include_hidden=include_hidden)
# 记录当前前台窗口,截图后恢复
original_foreground = _get_foreground_window()
try:
# 激活目标窗口
_activate_window(window.handle)
# 激活后重新获取窗口坐标(窗口位置可能在激活/恢复后发生变化)
left, top, right, bottom = _get_window_rect(window.handle)
if right - left <= 0 or bottom - top <= 0:
raise WindowCaptureError("Target window has zero area.")
# 更新 window 对象的 rect 以便返回正确信息
window.rect = (left, top, right, bottom)
try:
img = ImageGrab.grab(bbox=(left, top, right, bottom))
except Exception as exc:
raise WindowCaptureError(f"ImageGrab failed: {exc}") from exc
save_dir = save_dir or DEFAULT_SAVE_DIR
os.makedirs(save_dir, exist_ok=True)
filename = f"{_sanitize_filename(window.title)}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
save_path = os.path.abspath(os.path.join(save_dir, filename))
img.save(save_path, format="PNG")
return save_path
finally:
# 恢复原来的前台窗口
if original_foreground and original_foreground != window.handle:
time.sleep(0.1)
_activate_window(original_foreground)
TOOLS: List[Tool] = [
Tool(
name="list_windows",
description="List visible top-level windows. Supports keyword filter and optional hidden windows.",
inputSchema={
"type": "object",
"properties": {
"keyword": {"type": "string", "description": "Substring to match in the window title (case-insensitive)."},
"include_hidden": {"type": "boolean", "description": "Include hidden/minimized windows."},
"limit": {"type": "integer", "description": "Max number of windows to return (0 means no limit)."},
},
"required": [],
},
),
Tool(
name="capture_window",
description="Capture a PNG screenshot of a specific window by title keyword or numeric/hex handle; returns local file path.",
inputSchema={
"type": "object",
"properties": {
"window": {
"type": "string",
"description": "Title keyword or window handle (e.g. 'Notepad', '197324', or '0x2ff3e').",
},
"include_hidden": {"type": "boolean", "description": "Allow capturing hidden/minimized windows."},
"save_dir": {
"type": "string",
"description": f"Optional folder to save the PNG. Default: {DEFAULT_SAVE_DIR}",
},
},
"required": ["window"],
},
),
]
@server.list_tools()
async def list_tools() -> List[Tool]:
return TOOLS
@server.call_tool()
async def call_tool(name: str, arguments: Optional[Dict[str, Any]]) -> List[Any]:
args = arguments or {}
try:
if name == "list_windows":
keyword = args.get("keyword")
include_hidden = bool(args.get("include_hidden", False))
limit_raw = args.get("limit", 20)
try:
limit_val = int(limit_raw)
except Exception:
limit_val = 20
limit_val = max(0, min(limit_val, 200))
windows = _enum_windows(keyword, include_hidden=include_hidden, limit=limit_val or 0)
payload = {
"count": len(windows),
"keyword": keyword or "",
"include_hidden": include_hidden,
"windows": [w.to_dict() for w in windows],
}
text = json.dumps(payload, ensure_ascii=False, indent=2)
return [_text_content(text)]
if name == "capture_window":
query = args.get("window") or args.get("title")
include_hidden = bool(args.get("include_hidden", False))
save_dir = args.get("save_dir") or None
save_path = capture_window(query, save_dir=save_dir, include_hidden=include_hidden)
return [_text_content(save_path)]
return [_text_content(f"Unknown tool: {name}")]
except Exception as exc:
return [_text_content(f"Error running tool {name}: {exc}")]
async def main() -> None:
async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
init_opts = server.create_initialization_options()
await server.run(read_stream, write_stream, init_opts)
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
pass


@@ -0,0 +1,43 @@
## YueShen RAG MCP Server
Scans the pdf/docx files under `新知识库` (or a custom directory), splits them into paragraph/sentence-level chunks, writes them into Chroma, and provides retrieval tools. The embedding configuration can be passed in via MCP parameters or environment variables.
### Dependencies
```bash
pip install -r requirements.txt
```
- If you have `.doc` files, convert them to `.docx` first; the current dependencies only support pdf/docx.
### Environment variables (optional)
- `YUESHEN_CORPUS_DIR`: directory of the raw knowledge-base documents (default `新知识库`)
- `YUESHEN_PERSIST_DIR`: Chroma vector-store persistence directory (default `cache_data/chromadb_yueshen`)
- `YUESHEN_EMBED_BASE_URL`: embedding API base URL (`/embeddings` is appended)
- `YUESHEN_EMBED_API_KEY`: embedding API key
- `YUESHEN_EMBED_MODEL`: embedding model name (default `text-embedding-3-small`)
- `YUESHEN_AUTO_INGEST`: scan and ingest automatically on startup (default 1; set to 0 to disable)
- `YUESHEN_AUTO_INTERVAL`: automatic scan interval in seconds (default 300, minimum 30)
- `YUESHEN_AUTO_RESET_ON_START`: reset and rebuild the index on startup (default 0)
### Running
```bash
cd mcp_servers/yueshen_rag
python server.py
```
### Adding to Fay
- Add a server on the MCP management page: set transport to `stdio`, set command to Python (`python` or the path of a virtual-environment interpreter), set args to `["mcp_servers/yueshen_rag/server.py"]`, and point cwd at the project root; to customize the embedding, fill the base URL / API key / model into env.
- Alternatively, edit `faymcp/data/mcp_servers.json` directly and add the corresponding entry; it takes effect after the Fay MCP service is restarted.
### Pre-start recommendation
- On the MCP page, enable "pre-start" for `query_yueshen` in the tool list, with parameters such as `{"query": "{{question}}", "top_k": 4}`; the user's question is substituted for `{{question}}`.
- If you want newly added documents to be picked up after startup, configure a pre-start for `ingest_yueshen` as well (e.g. `{"reset": false}`, or specify `corpus_dir`/`batch_size`, etc.).
### Tools
- `ingest_yueshen`: scan and ingest; parameters `corpus_dir`, `reset`, `chunk_size`, `overlap`, `batch_size`, `max_files`, plus optional `embedding_base_url`/`embedding_api_key`/`embedding_model` to override the environment variables.
- `query_yueshen`: vector retrieval; parameter `query`, optional `top_k` and `where`, plus the same optional embedding configuration as ingest (see the sketch after this list).
- `yueshen_stats`: inspect the vector-store status (persistence directory, collection name, vector count, etc.).
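A minimal sketch of calling `query_yueshen` over stdio for a standalone test, assuming the official `mcp` Python SDK client API; the embedding values and the question are illustrative placeholders:
```python
import asyncio
import os

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

async def ask(question: str) -> None:
    params = StdioServerParameters(
        command="python",
        args=["mcp_servers/yueshen_rag/server.py"],  # run from the project root
        env={
            **os.environ,  # keep PATH etc., then add the embedding configuration
            "YUESHEN_EMBED_BASE_URL": "https://api.siliconflow.cn/v1",
            "YUESHEN_EMBED_API_KEY": "sk-...",  # placeholder key
            "YUESHEN_EMBED_MODEL": "Qwen/Qwen3-Embedding-8B",
        },
    )
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            result = await session.call_tool("query_yueshen", {"query": question, "top_k": 4})
            print(result.content[0].text)  # text payload with the retrieved chunks

asyncio.run(ask("示例问题"))  # illustrative question
```
Inside Fay the same call goes through the MCP management layer; this sketch is only meant for verifying the server in isolation.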
### Default paths and chunking
- Corpus directory: `悦肾e家知识库202511/新知识库`
- Persistence directory: `cache_data/chromadb_yueshen`
- Chunking: roughly 600 characters per chunk with a 120-character overlap, adjustable as needed (a preview sketch follows)
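To preview how a single document would be chunked before ingesting the whole corpus, the chunker in `server.py` can be called directly. A rough sketch, assuming it is run from `mcp_servers/yueshen_rag` so that `server.py` is importable; the file path is illustrative:
```python
# Importing server applies its module-level setup (logging to stderr, telemetry env var);
# python-docx from requirements.txt must be installed. The path below is illustrative.
from server import _extract_docx, split_into_chunks

text = _extract_docx("../../新知识库/example.docx")
for i, chunk in enumerate(split_into_chunks(text, chunk_size=600, overlap=120)):
    print(f"--- chunk {i}: {len(chunk)} chars ---")
    print(chunk[:80])
```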


@@ -0,0 +1,6 @@
mcp
chromadb
pdfplumber
python-docx
requests
# .doc files have already been converted to .docx, so no extra dependency is needed


@@ -0,0 +1,687 @@
#!/usr/bin/env python3
"""
YueShen Knowledge Base RAG MCP Server
- Load pdf/docx from a directory, chunk, and write into Chroma.
- Embedding config is provided via MCP tool params or env vars, not system.conf.
- Auto-ingest watcher can run on startup to keep the index fresh.
- Tools: ingest_yueshen, query_yueshen, yueshen_stats.
"""
import hashlib
import json
import logging
import os
import re
import sys
import time
import threading
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Optional, Tuple
import requests
# Keep stdout clean for MCP stdio; route logs to stderr and disable Chroma telemetry noise.
os.environ.setdefault("CHROMA_TELEMETRY", "FALSE")
# Make project root importable (for optional fallback embedding)
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
if PROJECT_ROOT not in sys.path:
sys.path.insert(0, PROJECT_ROOT)
try:
from mcp.server import Server
from mcp.types import Tool, TextContent
import mcp.server.stdio
except ImportError:
print("MCP library not installed. Please run: pip install mcp", file=sys.stderr, flush=True)
sys.exit(1)
try:
import chromadb
except ImportError:
print("chromadb not installed. Please run: pip install chromadb", file=sys.stderr, flush=True)
sys.exit(1)
server = Server("yueshen_rag")
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
stream=sys.stderr,
)
logger = logging.getLogger("yueshen_rag")
# Defaults (can be overridden via env)
DEFAULT_CORPUS_DIR = os.getenv(
"YUESHEN_CORPUS_DIR",
os.path.join(PROJECT_ROOT, "新知识库"),
)
DEFAULT_PERSIST_DIR = os.getenv(
"YUESHEN_PERSIST_DIR",
os.path.join(PROJECT_ROOT, "cache_data", "chromadb_yueshen"),
)
COLLECTION_NAME = "yueshen_kb"
DEFAULT_EMBED_BASE_URL = os.getenv("YUESHEN_EMBED_BASE_URL")
DEFAULT_EMBED_API_KEY = os.getenv("YUESHEN_EMBED_API_KEY")
DEFAULT_EMBED_MODEL = os.getenv("YUESHEN_EMBED_MODEL", "text-embedding-3-small")
AUTO_INGEST_ENABLED = os.getenv("YUESHEN_AUTO_INGEST", "1") != "0"
AUTO_INGEST_INTERVAL = int(os.getenv("YUESHEN_AUTO_INTERVAL", "300"))
AUTO_RESET_ON_START = os.getenv("YUESHEN_AUTO_RESET_ON_START", "0") != "0"
# -------------------- Text chunking -------------------- #
def _len_with_newlines(parts: List[str]) -> int:
if not parts:
return 0
return sum(len(p) for p in parts) + (len(parts) - 1)
def split_into_chunks(text: str, chunk_size: int = 600, overlap: int = 120) -> List[str]:
"""Paragraph/ sentence-aware chunking with small overlap."""
cleaned = re.sub(r"[ \t]+", " ", text.replace("\u00a0", " ")).strip()
paragraphs = [p.strip() for p in re.split(r"\n\s*\n", cleaned) if p.strip()]
segments: List[str] = []
for para in paragraphs:
if len(para) <= chunk_size:
segments.append(para)
else:
for sent in re.split(r"(?<=[。!?!?…])", para):
s = sent.strip()
if s:
segments.append(s)
chunks: List[str] = []
buf: List[str] = []
for seg in segments:
seg = seg.strip()
if not seg:
continue
current_len = _len_with_newlines(buf)
if current_len + len(seg) + (1 if buf else 0) <= chunk_size:
buf.append(seg)
continue
if buf:
chunks.append("\n".join(buf).strip())
# build overlap from previous chunk tail
buf = []
if overlap > 0 and chunks:
tail: List[str] = []
tail_len = 0
for s in reversed(chunks[-1].split("\n")):
tail.insert(0, s)
tail_len += len(s)
if tail_len >= overlap:
break
if tail:
buf.extend(tail)
buf.append(seg)
if buf:
chunks.append("\n".join(buf).strip())
return chunks
# -------------------- Document readers -------------------- #
def _extract_docx(path: str) -> str:
from docx import Document
doc = Document(path)
texts: List[str] = []
for para in doc.paragraphs:
t = para.text.strip()
if t:
texts.append(t)
for table in doc.tables:
for row in table.rows:
cells = [cell.text.strip() for cell in row.cells if cell.text.strip()]
if cells:
texts.append(" | ".join(cells))
return "\n".join(texts)
def _extract_pdf_pages(path: str) -> List[Tuple[int, str]]:
try:
import pdfplumber
except ImportError as exc:
raise RuntimeError("pdfplumber is required for pdf parsing") from exc
pages: List[Tuple[int, str]] = []
with pdfplumber.open(path) as pdf:
for idx, page in enumerate(pdf.pages, start=1):
txt = page.extract_text() or ""
pages.append((idx, txt))
return pages
# -------------------- Data models -------------------- #
@dataclass
class Chunk:
text: str
source_path: str
page: Optional[int]
chunk_id: str
metadata: Dict[str, Any]
# -------------------- Corpus loader -------------------- #
class CorpusLoader:
def __init__(self, root_dir: str = DEFAULT_CORPUS_DIR, chunk_size: int = 600, overlap: int = 120):
self.root_dir = root_dir
self.chunk_size = chunk_size
self.overlap = overlap
def _iter_files(self) -> Iterable[str]:
for root, _, files in os.walk(self.root_dir):
for fn in files:
if fn.lower().endswith((".pdf", ".docx")):
yield os.path.join(root, fn)
def _file_to_chunks(self, path: str) -> List[Chunk]:
ext = os.path.splitext(path)[1].lower()
rel_path = os.path.relpath(path, self.root_dir)
chunks: List[Chunk] = []
try:
if ext == ".pdf":
pages = _extract_pdf_pages(path)
for page_num, page_text in pages:
for idx, chunk_text in enumerate(
split_into_chunks(page_text, chunk_size=self.chunk_size, overlap=self.overlap)
):
chunk_id = hashlib.md5(
f"{rel_path}|{page_num}|{idx}|{chunk_text}".encode("utf-8", errors="ignore")
).hexdigest()
chunks.append(
Chunk(
text=chunk_text,
source_path=rel_path,
page=page_num,
chunk_id=chunk_id,
metadata={"source": rel_path, "page": page_num, "ext": ext},
)
)
elif ext == ".docx":
text = _extract_docx(path)
for idx, chunk_text in enumerate(
split_into_chunks(text, chunk_size=self.chunk_size, overlap=self.overlap)
):
chunk_id = hashlib.md5(
f"{rel_path}|docx|{idx}|{chunk_text}".encode("utf-8", errors="ignore")
).hexdigest()
chunks.append(
Chunk(
text=chunk_text,
source_path=rel_path,
page=None,
chunk_id=chunk_id,
metadata={"source": rel_path, "ext": ext},
)
)
except Exception as exc:
logger.warning("Skip file due to parse error %s: %s", rel_path, exc)
return chunks
def load(self, max_files: Optional[int] = None) -> List[Chunk]:
all_chunks: List[Chunk] = []
for idx, file_path in enumerate(self._iter_files(), start=1):
if max_files and idx > max_files:
break
all_chunks.extend(self._file_to_chunks(file_path))
return all_chunks
# -------------------- Embedding backend -------------------- #
class EmbeddingBackend:
"""Embedding client with API config, falling back to project encoder if needed."""
def __init__(
self,
base_url: Optional[str] = None,
api_key: Optional[str] = None,
model: Optional[str] = None,
):
self.base_url = base_url or DEFAULT_EMBED_BASE_URL
self.api_key = api_key or DEFAULT_EMBED_API_KEY
self.model = model or DEFAULT_EMBED_MODEL
self._cache: Dict[str, List[float]] = {}
self._fallback_encoder = None
try:
from simulation_engine.gpt_structure import get_text_embedding as _fallback
self._fallback_encoder = _fallback
except Exception as exc:
logger.info("Fallback embedding not available: %s", exc)
def _call_api(self, text: str) -> List[float]:
if not self.base_url or not self.api_key:
raise RuntimeError("Embedding API config missing (base_url/api_key)")
url = self.base_url.rstrip("/") + "/embeddings"
payload = {"input": text, "model": self.model}
headers = {"Authorization": f"Bearer {self.api_key}"}
resp = requests.post(url, json=payload, headers=headers, timeout=30)
if resp.status_code != 200:
raise RuntimeError(f"Embedding API error: {resp.status_code} {resp.text}")
data = resp.json()
embedding = data.get("data", [{}])[0].get("embedding")
if embedding is None:
raise RuntimeError("Embedding API response missing embedding")
return embedding
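    # Assumption: the endpoint is OpenAI-compatible, i.e. POST {base_url}/embeddings returns
    # JSON shaped roughly like:
    #   {"data": [{"embedding": [0.01, -0.02, ...], "index": 0}], "model": "..."}
    # Only data[0].embedding is read above; any other fields are ignored.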
def encode(self, text: str) -> List[float]:
cache_key = hashlib.md5(f"{self.model}|{self.base_url}|{text}".encode("utf-8", errors="ignore")).hexdigest()
if cache_key in self._cache:
return self._cache[cache_key]
embedding: Optional[List[float]] = None
if self.base_url and self.api_key:
embedding = self._call_api(text)
elif self._fallback_encoder:
embedding = self._fallback_encoder(text)
else:
raise RuntimeError("No embedding method available (provide base_url/api_key or enable fallback).")
if not isinstance(embedding, list):
embedding = list(embedding)
self._cache[cache_key] = embedding
return embedding
# -------------------- Chroma store -------------------- #
class ChromaStore:
def __init__(
self,
persist_dir: str = DEFAULT_PERSIST_DIR,
collection_name: str = COLLECTION_NAME,
embedder: Optional[EmbeddingBackend] = None,
):
os.makedirs(persist_dir, exist_ok=True)
self.persist_dir = persist_dir
self.collection_name = collection_name
self.embedder = embedder or EmbeddingBackend()
self.client = chromadb.PersistentClient(path=persist_dir)
self.collection = self.client.get_or_create_collection(collection_name)
def reset(self):
self.client.delete_collection(self.collection_name)
self.collection = self.client.get_or_create_collection(self.collection_name)
def upsert_chunks(self, chunks: List[Chunk], batch_size: int = 32) -> Dict[str, Any]:
start = time.time()
total = 0
ids: List[str] = []
docs: List[str] = []
metas: List[Dict[str, Any]] = []
embs: List[List[float]] = []
def flush():
nonlocal total, ids, docs, metas, embs
if not ids:
return
self.collection.upsert(ids=ids, documents=docs, metadatas=metas, embeddings=embs)
total += len(ids)
ids, docs, metas, embs = [], [], [], []
for chunk in chunks:
ids.append(chunk.chunk_id)
docs.append(chunk.text)
metas.append(chunk.metadata)
try:
embs.append(self.embedder.encode(chunk.text))
except Exception as exc:
logger.error("Embedding failed, skip id=%s: %s", chunk.chunk_id, exc)
ids.pop()
docs.pop()
metas.pop()
continue
if len(ids) >= batch_size:
flush()
flush()
elapsed = time.time() - start
return {"inserted": total, "seconds": round(elapsed, 2)}
def query(self, query: str, top_k: int = 5, where: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
emb = self.embedder.encode(query)
res = self.collection.query(query_embeddings=[emb], n_results=top_k, where=where if where else None)
results = []
ids = res.get("ids", [[]])[0]
docs = res.get("documents", [[]])[0]
metas = res.get("metadatas", [[]])[0]
        dists = (res.get("distances") or [[None] * len(ids)])[0]
for i in range(len(ids)):
results.append(
{"id": ids[i], "document": docs[i], "metadata": metas[i], "distance": dists[i]}
)
return {"results": results, "count": len(results)}
def stats(self) -> Dict[str, Any]:
try:
count = self.collection.count()
except Exception:
count = None
return {"persist_dir": self.persist_dir, "collection": self.collection_name, "vectors": count}
# -------------------- Knowledge manager -------------------- #
class KnowledgeManager:
def __init__(
self,
corpus_dir: str = DEFAULT_CORPUS_DIR,
persist_dir: str = DEFAULT_PERSIST_DIR,
embedder: Optional[EmbeddingBackend] = None,
):
self.corpus_dir = corpus_dir
self.persist_dir = persist_dir
self.embedder = embedder or EmbeddingBackend()
self.store = ChromaStore(persist_dir=persist_dir, collection_name=COLLECTION_NAME, embedder=self.embedder)
def _refresh_embedder(self, base_url: Optional[str], api_key: Optional[str], model: Optional[str]):
if any([base_url, api_key, model]):
self.embedder = EmbeddingBackend(base_url=base_url, api_key=api_key, model=model)
self.store = ChromaStore(
persist_dir=self.persist_dir, collection_name=COLLECTION_NAME, embedder=self.embedder
)
def ingest(
self,
corpus_dir: Optional[str] = None,
reset: bool = False,
chunk_size: int = 600,
overlap: int = 120,
batch_size: int = 32,
max_files: Optional[int] = None,
embedding_base_url: Optional[str] = None,
embedding_api_key: Optional[str] = None,
embedding_model: Optional[str] = None,
) -> Dict[str, Any]:
self._refresh_embedder(embedding_base_url, embedding_api_key, embedding_model)
target_dir = corpus_dir or self.corpus_dir
loader = CorpusLoader(root_dir=target_dir, chunk_size=chunk_size, overlap=overlap)
if reset:
self.store.reset()
chunks = loader.load(max_files=max_files)
logger.info("Loaded %d chunks from %s, start upsert...", len(chunks), target_dir)
upsert_res = self.store.upsert_chunks(chunks, batch_size=batch_size)
return {
"success": True,
"message": "ingest completed",
"chunks": len(chunks),
"inserted": upsert_res.get("inserted", 0),
"seconds": upsert_res.get("seconds"),
"persist_dir": self.persist_dir,
"collection": COLLECTION_NAME,
"corpus_dir": target_dir,
"embedding_base_url": self.embedder.base_url,
"embedding_model": self.embedder.model,
}
def query(
self,
query: str,
top_k: int = 5,
where: Optional[Dict[str, Any]] = None,
embedding_base_url: Optional[str] = None,
embedding_api_key: Optional[str] = None,
embedding_model: Optional[str] = None,
) -> Dict[str, Any]:
self._refresh_embedder(embedding_base_url, embedding_api_key, embedding_model)
return self.store.query(query=query, top_k=top_k, where=where)
def stats(self) -> Dict[str, Any]:
info = self.store.stats()
info.update(
{
"default_corpus_dir": self.corpus_dir,
"embedding_base_url": self.embedder.base_url,
"embedding_model": self.embedder.model,
}
)
return info
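# Illustrative end-to-end flow (not executed); the directory and query are placeholders:
#
#   km = KnowledgeManager(corpus_dir="docs/yueshen")
#   km.ingest(reset=True)                 # parse pdf/docx, embed, upsert into Chroma
#   km.query("公司年假怎么计算", top_k=3)  # vector search over the ingested chunks
#   km.stats()                            # collection size and embedding settings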
manager = KnowledgeManager()
# -------------------- Auto ingest watcher -------------------- #
class AutoIngestor:
"""Simple polling-based watcher to auto-ingest when files change."""
def __init__(
self,
km: KnowledgeManager,
interval_sec: int = AUTO_INGEST_INTERVAL,
reset_on_start: bool = AUTO_RESET_ON_START,
enabled: bool = AUTO_INGEST_ENABLED,
):
self.km = km
self.interval = max(30, interval_sec)
self.reset_on_start = reset_on_start
self.enabled = enabled
self._stop = threading.Event()
self._thread: Optional[threading.Thread] = None
self._snapshot: Dict[str, Tuple[float, int]] = {}
def _take_snapshot(self) -> Dict[str, Tuple[float, int]]:
snap: Dict[str, Tuple[float, int]] = {}
for root, _, files in os.walk(self.km.corpus_dir):
for fn in files:
if fn.lower().endswith((".pdf", ".docx")):
path = os.path.join(root, fn)
try:
st = os.stat(path)
snap[path] = (st.st_mtime, st.st_size)
except OSError:
continue
return snap
def _has_changes(self) -> bool:
new_snap = self._take_snapshot()
if new_snap != self._snapshot:
self._snapshot = new_snap
return True
return False
def _ingest_once(self, reset: bool = False):
try:
res = self.km.ingest(
corpus_dir=self.km.corpus_dir,
reset=reset,
embedding_base_url=self.km.embedder.base_url,
embedding_api_key=self.km.embedder.api_key,
embedding_model=self.km.embedder.model,
)
logger.info("Auto-ingest done: %s", json.dumps(res, ensure_ascii=False))
except Exception as exc:
logger.error("Auto-ingest failed: %s", exc)
def _loop(self):
# initial snapshot and optional first ingest
self._snapshot = self._take_snapshot()
if self.reset_on_start:
logger.info("Auto-ingest on start (reset=%s)...", self.reset_on_start)
self._ingest_once(reset=True)
elif self.enabled:
logger.info("Auto-ingest initial run...")
self._ingest_once(reset=False)
while not self._stop.wait(self.interval):
if self._has_changes():
logger.info("Detected corpus change, auto-ingest...")
self._ingest_once(reset=False)
def start(self):
if not self.enabled:
logger.info("Auto-ingest disabled via env (YUESHEN_AUTO_INGEST=0)")
return
if self._thread and self._thread.is_alive():
return
self._thread = threading.Thread(target=self._loop, daemon=True)
self._thread.start()
logger.info("Auto-ingest watcher started, interval=%ss", self.interval)
def stop(self):
self._stop.set()
if self._thread:
self._thread.join(timeout=2)
# -------------------- Skip patterns for trivial queries -------------------- #
SKIP_QUERY_PATTERNS = [
    # Greetings
r'^你好[啊呀吗]?$', r'^hello[!]?$', r'^hi[!]?$', r'^嗨[!]?$', r'^hey[!]?$',
r'^早[上]?好[啊呀]?$', r'^晚[上]?好[啊呀]?$', r'^下午好[啊呀]?$', r'^中午好[啊呀]?$',
    # Simple acknowledgements
r'^ok[!]?$', r'^好[的吧啊呀]?[!]?$', r'^行[!]?$', r'^可以[!]?$', r'^没问题[!]?$',
r'^嗯[嗯]?[!]?$', r'^哦[哦]?[!]?$', r'^噢[!]?$',
    # Laughter / emotion
r'^哈哈[哈]*[!]?$', r'^呵呵[呵]*[!]?$', r'^嘿嘿[嘿]*[!]?$', r'^嘻嘻[嘻]*[!]?$',
r'^哼[!]?$', r'^呜呜[呜]*[!]?$',
    # Everyday phrases
r'^睡觉[了去]?[!]?$', r'^晚安[!]?$', r'^再见[!]?$', r'^拜拜[!]?$', r'^bye[!]?$',
r'^谢谢[你您]?[!]?$', r'^感谢[!]?$', r'^thanks[!]?$', r'^thank you[!]?$',
r'^对不起[!]?$', r'^抱歉[!]?$', r'^sorry[!]?$',
r'^是[的吧啊]?[!]?$', r'^对[的吧啊]?[!]?$', r'^不是[!]?$', r'^不对[!]?$',
r'^知道了[!]?$', r'^明白了[!]?$', r'^懂了[!]?$', r'^了解[!]?$',
r'^收到[!]?$', r'^好嘞[!]?$', r'^得嘞[!]?$',
    # Short question-style replies
r'^啥[?]?$', r'^什么[?]?$', r'^嗯[?]$', r'^哈[?]$',
    # Punctuation-only or extremely short input
r'^[.。,!?~]+$',
]
def _is_trivial_query(query: str) -> bool:
"""Check if query is a trivial greeting or simple response that doesn't need KB search."""
if not query:
return True
q = query.strip().lower()
if len(q) <= 2:
return True
for pattern in SKIP_QUERY_PATTERNS:
if re.match(pattern, q, re.IGNORECASE):
return True
return False
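# Intended behaviour (illustrative examples):
#   _is_trivial_query("你好")           -> True   (greeting, skip KB lookup)
#   _is_trivial_query("ok")             -> True   (simple acknowledgement)
#   _is_trivial_query("年假有多少天?")  -> False  (real question, search the KB)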
# -------------------- MCP tools -------------------- #
@server.list_tools()
async def list_tools() -> List[Tool]:
return [
Tool(
name="ingest_yueshen",
description="Scan directory (pdf/docx/doc), chunk and write to Chroma",
inputSchema={
"type": "object",
"properties": {
"corpus_dir": {"type": "string", "description": "Optional corpus directory override"},
"reset": {"type": "boolean", "description": "Recreate collection before ingest", "default": False},
"chunk_size": {"type": "integer", "description": "Chunk length (chars)", "default": 600},
"overlap": {"type": "integer", "description": "Chunk overlap (chars)", "default": 120},
"batch_size": {"type": "integer", "description": "Upsert batch size", "default": 32},
"max_files": {"type": "integer", "description": "Optional limit for quick test"},
"embedding_base_url": {"type": "string", "description": "Embedding API base url"},
"embedding_api_key": {"type": "string", "description": "Embedding API key"},
"embedding_model": {"type": "string", "description": "Embedding model name"},
},
},
),
Tool(
name="query_yueshen",
description="Vector search in YueShen KB",
inputSchema={
"type": "object",
"properties": {
"query": {"type": "string", "description": "User query"},
"top_k": {"type": "integer", "description": "Number of results", "default": 5},
"where": {"type": "object", "description": "Optional metadata filter (Chroma where)"},
"embedding_base_url": {"type": "string", "description": "Embedding API base url"},
"embedding_api_key": {"type": "string", "description": "Embedding API key"},
"embedding_model": {"type": "string", "description": "Embedding model name"},
},
"required": ["query"],
},
),
Tool(
name="yueshen_stats",
description="Show current vector store stats",
inputSchema={"type": "object", "properties": {}},
),
]
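# Example arguments an MCP client might pass to query_yueshen (illustrative values only):
#   {"query": "报销流程是什么", "top_k": 3, "where": {"source": "handbook.pdf"}}
# The embedding overrides (embedding_base_url / embedding_api_key / embedding_model) are
# optional and only needed when the caller wants to switch away from the defaults.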
@server.call_tool()
async def call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]:
try:
if name == "ingest_yueshen":
res = manager.ingest(
corpus_dir=arguments.get("corpus_dir"),
reset=bool(arguments.get("reset", False)),
chunk_size=int(arguments.get("chunk_size", 600)),
overlap=int(arguments.get("overlap", 120)),
batch_size=int(arguments.get("batch_size", 32)),
max_files=arguments.get("max_files"),
embedding_base_url=arguments.get("embedding_base_url"),
embedding_api_key=arguments.get("embedding_api_key"),
embedding_model=arguments.get("embedding_model"),
)
return [TextContent(type="text", text=json.dumps(res, ensure_ascii=False, indent=2))]
if name == "query_yueshen":
query_text = arguments.get("query", "")
            # Skip common greetings and simple replies; no knowledge-base lookup needed
if _is_trivial_query(query_text):
return [TextContent(type="text", text=json.dumps({
"results": [],
"count": 0,
"skipped": True,
"reason": "trivial query (greeting or simple response)"
}, ensure_ascii=False, indent=2))]
res = manager.query(
query=query_text,
top_k=int(arguments.get("top_k", 5)),
where=arguments.get("where"),
embedding_base_url=arguments.get("embedding_base_url"),
embedding_api_key=arguments.get("embedding_api_key"),
embedding_model=arguments.get("embedding_model"),
)
return [TextContent(type="text", text=json.dumps(res, ensure_ascii=False, indent=2))]
if name == "yueshen_stats":
res = manager.stats()
return [TextContent(type="text", text=json.dumps(res, ensure_ascii=False, indent=2))]
return [
TextContent(
type="text",
text=json.dumps({"success": False, "message": f"unknown tool: {name}"}, ensure_ascii=False),
)
]
except Exception as exc:
return [
TextContent(
type="text",
text=json.dumps({"success": False, "message": f"exception: {exc}"}, ensure_ascii=False),
)
]
async def main():
auto = AutoIngestor(manager)
auto.start()
async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
init_opts = server.create_initialization_options()
await server.run(read_stream, write_stream, init_opts)
if __name__ == "__main__":
import asyncio
asyncio.run(main())
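# To expose this server over stdio to an MCP client, a typical (client-specific, hence
# hypothetical) registration looks like:
#   {"mcpServers": {"yueshen-kb": {"command": "python", "args": ["yueshen_kb_mcp.py"]}}}
# The script path and server name are placeholders; adjust them to the actual deployment.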