From 95be7648a1936dc1a6d20436121ea71d0e8802ca Mon Sep 17 00:00:00 2001 From: guo zebin Date: Thu, 4 Sep 2025 21:33:18 +0800 Subject: [PATCH] =?UTF-8?q?=E7=B4=A7=E6=80=A5=E4=BF=AE=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 修复接口打断bug。 --- core/fay_core.py | 50 ++++++++++++++++++++++++++++++---- core/stream_manager.py | 50 +++++++++++++++++++++++++++++++++- gui/flask_server.py | 10 ++++--- llm/nlp_cognitive_stream.py | 12 +++++++- utils/stream_text_processor.py | 17 ++++++++++-- 5 files changed, 126 insertions(+), 13 deletions(-) diff --git a/core/fay_core.py b/core/fay_core.py index fea0f49..434202b 100644 --- a/core/fay_core.py +++ b/core/fay_core.py @@ -145,6 +145,32 @@ class FeiFei: # 失败时也要确保结束会话 state_manager.force_reset_user_state(username) + def __process_stream_output(self, text, username, session_type="type2_stream"): + """ + 按流式方式分割和发送 type=2 的文本 + 使用安全的流式文本处理器和状态管理器 + """ + if not text or text.strip() == "": + return + + # 使用安全的流式文本处理器 + from utils.stream_text_processor import get_processor + from utils.stream_state_manager import get_state_manager + + processor = get_processor() + state_manager = get_state_manager() + + # 处理流式文本,is_qa=False表示普通模式 + success = processor.process_stream_text(text, username, is_qa=False, session_type=session_type) + + if success: + # 普通模式结束会话 + state_manager.end_session(username) + else: + util.log(1, f"type=2流式处理失败,文本长度: {len(text)}") + # 失败时也要确保结束会话 + state_manager.force_reset_user_state(username) + #语音消息处理检查是否命中q&a def __get_answer(self, interleaver, text): answer = None @@ -156,14 +182,18 @@ class FeiFei: return None, None - #语音消息处理 + #消息处理 def __process_interact(self, interact: Interact): if self.__running: try: index = interact.interact_type username = interact.data.get("user", "User") uid = member_db.new_instance().find_user(username) - if index == 1: #语音文字交互 + if index == 1: #语音、文字交互 + # 用户发送新消息,重置中断标志,开始新对话 + stream_manager.new_instance().set_stop_generation(username, False) + util.printInfo(1, username, "用户新输入,重置中断标志") + #记录用户问题,方便obs等调用 self.write_to_file("./logs", "asr_result.txt", interact.data["msg"]) @@ -204,9 +234,10 @@ class FeiFei: if interact.data.get("text"): text = interact.data.get("text") - # 使用统一的文本处理方法,空列表表示没有额外回复 + # 使用统一的文本处理方法 self.__process_text_output(text, username, uid) - MyThread(target=self.say, args=[interact, text]).start() + # 使用流式处理,按标点分割发送 + self.__process_stream_output(text, username, f"type2_{interact.interleaver}") return 'success' except BaseException as e: @@ -308,6 +339,11 @@ class FeiFei: result = self.download_wav(audio_url, './samples/', file_name) elif config_util.config["interact"]["playSound"] or wsa_server.get_instance().is_connected(interact.data.get("user")) or self.__is_send_remote_device_audio(interact):#tts if text != None and text.replace("*", "").strip() != "": + # 检查是否需要停止TTS处理 + if stream_manager.new_instance().should_stop_generation(interact.data.get("user", "User")): + util.printInfo(1, interact.data.get('user'), 'TTS处理被打断,跳过音频合成') + return None + # 先过滤表情符号,然后再合成语音 filtered_text = self.__remove_emojis(text.replace("*", "")) if filtered_text is not None and filtered_text.strip() != "": @@ -485,7 +521,11 @@ class FeiFei: #面板播放 config_util.load_config() if config_util.config["interact"]["playSound"]: - self.sound_query.put((file_url, audio_length, interact)) + # 检查是否需要停止音频播放 + if stream_manager.new_instance().should_stop_generation(interact.data.get("user", "User")): + util.printInfo(1, interact.data.get('user'), '音频播放被打断,跳过加入播放队列') + return + self.sound_query.put((file_url, audio_length, interact)) else: if wsa_server.get_web_instance().is_connected(interact.data.get('user')): wsa_server.get_web_instance().add_cmd({"panelMsg": "", 'Username' : interact.data.get('user'), 'robot': f'{cfg.fay_url}/robot/Normal.jpg'}) diff --git a/core/stream_manager.py b/core/stream_manager.py index 8aefab5..cba9e0e 100644 --- a/core/stream_manager.py +++ b/core/stream_manager.py @@ -42,6 +42,7 @@ class StreamManager: self.running = True # 控制监听线程的运行状态 self._initialized = True # 标记是否已初始化 self.msgid = "" # 消息ID + self.stop_generation_flags = {} # 存储用户的停止生成标志 def get_Stream(self, username): """ @@ -86,6 +87,8 @@ class StreamManager: self._clear_Stream_internal(username) # 清空音频队列(打断时需要清空音频) self._clear_audio_queue(username) + # 重置停止生成标志,开始新的对话 + self._set_stop_generation_internal(username, False) try: Stream, nlp_Stream = self.get_Stream(username) success = Stream.write(sentence) @@ -105,14 +108,50 @@ class StreamManager: if username in self.nlp_streams: self.nlp_streams[username].clear() + def set_stop_generation(self, username, stop=True): + """ + 设置指定用户的停止生成标志 + :param username: 用户名 + :param stop: 是否停止,默认True + """ + with self.lock: + self.stop_generation_flags[username] = stop + + def should_stop_generation(self, username): + """ + 检查指定用户是否应该停止生成 + :param username: 用户名 + :return: 是否应该停止 + """ + with self.lock: + return self.stop_generation_flags.get(username, False) + + def _set_stop_generation_internal(self, username, stop=True): + """ + 设置停止生成标志的内部方法(不使用锁,调用者必须已持有锁) + :param username: 用户名 + :param stop: 是否停止,默认True + """ + self.stop_generation_flags[username] = stop + + def _should_stop_generation_internal(self, username): + """ + 检查停止生成标志的内部方法(不使用锁,调用者必须已持有锁) + :param username: 用户名 + :return: 是否应该停止 + """ + return self.stop_generation_flags.get(username, False) + def _clear_audio_queue(self, username): """ - 清空指定用户的音频队列 + 清空指定用户的音频队列并停止文本生成 :param username: 用户名 """ import queue fay_core = fay_booter.feiFei fay_core.sound_query = queue.Queue() + # 设置停止生成标志,阻止新的文本继续生成和转换为音频 + self._set_stop_generation_internal(username, True) def clear_Stream(self, username): """ @@ -145,6 +184,15 @@ class StreamManager: :param username: 用户名 :param sentence: 要处理的句子 """ + # 使用临时变量避免多次锁获取 + should_stop = False + with self.lock: + should_stop = self._should_stop_generation_internal(username) + + # 检查是否应该停止生成,如果是则不处理新的句子 + if should_stop: + return + fay_core = fay_booter.feiFei is_first = "_" in sentence diff --git a/gui/flask_server.py b/gui/flask_server.py index 23fb17d..c3270a1 100644 --- a/gui/flask_server.py +++ b/gui/flask_server.py @@ -309,6 +309,7 @@ def api_get_Msg(): except Exception as e: return jsonify({'list': [], 'message': f'获取消息时出错: {e}'}), 500 +#文字沟通接口 @__app.route('/v1/chat/completions', methods=['post']) @__app.route('/api/send/v1/chat/completions', methods=['post']) def api_send_v1_chat_completions(): @@ -563,9 +564,10 @@ def to_stop_talking(): message = data.get('text', '你好,请说?') observation = data.get('observation', '') from queue import Queue - stream_manager.clear_Stream_with_audio(username) - interact = Interact("stop_talking", 2, {'user': username, 'text': message, 'observation': str(observation)}) - result = fay_booter.feiFei.on_interact(interact) + stream_manager.new_instance().clear_Stream_with_audio(username) + # 打断操作完成,不输出任何内容,避免时序冲突 + util.printInfo(1, username, "执行打断操作,清空所有处理队列") + result = "interrupted" # 简单的结果标识 return jsonify({ 'status': 'success', 'data': str(result) if result is not None else '', @@ -595,7 +597,7 @@ def transparent_pass(): success = fay_booter.feiFei.on_interact(interact) if (success == 'success'): return jsonify({'code': 200, 'message' : '成功'}) - return jsonify({'code': 500, 'message' : '未错原因出错'}) + return jsonify({'code': 500, 'message' : '未知原因出错'}) except Exception as e: return jsonify({'code': 500, 'message': f'出错: {e}'}), 500 diff --git a/llm/nlp_cognitive_stream.py b/llm/nlp_cognitive_stream.py index 21f78e2..982f258 100644 --- a/llm/nlp_cognitive_stream.py +++ b/llm/nlp_cognitive_stream.py @@ -754,7 +754,7 @@ def question(content, username, observation=None): related_memories = agent.memory_stream.retrieve( [f"""{"主人" if username == "User" else username}提出了问题:{content}"""], # 查询句子列表 current_time_step, # 当前时间步 - n_count=100, # 获取5条相关记忆 + n_count=100, # 获取100条相关记忆 curr_filter="all", # 获取所有类型的记忆 hp=[0.8, 0.5, 0.5], # 权重:[时间近度权重recency_w, 相关性权重relevance_w, 重要性权重importance_w] stateless=False @@ -830,6 +830,11 @@ def question(content, username, observation=None): for chunk in react_agent.stream( {"messages": messages}, {"configurable": {"thread_id": "tid{}".format(username)}} ): + # 检查是否需要停止生成 + if stream_manager.new_instance().should_stop_generation(username): + util.log(1, f"检测到停止标志,中断React Agent文本生成: {username}") + break + react_response_text = "" # 消息类型1:检测工具调用开始 if "agent" in chunk and "tool_calls" in str(chunk): @@ -925,6 +930,11 @@ def question(content, username, observation=None): try: # 2.2 使用全局定义的llm对象进行流式请求 for chunk in llm.stream(messages): + # 检查是否需要停止生成 + if stream_manager.new_instance().should_stop_generation(username): + util.log(1, f"检测到停止标志,中断LLM文本生成: {username}") + break + flush_text = chunk.content if not flush_text: continue diff --git a/utils/stream_text_processor.py b/utils/stream_text_processor.py index fc44e74..bc0c715 100644 --- a/utils/stream_text_processor.py +++ b/utils/stream_text_processor.py @@ -69,6 +69,7 @@ class StreamTextProcessor: util.log(1, f"文本已截断到: {len(accumulated_text)} 字符") # 主处理循环,带安全保护 + first_sentence_sent = False # 跟踪是否已发送第一个句子 while accumulated_text and iteration_count < self.max_iterations: # 超时检查 if time.time() - start_time > self.timeout_seconds: @@ -97,12 +98,15 @@ class StreamTextProcessor: if len(sentence_text) >= self.min_length: # 使用状态管理器准备句子 marked_text, is_first, is_end = state_manager.prepare_sentence( - username, sentence_text, force_first=False, force_end=False + username, sentence_text, + force_first=(not first_sentence_sent), # 第一句=True,其他=False + force_end=False ) success = stream_manager.new_instance().write_sentence(username, marked_text) if success: accumulated_text = accumulated_text[punct_index + 1:].lstrip() + first_sentence_sent = True # 标记已发送第一个句子 sent_successfully = True break else: @@ -115,7 +119,16 @@ class StreamTextProcessor: # 发送剩余文本,如果是最后的文本则标记为结束 if accumulated_text: marked_text, _, _ = state_manager.prepare_sentence( - username, accumulated_text, force_first=False, force_end=True + username, accumulated_text, + force_first=(not first_sentence_sent), # 如果还没发送过句子,这是第一个 + force_end=True + ) + stream_manager.new_instance().write_sentence(username, marked_text) + first_sentence_sent = True + elif not first_sentence_sent: + # 如果整个文本都没有找到合适的分割点,作为完整句子发送 + marked_text, _, _ = state_manager.prepare_sentence( + username, text, force_first=True, force_end=True ) stream_manager.new_instance().write_sentence(username, marked_text) else: