diff --git a/core/fay_core.py b/core/fay_core.py index 434202b..7e023b5 100644 --- a/core/fay_core.py +++ b/core/fay_core.py @@ -190,9 +190,18 @@ class FeiFei: username = interact.data.get("user", "User") uid = member_db.new_instance().find_user(username) if index == 1: #语音、文字交互 - # 用户发送新消息,重置中断标志,开始新对话 - stream_manager.new_instance().set_stop_generation(username, False) - util.printInfo(1, username, "用户新输入,重置中断标志") + util.printInfo(1, username, "重置中断标志,开始新的对话处理") + # 切换到新会话,令上一会话的流式输出/音频尽快结束 + try: + sm = stream_manager.new_instance() + new_version = sm.bump_session(username) + # 将当前会话版本附加到交互数据 + interact.data["session_version"] = new_version + # 允许新的生成 + sm.set_stop_generation(username, stop=False) + except Exception: + pass + # 已通过上方 sm.set_stop_generation(username, False) 复位 #记录用户问题,方便obs等调用 self.write_to_file("./logs", "asr_result.txt", interact.data["msg"]) @@ -256,7 +265,7 @@ class FeiFei: file.flush() os.fsync(file.fileno()) - #触发语音交互 + #触发交互 def on_interact(self, interact: Interact): #创建用户 if interact.interact_type == 1: @@ -282,6 +291,14 @@ class FeiFei: uid = member_db.new_instance().find_user(interact.data.get("user")) is_end = interact.data.get("isend", False) is_first = interact.data.get("isfirst", False) + # 提前进行会话有效性与中断检查,避免产生多余面板/数字人输出 + try: + user_for_stop = interact.data.get("user", "User") + session_ver = interact.data.get("session_version") + if stream_manager.new_instance().should_stop_generation(user_for_stop, session_version=session_ver): + return None + except Exception: + pass if is_first == True: conv = "conv_" + str(uuid.uuid4()) conv_no = 0 @@ -292,7 +309,14 @@ class FeiFei: if not is_first and not is_end and (text is None or text.strip() == ""): return None - self.__send_panel_message(text, interact.data.get('user'), uid, 0, type) + # 仅在会话有效时才发送面板消息 + try: + user_for_stop = interact.data.get("user", "User") + session_ver = interact.data.get("session_version") + if not stream_manager.new_instance().should_stop_generation(user_for_stop, session_version=session_ver): + self.__send_panel_message(text, interact.data.get('user'), uid, 0, type) + except Exception: + self.__send_panel_message(text, interact.data.get('user'), uid, 0, type) # 处理think标签 is_start_think = False @@ -318,11 +342,19 @@ class FeiFei: self.think_time_users[uid] = time.time() if self.think_mode_users.get(uid, False) and is_start_think: - if wsa_server.get_web_instance().is_connected(interact.data.get('user')): - wsa_server.get_web_instance().add_cmd({"panelMsg": "思考中...", "Username" : interact.data.get('user'), 'robot': f'{cfg.fay_url}/robot/Thinking.jpg'}) - if wsa_server.get_instance().is_connected(interact.data.get("user")): - content = {'Topic': 'human', 'Data': {'Key': 'log', 'Value': "思考中..."}, 'Username' : interact.data.get('user'), 'robot': f'{cfg.fay_url}/robot/Thinking.jpg'} - wsa_server.get_instance().add_cmd(content) + # 会话有效时才提示“思考中...” + try: + user_for_stop = interact.data.get("user", "User") + session_ver = interact.data.get("session_version") + should_block = stream_manager.new_instance().should_stop_generation(user_for_stop, session_version=session_ver) + except Exception: + should_block = False + if not should_block: + if wsa_server.get_web_instance().is_connected(interact.data.get('user')): + wsa_server.get_web_instance().add_cmd({"panelMsg": "思考中...", "Username" : interact.data.get('user'), 'robot': f'{cfg.fay_url}/robot/Thinking.jpg'}) + if wsa_server.get_instance().is_connected(interact.data.get("user")): + content = {'Topic': 'human', 'Data': {'Key': 'log', 'Value': "思考中..."}, 'Username' : interact.data.get('user'), 'robot': f'{cfg.fay_url}/robot/Thinking.jpg'} + wsa_server.get_instance().add_cmd(content) if self.think_mode_users.get(uid, False) == True and time.time() - self.think_time_users[uid] >= 5: self.think_time_users[uid] = time.time() text = "请稍等..." @@ -339,8 +371,11 @@ class FeiFei: result = self.download_wav(audio_url, './samples/', file_name) elif config_util.config["interact"]["playSound"] or wsa_server.get_instance().is_connected(interact.data.get("user")) or self.__is_send_remote_device_audio(interact):#tts if text != None and text.replace("*", "").strip() != "": - # 检查是否需要停止TTS处理 - if stream_manager.new_instance().should_stop_generation(interact.data.get("user", "User")): + # 检查是否需要停止TTS处理(按会话) + if stream_manager.new_instance().should_stop_generation( + interact.data.get("user", "User"), + session_version=interact.data.get("session_version") + ): util.printInfo(1, interact.data.get('user'), 'TTS处理被打断,跳过音频合成') return None @@ -350,6 +385,14 @@ class FeiFei: util.printInfo(1, interact.data.get('user'), '合成音频...') tm = time.time() result = self.sp.to_sample(filtered_text, self.__get_mood_voice()) + # 合成完成后再次检查会话是否仍有效,避免继续输出旧会话结果 + try: + user_for_stop = interact.data.get("user", "User") + session_ver = interact.data.get("session_version") + if stream_manager.new_instance().should_stop_generation(user_for_stop, session_version=session_ver): + return None + except Exception: + pass util.printInfo(1, interact.data.get("user"), "合成音频完成. 耗时: {} ms 文件:{}".format(math.floor((time.time() - tm) * 1000), result)) else: if is_end and wsa_server.get_web_instance().is_connected(interact.data.get('user')): @@ -486,6 +529,14 @@ class FeiFei: #输出音频处理 def __process_output_audio(self, file_url, interact, text): try: + # 会话有效性与中断检查(最早返回,避免向面板/数字人发送任何旧会话输出) + try: + user_for_stop = interact.data.get("user", "User") + session_ver = interact.data.get("session_version") + if stream_manager.new_instance().should_stop_generation(user_for_stop, session_version=session_ver): + return + except Exception: + pass try: if file_url is None: audio_length = 0 @@ -521,8 +572,11 @@ class FeiFei: #面板播放 config_util.load_config() if config_util.config["interact"]["playSound"]: - # 检查是否需要停止音频播放 - if stream_manager.new_instance().should_stop_generation(interact.data.get("user", "User")): + # 检查是否需要停止音频播放(按会话) + if stream_manager.new_instance().should_stop_generation( + interact.data.get("user", "User"), + session_version=interact.data.get("session_version") + ): util.printInfo(1, interact.data.get('user'), '音频播放被打断,跳过加入播放队列') return self.sound_query.put((file_url, audio_length, interact)) diff --git a/core/stream_manager.py b/core/stream_manager.py index cba9e0e..4b77428 100644 --- a/core/stream_manager.py +++ b/core/stream_manager.py @@ -34,7 +34,9 @@ class StreamManager: """ if hasattr(self, '_initialized') and self._initialized: return - self.lock = threading.Lock() # 线程锁,用于保护streams字典的访问 + # 使用两个独立的锁,避免死锁 + self.stream_lock = threading.RLock() # 流读写操作锁(可重入锁,允许同一线程多次获取) + self.control_lock = threading.Lock() # 控制标志锁(用于停止生成标志) self.streams = {} # 存储用户ID到句子缓存的映射 self.nlp_streams = {} # 存储用户ID到句子缓存的映射 self.max_sentences = max_sentences # 最大句子缓存数量 @@ -43,16 +45,34 @@ class StreamManager: self._initialized = True # 标记是否已初始化 self.msgid = "" # 消息ID self.stop_generation_flags = {} # 存储用户的停止生成标志 + self.session_versions = {} # 存储每个用户的会话版本(单调递增) - def get_Stream(self, username): + def bump_session(self, username): """ - 获取指定用户ID的文本流,如果不存在则创建新的(线程安全) + 切换到新会话:为用户的会话版本号 +1 并返回新版本号。 + """ + with self.control_lock: + current = self.session_versions.get(username, 0) + current += 1 + self.session_versions[username] = current + return current + + def get_session_version(self, username): + """获取用户当前会话版本(不存在则为0)。""" + with self.control_lock: + return self.session_versions.get(username, 0) + + def is_session_valid(self, username, version): + """检查给定版本是否仍为该用户的当前会话版本。""" + with self.control_lock: + return version == self.session_versions.get(username, 0) + + def _get_Stream_internal(self, username): + """ + 内部方法:获取指定用户ID的文本流(不加锁,调用者必须已持有stream_lock) :param username: 用户名 :return: 对应的句子缓存对象 """ - # 注意:这个方法应该在已经获得锁的情况下调用 - # 如果从外部调用,需要先获得锁 - if username not in self.streams or username not in self.nlp_streams: # 创建新的流缓存 self.streams[username] = stream_sentence.SentenceCache(self.max_sentences) @@ -67,6 +87,16 @@ class StreamManager: thread.start() return self.streams[username], self.nlp_streams[username] + + def get_Stream(self, username): + """ + 获取指定用户ID的文本流,如果不存在则创建新的(线程安全) + :param username: 用户名 + :return: 对应的句子缓存对象 + """ + # 使用stream_lock保护流的读写操作 + with self.stream_lock: + return self._get_Stream_internal(username) def write_sentence(self, username, sentence): """ @@ -79,18 +109,31 @@ class StreamManager: if len(sentence) > 10240: # 10KB限制 sentence = sentence[:10240] - # 使用锁保护获取和写入操作 - with self.lock: - # 检查是否包含_标记(可能在句子中间) - if '_' in sentence: - # 清空文本流 + # 若当前处于停止状态且这不是新会话的首句,则丢弃写入,避免残余输出 + try: + with self.control_lock: + if self.stop_generation_flags.get(username, False) and ('_' not in sentence): + return False + except Exception: + pass + + # 检查是否包含_标记(可能在句子中间) + if '_' in sentence: + # 直接使用stream_lock清除文本流 + with self.stream_lock: self._clear_Stream_internal(username) - # 清空音频队列(打断时需要清空音频) - self._clear_audio_queue(username) - # 重置停止生成标志,开始新的对话 - self._set_stop_generation_internal(username, False) + # 清空音频队列(Queue本身线程安全,不需要锁) + self._clear_audio_queue(username) + + # 收到新处理的第一个句子,重置停止标志,允许后续处理 + with self.control_lock: + self.stop_generation_flags[username] = False + + # 使用stream_lock保护写入操作 + with self.stream_lock: try: - Stream, nlp_Stream = self.get_Stream(username) + # 使用内部方法避免重复加锁 + Stream, nlp_Stream = self._get_Stream_internal(username) success = Stream.write(sentence) nlp_success = nlp_Stream.write(sentence) return success and nlp_success @@ -114,61 +157,97 @@ class StreamManager: :param username: 用户名 :param stop: 是否停止,默认True """ - with self.lock: + with self.control_lock: self.stop_generation_flags[username] = stop - def should_stop_generation(self, username): + def should_stop_generation(self, username, session_version=None): """ 检查指定用户是否应该停止生成 :param username: 用户名 :return: 是否应该停止 """ - with self.lock: - return self.stop_generation_flags.get(username, False) + with self.control_lock: + flag = self.stop_generation_flags.get(username, False) + if flag: + return True + if session_version is not None: + if session_version != self.session_versions.get(username, 0): + return True + return False - def _set_stop_generation_internal(self, username, stop=True): - """ - 设置停止生成标志的内部方法(不使用锁,调用者必须已持有锁) - :param username: 用户名 - :param stop: 是否停止,默认True - """ - self.stop_generation_flags[username] = stop + # 内部方法已移除,直接使用带锁的公共方法 - def _should_stop_generation_internal(self, username): + def _clear_user_specific_audio(self, username, sound_queue): """ - 检查停止生成标志的内部方法(不使用锁,调用者必须已持有锁) - :param username: 用户名 - :return: 是否应该停止 + 清理特定用户的音频队列项,保留其他用户的音频 + :param username: 要清理的用户名 + :param sound_queue: 音频队列 """ - return self.stop_generation_flags.get(username, False) + import queue + from utils import util + temp_items = [] + + # 使用非阻塞方式提取所有项,避免死锁 + try: + while True: + item = sound_queue.get_nowait() # 非阻塞获取 + file_url, audio_length, interact = item + item_user = interact.data.get('user', '') + if item_user != username: + temp_items.append(item) # 保留非目标用户的项 + # 目标用户的项直接丢弃(不添加到 temp_items) + except queue.Empty: + # 队列空了,正常退出循环 + pass + + # 将保留的项重新放入队列(使用非阻塞方式) + for item in temp_items: + try: + sound_queue.put_nowait(item) # 非阻塞放入 + except queue.Full: + # 队列满的情况很少见,如果发生则记录日志 + util.printInfo(1, username, "音频队列已满,跳过部分音频项") + break + def _clear_audio_queue(self, username): """ - 清空指定用户的音频队列并停止文本生成 + 清空指定用户的音频队列 :param username: 用户名 + 注意:此方法假设调用者已持有必要的锁 """ - import queue fay_core = fay_booter.feiFei - fay_core.sound_query = queue.Queue() - # 设置停止生成标志,阻止新的文本继续生成和转换为音频 - self._set_stop_generation_internal(username, True) + # 只清理特定用户的音频项,保留其他用户的音频 + self._clear_user_specific_audio(username, fay_core.sound_query) def clear_Stream(self, username): """ 清除指定用户ID的文本流数据(外部调用接口,仅清除文本流) :param username: 用户名 """ - with self.lock: + # 直接使用stream_lock,不再需要clear_lock + with self.stream_lock: self._clear_Stream_internal(username) def clear_Stream_with_audio(self, username): """ 清除指定用户ID的文本流数据和音频队列(完全清除) + 注意:分步操作,避免锁嵌套 :param username: 用户名 """ - with self.lock: + # 第一步:切换会话版本,令现有读/写循环尽快退出 + self.bump_session(username) + + # 第二步:设置停止标志(独立操作) + with self.control_lock: + self.stop_generation_flags[username] = True + + # 第三步:清除文本流(独立操作) + with self.stream_lock: self._clear_Stream_internal(username) - self._clear_audio_queue(username) + + # 第四步:清除音频队列(Queue线程安全,不需要锁) + self._clear_audio_queue(username) def listen(self, username, stream, nlp_stream): while self.running: @@ -184,22 +263,31 @@ class StreamManager: :param username: 用户名 :param sentence: 要处理的句子 """ - # 使用临时变量避免多次锁获取 - should_stop = False - with self.lock: - should_stop = self._should_stop_generation_internal(username) - - # 检查是否应该停止生成,如果是则不处理新的句子 + # 检查停止标志(使用control_lock) + with self.control_lock: + should_stop = self.stop_generation_flags.get(username, False) + if should_stop: return - - fay_core = fay_booter.feiFei + # 进一步进行基于会话版本的快速拦截(避免进入下游 say) + try: + current_version = self.get_session_version(username) + if self.should_stop_generation(username, session_version=current_version): + return + except Exception: + pass + + # 处理句子标记(无锁,避免长时间持有锁) is_first = "_" in sentence is_end = "_" in sentence sentence = sentence.replace("_", "").replace("_", "") - - if sentence or is_first or is_end : - interact = Interact("stream", 1, {"user": username, "msg": sentence, "isfirst" : is_first, "isend" : is_end}) + + # 执行实际处理(无锁,避免死锁) + if sentence or is_first or is_end: + fay_core = fay_booter.feiFei + # 附带当前会话版本,方便下游按会话控制输出 + session_version = self.get_session_version(username) + interact = Interact("stream", 1, {"user": username, "msg": sentence, "isfirst": is_first, "isend": is_end, "session_version": session_version}) fay_core.say(interact, sentence) # 调用核心处理模块进行响应 - time.sleep(0.01) # 短暂休眠以控制处理频率 \ No newline at end of file + time.sleep(0.01) # 短暂休眠以控制处理频率 diff --git a/gui/flask_server.py b/gui/flask_server.py index c3270a1..0944e61 100644 --- a/gui/flask_server.py +++ b/gui/flask_server.py @@ -15,6 +15,11 @@ import uuid import fay_booter from tts import tts_voice from gevent import pywsgi +try: + # Use gevent.sleep to avoid blocking the gevent loop; fallback to time.sleep if unavailable + from gevent import sleep as gsleep +except Exception: + from time import sleep as gsleep from scheduler.thread_manager import MyThread from utils import config_util, util from core import wsa_server @@ -270,6 +275,12 @@ def api_send(): if not username or not msg: return jsonify({'result': 'error', 'message': '用户名和消息内容不能为空'}) msg = msg.strip() + + # 新消息到达,立即中断该用户之前的所有处理(文本流+音频队列) + util.printInfo(1, username, f'[API中断] 新消息到达,完整中断用户 {username} 之前的所有处理') + stream_manager.new_instance().clear_Stream_with_audio(username) + util.printInfo(1, username, f'[API中断] 用户 {username} 的文本流和音频队列已清空,准备处理新消息') + interact = Interact("text", 1, {'user': username, 'msg': msg}) util.printInfo(1, username, '[文字发送按钮]{}'.format(interact.data["msg"]), time.time()) fay_booter.feiFei.on_interact(interact) @@ -398,12 +409,18 @@ def adopt_msg(): return jsonify({'status':'error', 'msg': f'采纳消息时出错: {e}'}), 500 def gpt_stream_response(last_content, username): - _, nlp_Stream = stream_manager.new_instance().get_Stream(username) + sm = stream_manager.new_instance() + _, nlp_Stream = sm.get_Stream(username) + session_version = sm.get_session_version(username) def generate(): while True: + # If interrupted or session switched, end the SSE stream promptly + if sm.should_stop_generation(username, session_version=session_version): + yield 'data: [DONE]\n\n' + break sentence = nlp_Stream.read() if sentence is None: - time.sleep(0.01) + gsleep(0.01) continue # 处理特殊标记 @@ -439,19 +456,24 @@ def gpt_stream_response(last_content, username): yield f"data: {json.dumps(message)}\n\n" if is_end: break - time.sleep(0.01) + gsleep(0.01) yield 'data: [DONE]\n\n' return Response(generate(), mimetype='text/event-stream') # 处理非流式响应 def non_streaming_response(last_content, username): - _, nlp_Stream = stream_manager.new_instance().get_Stream(username) + sm = stream_manager.new_instance() + _, nlp_Stream = sm.get_Stream(username) + session_version = sm.get_session_version(username) text = "" while True: + # If interrupted or session switched, stop waiting and return what we have + if sm.should_stop_generation(username, session_version=session_version): + break sentence = nlp_Stream.read() if sentence is None: - time.sleep(0.01) + gsleep(0.01) continue # 处理特殊标记 @@ -561,19 +583,21 @@ def to_stop_talking(): try: data = request.get_json() username = data.get('username', 'User') - message = data.get('text', '你好,请说?') observation = data.get('observation', '') - from queue import Queue + + util.printInfo(1, username, f"开始执行打断操作,清空用户 {username} 的处理队列") stream_manager.new_instance().clear_Stream_with_audio(username) - # 打断操作完成,不输出任何内容,避免时序冲突 - util.printInfo(1, username, "执行打断操作,清空所有处理队列") + util.printInfo(1, username, f"打断操作完成,用户 {username} 的所有队列已清空") + result = "interrupted" # 简单的结果标识 return jsonify({ 'status': 'success', 'data': str(result) if result is not None else '', - 'msg': '已停止说话' + 'msg': f'已停止用户 {username} 的说话' }), 200 except Exception as e: + username_str = username if 'username' in locals() else 'Unknown' + util.printInfo(1, username_str, f"打断操作失败: {str(e)}") return jsonify({ 'status': 'error', 'msg': str(e) diff --git a/llm/nlp_cognitive_stream.py b/llm/nlp_cognitive_stream.py index 982f258..5455aea 100644 --- a/llm/nlp_cognitive_stream.py +++ b/llm/nlp_cognitive_stream.py @@ -723,6 +723,11 @@ def question(content, username, observation=None): punctuation_marks = [",", ",","。", "!", "?", ".", "!", "?", "\n"] is_first_sentence = True + # 记录当前会话版本,用于精准中断 + from core import stream_manager + sm = stream_manager.new_instance() + session_version = sm.get_session_version(username) + # 创建代理 agent = create_agent(username) @@ -831,7 +836,7 @@ def question(content, username, observation=None): {"messages": messages}, {"configurable": {"thread_id": "tid{}".format(username)}} ): # 检查是否需要停止生成 - if stream_manager.new_instance().should_stop_generation(username): + if sm.should_stop_generation(username, session_version=session_version): util.log(1, f"检测到停止标志,中断React Agent文本生成: {username}") break @@ -909,21 +914,22 @@ def question(content, username, observation=None): full_response_text += react_response_text - # 确保React Agent最后一段文本也被发送,并标记为结束 + # 确保React Agent最后一段文本也被发送,并标记为结束(若会话未被取消) from utils.stream_state_manager import get_state_manager state_manager = get_state_manager() - if accumulated_text: - # 使用状态管理器准备最后的文本,强制标记为结束 - marked_text, _, _ = state_manager.prepare_sentence(username, accumulated_text, force_end=True) - stream_manager.new_instance().write_sentence(username, marked_text) - else: - # 如果没有剩余文本,检查是否需要发送结束标记 - session_info = state_manager.get_session_info(username) - if session_info and not session_info.get('is_end_sent', False): - # 发送一个空的结束标记 - marked_text, _, _ = state_manager.prepare_sentence(username, "", force_end=True) + if not sm.should_stop_generation(username, session_version=session_version): + if accumulated_text: + # 使用状态管理器准备最后的文本,强制标记为结束 + marked_text, _, _ = state_manager.prepare_sentence(username, accumulated_text, force_end=True) stream_manager.new_instance().write_sentence(username, marked_text) + else: + # 如果没有剩余文本,检查是否需要发送结束标记 + session_info = state_manager.get_session_info(username) + if session_info and not session_info.get('is_end_sent', False): + # 发送一个空的结束标记 + marked_text, _, _ = state_manager.prepare_sentence(username, "", force_end=True) + stream_manager.new_instance().write_sentence(username, marked_text) else: @@ -931,7 +937,7 @@ def question(content, username, observation=None): # 2.2 使用全局定义的llm对象进行流式请求 for chunk in llm.stream(messages): # 检查是否需要停止生成 - if stream_manager.new_instance().should_stop_generation(username): + if sm.should_stop_generation(username, session_version=session_version): util.log(1, f"检测到停止标志,中断LLM文本生成: {username}") break @@ -967,27 +973,30 @@ def question(content, username, observation=None): accumulated_text = accumulated_text[last_punct_pos + 1:].lstrip() full_response_text += flush_text - # 确保最后一段文本也被发送,并标记为结束 + # 确保最后一段文本也被发送,并标记为结束(若会话未被取消) from utils.stream_state_manager import get_state_manager state_manager = get_state_manager() - if accumulated_text: - # 使用状态管理器准备最后的文本,强制标记为结束 - marked_text, _, _ = state_manager.prepare_sentence(username, accumulated_text, force_end=True) - stream_manager.new_instance().write_sentence(username, marked_text) - else: - # 如果没有剩余文本,检查是否需要发送结束标记 - session_info = state_manager.get_session_info(username) - if session_info and not session_info.get('is_end_sent', False): - # 发送一个空的结束标记 - marked_text, _, _ = state_manager.prepare_sentence(username, "", force_end=True) + if not sm.should_stop_generation(username, session_version=session_version): + if accumulated_text: + # 使用状态管理器准备最后的文本,强制标记为结束 + marked_text, _, _ = state_manager.prepare_sentence(username, accumulated_text, force_end=True) stream_manager.new_instance().write_sentence(username, marked_text) + else: + # 如果没有剩余文本,检查是否需要发送结束标记 + session_info = state_manager.get_session_info(username) + if session_info and not session_info.get('is_end_sent', False): + # 发送一个空的结束标记 + marked_text, _, _ = state_manager.prepare_sentence(username, "", force_end=True) + stream_manager.new_instance().write_sentence(username, marked_text) except requests.exceptions.RequestException as e: util.log(1, f"请求失败: {e}") error_message = "抱歉,我现在太忙了,休息一会,请稍后再试。" - stream_manager.new_instance().write_sentence(username, "_" + error_message + "_") + # 会话未被取消时才发送错误提示 + if not sm.should_stop_generation(username, session_version=session_version): + stream_manager.new_instance().write_sentence(username, "_" + error_message + "_") full_response_text = error_message # 结束会话(不再需要发送额外的结束标记)