紧急修复

修复接口打断bug。
This commit is contained in:
guo zebin
2025-09-04 21:33:18 +08:00
parent af2f8d682a
commit 95be7648a1
5 changed files with 126 additions and 13 deletions

View File

@@ -145,6 +145,32 @@ class FeiFei:
# 失败时也要确保结束会话
state_manager.force_reset_user_state(username)
def __process_stream_output(self, text, username, session_type="type2_stream"):
"""
按流式方式分割和发送 type=2 的文本
使用安全的流式文本处理器和状态管理器
"""
if not text or text.strip() == "":
return
# 使用安全的流式文本处理器
from utils.stream_text_processor import get_processor
from utils.stream_state_manager import get_state_manager
processor = get_processor()
state_manager = get_state_manager()
# 处理流式文本is_qa=False表示普通模式
success = processor.process_stream_text(text, username, is_qa=False, session_type=session_type)
if success:
# 普通模式结束会话
state_manager.end_session(username)
else:
util.log(1, f"type=2流式处理失败文本长度: {len(text)}")
# 失败时也要确保结束会话
state_manager.force_reset_user_state(username)
#语音消息处理检查是否命中q&a
def __get_answer(self, interleaver, text):
answer = None
@@ -156,14 +182,18 @@ class FeiFei:
return None, None
#语音消息处理
#消息处理
def __process_interact(self, interact: Interact):
if self.__running:
try:
index = interact.interact_type
username = interact.data.get("user", "User")
uid = member_db.new_instance().find_user(username)
if index == 1: #语音文字交互
if index == 1: #语音文字交互
# 用户发送新消息,重置中断标志,开始新对话
stream_manager.new_instance().set_stop_generation(username, False)
util.printInfo(1, username, "用户新输入,重置中断标志")
#记录用户问题,方便obs等调用
self.write_to_file("./logs", "asr_result.txt", interact.data["msg"])
@@ -204,9 +234,10 @@ class FeiFei:
if interact.data.get("text"):
text = interact.data.get("text")
# 使用统一的文本处理方法,空列表表示没有额外回复
# 使用统一的文本处理方法
self.__process_text_output(text, username, uid)
MyThread(target=self.say, args=[interact, text]).start()
# 使用流式处理,按标点分割发送
self.__process_stream_output(text, username, f"type2_{interact.interleaver}")
return 'success'
except BaseException as e:
@@ -308,6 +339,11 @@ class FeiFei:
result = self.download_wav(audio_url, './samples/', file_name)
elif config_util.config["interact"]["playSound"] or wsa_server.get_instance().is_connected(interact.data.get("user")) or self.__is_send_remote_device_audio(interact):#tts
if text != None and text.replace("*", "").strip() != "":
# 检查是否需要停止TTS处理
if stream_manager.new_instance().should_stop_generation(interact.data.get("user", "User")):
util.printInfo(1, interact.data.get('user'), 'TTS处理被打断跳过音频合成')
return None
# 先过滤表情符号,然后再合成语音
filtered_text = self.__remove_emojis(text.replace("*", ""))
if filtered_text is not None and filtered_text.strip() != "":
@@ -485,7 +521,11 @@ class FeiFei:
#面板播放
config_util.load_config()
if config_util.config["interact"]["playSound"]:
self.sound_query.put((file_url, audio_length, interact))
# 检查是否需要停止音频播放
if stream_manager.new_instance().should_stop_generation(interact.data.get("user", "User")):
util.printInfo(1, interact.data.get('user'), '音频播放被打断,跳过加入播放队列')
return
self.sound_query.put((file_url, audio_length, interact))
else:
if wsa_server.get_web_instance().is_connected(interact.data.get('user')):
wsa_server.get_web_instance().add_cmd({"panelMsg": "", 'Username' : interact.data.get('user'), 'robot': f'{cfg.fay_url}/robot/Normal.jpg'})

View File

@@ -42,6 +42,7 @@ class StreamManager:
self.running = True # 控制监听线程的运行状态
self._initialized = True # 标记是否已初始化
self.msgid = "" # 消息ID
self.stop_generation_flags = {} # 存储用户的停止生成标志
def get_Stream(self, username):
"""
@@ -86,6 +87,8 @@ class StreamManager:
self._clear_Stream_internal(username)
# 清空音频队列(打断时需要清空音频)
self._clear_audio_queue(username)
# 重置停止生成标志,开始新的对话
self._set_stop_generation_internal(username, False)
try:
Stream, nlp_Stream = self.get_Stream(username)
success = Stream.write(sentence)
@@ -105,14 +108,50 @@ class StreamManager:
if username in self.nlp_streams:
self.nlp_streams[username].clear()
def set_stop_generation(self, username, stop=True):
"""
设置指定用户的停止生成标志
:param username: 用户名
:param stop: 是否停止默认True
"""
with self.lock:
self.stop_generation_flags[username] = stop
def should_stop_generation(self, username):
"""
检查指定用户是否应该停止生成
:param username: 用户名
:return: 是否应该停止
"""
with self.lock:
return self.stop_generation_flags.get(username, False)
def _set_stop_generation_internal(self, username, stop=True):
"""
设置停止生成标志的内部方法(不使用锁,调用者必须已持有锁)
:param username: 用户名
:param stop: 是否停止默认True
"""
self.stop_generation_flags[username] = stop
def _should_stop_generation_internal(self, username):
"""
检查停止生成标志的内部方法(不使用锁,调用者必须已持有锁)
:param username: 用户名
:return: 是否应该停止
"""
return self.stop_generation_flags.get(username, False)
def _clear_audio_queue(self, username):
"""
清空指定用户的音频队列
清空指定用户的音频队列并停止文本生成
:param username: 用户名
"""
import queue
fay_core = fay_booter.feiFei
fay_core.sound_query = queue.Queue()
# 设置停止生成标志,阻止新的文本继续生成和转换为音频
self._set_stop_generation_internal(username, True)
def clear_Stream(self, username):
"""
@@ -145,6 +184,15 @@ class StreamManager:
:param username: 用户名
:param sentence: 要处理的句子
"""
# 使用临时变量避免多次锁获取
should_stop = False
with self.lock:
should_stop = self._should_stop_generation_internal(username)
# 检查是否应该停止生成,如果是则不处理新的句子
if should_stop:
return
fay_core = fay_booter.feiFei
is_first = "_<isfirst>" in sentence

View File

@@ -309,6 +309,7 @@ def api_get_Msg():
except Exception as e:
return jsonify({'list': [], 'message': f'获取消息时出错: {e}'}), 500
#文字沟通接口
@__app.route('/v1/chat/completions', methods=['post'])
@__app.route('/api/send/v1/chat/completions', methods=['post'])
def api_send_v1_chat_completions():
@@ -563,9 +564,10 @@ def to_stop_talking():
message = data.get('text', '你好,请说?')
observation = data.get('observation', '')
from queue import Queue
stream_manager.clear_Stream_with_audio(username)
interact = Interact("stop_talking", 2, {'user': username, 'text': message, 'observation': str(observation)})
result = fay_booter.feiFei.on_interact(interact)
stream_manager.new_instance().clear_Stream_with_audio(username)
# 打断操作完成,不输出任何内容,避免时序冲突
util.printInfo(1, username, "执行打断操作,清空所有处理队列")
result = "interrupted" # 简单的结果标识
return jsonify({
'status': 'success',
'data': str(result) if result is not None else '',
@@ -595,7 +597,7 @@ def transparent_pass():
success = fay_booter.feiFei.on_interact(interact)
if (success == 'success'):
return jsonify({'code': 200, 'message' : '成功'})
return jsonify({'code': 500, 'message' : '原因出错'})
return jsonify({'code': 500, 'message' : '原因出错'})
except Exception as e:
return jsonify({'code': 500, 'message': f'出错: {e}'}), 500

View File

@@ -754,7 +754,7 @@ def question(content, username, observation=None):
related_memories = agent.memory_stream.retrieve(
[f"""{"主人" if username == "User" else username}提出了问题:{content}"""], # 查询句子列表
current_time_step, # 当前时间步
n_count=100, # 获取5条相关记忆
n_count=100, # 获取100条相关记忆
curr_filter="all", # 获取所有类型的记忆
hp=[0.8, 0.5, 0.5], # 权重:[时间近度权重recency_w, 相关性权重relevance_w, 重要性权重importance_w]
stateless=False
@@ -830,6 +830,11 @@ def question(content, username, observation=None):
for chunk in react_agent.stream(
{"messages": messages}, {"configurable": {"thread_id": "tid{}".format(username)}}
):
# 检查是否需要停止生成
if stream_manager.new_instance().should_stop_generation(username):
util.log(1, f"检测到停止标志中断React Agent文本生成: {username}")
break
react_response_text = ""
# 消息类型1检测工具调用开始
if "agent" in chunk and "tool_calls" in str(chunk):
@@ -925,6 +930,11 @@ def question(content, username, observation=None):
try:
# 2.2 使用全局定义的llm对象进行流式请求
for chunk in llm.stream(messages):
# 检查是否需要停止生成
if stream_manager.new_instance().should_stop_generation(username):
util.log(1, f"检测到停止标志中断LLM文本生成: {username}")
break
flush_text = chunk.content
if not flush_text:
continue

View File

@@ -69,6 +69,7 @@ class StreamTextProcessor:
util.log(1, f"文本已截断到: {len(accumulated_text)} 字符")
# 主处理循环,带安全保护
first_sentence_sent = False # 跟踪是否已发送第一个句子
while accumulated_text and iteration_count < self.max_iterations:
# 超时检查
if time.time() - start_time > self.timeout_seconds:
@@ -97,12 +98,15 @@ class StreamTextProcessor:
if len(sentence_text) >= self.min_length:
# 使用状态管理器准备句子
marked_text, is_first, is_end = state_manager.prepare_sentence(
username, sentence_text, force_first=False, force_end=False
username, sentence_text,
force_first=(not first_sentence_sent), # 第一句=True其他=False
force_end=False
)
success = stream_manager.new_instance().write_sentence(username, marked_text)
if success:
accumulated_text = accumulated_text[punct_index + 1:].lstrip()
first_sentence_sent = True # 标记已发送第一个句子
sent_successfully = True
break
else:
@@ -115,7 +119,16 @@ class StreamTextProcessor:
# 发送剩余文本,如果是最后的文本则标记为结束
if accumulated_text:
marked_text, _, _ = state_manager.prepare_sentence(
username, accumulated_text, force_first=False, force_end=True
username, accumulated_text,
force_first=(not first_sentence_sent), # 如果还没发送过句子,这是第一个
force_end=True
)
stream_manager.new_instance().write_sentence(username, marked_text)
first_sentence_sent = True
elif not first_sentence_sent:
# 如果整个文本都没有找到合适的分割点,作为完整句子发送
marked_text, _, _ = state_manager.prepare_sentence(
username, text, force_first=True, force_end=True
)
stream_manager.new_instance().write_sentence(username, marked_text)
else: