From a2214f0b62b5813ac09061abc5aed151d17809a1 Mon Sep 17 00:00:00 2001 From: guo zebin Date: Mon, 15 Sep 2025 22:17:45 +0800 Subject: [PATCH] =?UTF-8?q?=E8=87=AA=E7=84=B6=E8=BF=9B=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1、修复qa标签误输到类gpt接口; 2、优化打断操作介入时机。 --- core/fay_core.py | 30 ++++++++++++---------------- core/stream_manager.py | 41 ++++++++++++++++++++------------------ gui/flask_server.py | 45 +++++++++++++++++++----------------------- 3 files changed, 55 insertions(+), 61 deletions(-) diff --git a/core/fay_core.py b/core/fay_core.py index ee8e9d6..741cc20 100644 --- a/core/fay_core.py +++ b/core/fay_core.py @@ -73,7 +73,7 @@ class FeiFei: self.speaking = False #声音是否在播放 self.__running = True self.sp.connect() #TODO 预连接 - self.cemotion = None + self.timer = None self.sound_query = Queue() self.think_mode_users = {} # 使用字典存储每个用户的think模式状态 @@ -164,6 +164,13 @@ class FeiFei: index = interact.interact_type username = interact.data.get("user", "User") uid = member_db.new_instance().find_user(username) + + try: + from utils.stream_state_manager import get_state_manager + if get_state_manager().is_session_active(username): + stream_manager.new_instance().clear_Stream_with_audio(username) + except Exception: + pass # 切换到新会话,令上一会话的流式输出/音频尽快结束 util.printInfo(1, username, "重置中断标志,开始新的对话处理") @@ -268,10 +275,8 @@ class FeiFei: if member_db.new_instance().is_username_exist(username) == "notexists": member_db.new_instance().add_user(username) # 判断调用来源,如果是非stream调用则同步处理 - if interact.data.get("stream", False): - MyThread(target=self.__process_interact, args=[interact]).start() - else: - return self.__process_interact(interact) + MyThread(target=self.__process_interact, args=[interact]).start() + else: return self.__process_interact(interact) @@ -296,7 +301,7 @@ class FeiFei: try: user_for_stop = interact.data.get("user", "User") conv_id_for_stop = interact.data.get("conversation_id") - if stream_manager.new_instance().should_stop_generation(user_for_stop, conversation_id=conv_id_for_stop): + if not is_end and stream_manager.new_instance().should_stop_generation(user_for_stop, conversation_id=conv_id_for_stop): return None except Exception: pass @@ -339,7 +344,7 @@ class FeiFei: try: user_for_stop = interact.data.get("user", "User") conv_id_for_stop = interact.data.get("conversation_id") - if not stream_manager.new_instance().should_stop_generation(user_for_stop, conversation_id=conv_id_for_stop): + if is_end or not stream_manager.new_instance().should_stop_generation(user_for_stop, conversation_id=conv_id_for_stop): self.__process_text_output(text, interact.data.get('user'), uid, content_id, type) except Exception: self.__process_text_output(text, interact.data.get('user'), uid, content_id, type) @@ -425,7 +430,7 @@ class FeiFei: wsa_server.get_web_instance().add_cmd({"panelMsg": "", 'Username' : interact.data.get('user'), 'robot': f'{cfg.fay_url}/robot/Normal.jpg'}) if result is not None or is_first or is_end: - if is_end:#如果结束标记,则延迟1秒处理,免得is end比前面的音频tts要快 + if is_end:#TODO 临时方案:如果结束标记,则延迟1秒处理,免得is end比前面的音频tts要快 time.sleep(1) MyThread(target=self.__process_output_audio, args=[result, interact, text]).start() return result @@ -610,12 +615,6 @@ class FeiFei: config_util.load_config() if config_util.config["interact"]["playSound"]: # 检查是否需要停止音频播放(按会话) - if stream_manager.new_instance().should_stop_generation( - interact.data.get("user", "User"), - conversation_id=interact.data.get("conversation_id") - ): - util.printInfo(1, interact.data.get('user'), '音频播放被打断,跳过加入播放队列') - return self.sound_query.put((file_url, audio_length, interact)) else: if wsa_server.get_web_instance().is_connected(interact.data.get('user')): @@ -648,9 +647,6 @@ class FeiFei: #启动核心服务 def start(self): - if cfg.ltp_mode == "cemotion": - from cemotion import Cemotion - self.cemotion = Cemotion() MyThread(target=self.__play_sound).start() #停止核心服务 diff --git a/core/stream_manager.py b/core/stream_manager.py index 06fe649..3b5473b 100644 --- a/core/stream_manager.py +++ b/core/stream_manager.py @@ -135,12 +135,6 @@ class StreamManager: # 检查是否包含_标记(可能在句子中间) if '_' in sentence: - # 直接使用stream_lock清除文本流 - with self.stream_lock: - self._clear_Stream_internal(username) - # 清空音频队列(Queue本身线程安全,不需要锁) - self._clear_audio_queue(username) - # 收到新处理的第一个句子,重置停止标志,允许后续处理 with self.control_lock: self.stop_generation_flags[username] = False @@ -154,7 +148,8 @@ class StreamManager: tag_cid = conversation_id if conversation_id is not None else current_cid tagged_sentence = f"{sentence}____" if tag_cid else sentence success = Stream.write(tagged_sentence) - nlp_success = nlp_Stream.write(sentence) + # 让 NLP 流也携带隐藏的会话ID,便于前端按会话过滤 + nlp_success = nlp_Stream.write(tagged_sentence) return success and nlp_success except Exception as e: print(f"写入句子时出错: {e}") @@ -170,6 +165,21 @@ class StreamManager: if username in self.nlp_streams: self.nlp_streams[username].clear() + # 清除后写入一条结束标记,分别通知主流与NLP流结束 + try: + # 确保流存在(监听线程也会在首次创建时启动) + stream, nlp_stream = self._get_Stream_internal(username) + cid = self.conversation_ids.get(username, "") + end_marker = "_" + # 主流带会话ID隐藏标签,供下游按会话拦截 + tagged = f"{end_marker}____" if cid else end_marker + stream.write(tagged) + # NLP 流也写入带会话ID的结束标记,前端会按会话过滤 + nlp_stream.write(tagged) + except Exception: + # 忽略写入哨兵失败 + pass + def set_stop_generation(self, username, stop=True): """ 设置指定用户的停止生成标志 @@ -241,15 +251,6 @@ class StreamManager: # 只清理特定用户的音频项,保留其他用户的音频 self._clear_user_specific_audio(username, fay_core.sound_query) - def clear_Stream(self, username): - """ - 清除指定用户ID的文本流数据(外部调用接口,仅清除文本流) - :param username: 用户名 - """ - # 直接使用stream_lock,不再需要clear_lock - with self.stream_lock: - self._clear_Stream_internal(username) - def clear_Stream_with_audio(self, username): """ 清除指定用户ID的文本流数据和音频队列(完全清除) @@ -262,13 +263,15 @@ class StreamManager: # 第二步:设置停止标志(独立操作) with self.control_lock: self.stop_generation_flags[username] = True + + # 第三步:清除音频队列(Queue线程安全,不需要锁) + self._clear_audio_queue(username) - # 第三步:清除文本流(独立操作) + # 第四步:清除文本流(独立操作) with self.stream_lock: self._clear_Stream_internal(username) - # 第四步:清除音频队列(Queue线程安全,不需要锁) - self._clear_audio_queue(username) + def listen(self, username, stream, nlp_stream): while self.running: diff --git a/gui/flask_server.py b/gui/flask_server.py index f58bf0c..f93e4e9 100644 --- a/gui/flask_server.py +++ b/gui/flask_server.py @@ -246,7 +246,7 @@ def api_start_live(): # 启动 try: fay_booter.start() - time.sleep(1) + gsleep(1) wsa_server.get_web_instance().add_cmd({"liveState": 1}) return '{"result":"successful"}' except Exception as e: @@ -257,7 +257,7 @@ def api_stop_live(): # 停止 try: fay_booter.stop() - time.sleep(1) + gsleep(1) wsa_server.get_web_instance().add_cmd({"liveState": 0}) return '{"result":"successful"}' except Exception as e: @@ -276,12 +276,7 @@ def api_send(): if not username or not msg: return jsonify({'result': 'error', 'message': '用户名和消息内容不能为空'}) msg = msg.strip() - - # 新消息到达,立即中断该用户之前的所有处理(文本流+音频队列) - util.printInfo(1, username, f'[API中断] 新消息到达,完整中断用户 {username} 之前的所有处理') - stream_manager.new_instance().clear_Stream_with_audio(username) - util.printInfo(1, username, f'[API中断] 用户 {username} 的文本流和音频队列已清空,准备处理新消息') - + interact = Interact("text", 1, {'user': username, 'msg': msg}) util.printInfo(1, username, '[文字发送按钮]{}'.format(interact.data["msg"]), time.time()) fay_booter.feiFei.on_interact(interact) @@ -343,12 +338,8 @@ def api_send_v1_chat_completions(): model = data.get('model', 'fay') observation = data.get('observation', '') - - # 检查请求中是否指定了流式传输 stream_requested = data.get('stream', False) - - # 优先使用请求中的stream参数,如果没有指定则使用配置中的设置 if stream_requested or model == 'fay-streaming': interact = Interact("text", 1, {'user': username, 'msg': last_content, 'observation': str(observation), 'stream':True}) util.printInfo(1, username, '[文字沟通接口(流式)]{}'.format(interact.data["msg"]), time.time()) @@ -416,7 +407,6 @@ def adopt_msg(): def gpt_stream_response(last_content, username): sm = stream_manager.new_instance() _, nlp_Stream = sm.get_Stream(username) - conversation_id = sm.get_conversation_id(username) def generate(): while True: sentence = nlp_Stream.read() @@ -424,10 +414,16 @@ def gpt_stream_response(last_content, username): gsleep(0.01) continue - # 处理特殊标记 + # 解析并移除隐藏会话ID标签 + try: + m = re.search(r"__]+)>__", sentence) + if m: + sentence = sentence.replace(m.group(0), "") + except Exception: + pass is_first = "_" in sentence is_end = "_" in sentence - content = sentence.replace("_", "").replace("_", "") + content = sentence.replace("_", "").replace("_", "").replace("_", "") if content or is_first or is_end: # 只有当有实际内容时才发送 message = { "id": "faystreaming-" + str(uuid.uuid4()), @@ -451,9 +447,6 @@ def gpt_stream_response(last_content, username): }, "system_fingerprint": "" } - if is_end: - if username in fay_booter.feiFei.nlp_streams: - stream_manager.new_instance().clear_Stream(username) yield f"data: {json.dumps(message)}\n\n" if is_end: break @@ -466,7 +459,6 @@ def gpt_stream_response(last_content, username): def non_streaming_response(last_content, username): sm = stream_manager.new_instance() _, nlp_Stream = sm.get_Stream(username) - conversation_id = sm.get_conversation_id(username) text = "" while True: sentence = nlp_Stream.read() @@ -475,12 +467,16 @@ def non_streaming_response(last_content, username): continue # 处理特殊标记 + try: + m = re.search(r"__]+)>__", sentence) + if m: + sentence = sentence.replace(m.group(0), "") + except Exception: + pass is_first = "_" in sentence is_end = "_" in sentence - text += sentence.replace("_", "").replace("_", "") + text += sentence.replace("_", "").replace("_", "").replace("_", "") if is_end: - if username in fay_booter.feiFei.nlp_streams: - stream_manager.new_instance().clear_Stream(username) break return jsonify({ "id": "fay-" + str(uuid.uuid4()), @@ -581,10 +577,10 @@ def to_stop_talking(): try: data = request.get_json() username = data.get('username', 'User') + stream_manager.new_instance().clear_Stream_with_audio(username) observation = data.get('observation', '') util.printInfo(1, username, f"开始执行打断操作,清空用户 {username} 的处理队列") - stream_manager.new_instance().clear_Stream_with_audio(username) util.printInfo(1, username, f"打断操作完成,用户 {username} 的所有队列已清空") result = "interrupted" # 简单的结果标识 @@ -617,7 +613,6 @@ def transparent_pass(): if response_text or audio_url: # 新消息到达,立即中断该用户之前的所有处理(文本流+音频队列) util.printInfo(1, username, f'[API中断] 新消息到达,完整中断用户 {username} 之前的所有处理') - stream_manager.new_instance().clear_Stream_with_audio(username) util.printInfo(1, username, f'[API中断] 用户 {username} 的文本流和音频队列已清空,准备处理新消息') interact = Interact('transparent_pass', 2, {'user': username, 'text': response_text, 'audio': audio_url, 'isend':True, 'isfirst':True}) util.printInfo(1, username, '透传播放:{},{}'.format(response_text, audio_url), time.time()) @@ -755,7 +750,7 @@ def api_start_genagents(): def monitor_shutdown(): try: while not is_shutdown_requested(): - time.sleep(1) + gsleep(1) util.log(1, f"检测到关闭请求,正在关闭决策分析服务...") genagents_server.shutdown() except Exception as e: