自然进化

1、修复qa标签误输到类gpt接口;
2、优化打断操作介入时机。
This commit is contained in:
guo zebin
2025-09-15 22:17:45 +08:00
parent 8540fa5dac
commit a2214f0b62
3 changed files with 55 additions and 61 deletions

View File

@@ -73,7 +73,7 @@ class FeiFei:
self.speaking = False #声音是否在播放
self.__running = True
self.sp.connect() #TODO 预连接
self.cemotion = None
self.timer = None
self.sound_query = Queue()
self.think_mode_users = {} # 使用字典存储每个用户的think模式状态
@@ -164,6 +164,13 @@ class FeiFei:
index = interact.interact_type
username = interact.data.get("user", "User")
uid = member_db.new_instance().find_user(username)
try:
from utils.stream_state_manager import get_state_manager
if get_state_manager().is_session_active(username):
stream_manager.new_instance().clear_Stream_with_audio(username)
except Exception:
pass
# 切换到新会话,令上一会话的流式输出/音频尽快结束
util.printInfo(1, username, "重置中断标志,开始新的对话处理")
@@ -268,10 +275,8 @@ class FeiFei:
if member_db.new_instance().is_username_exist(username) == "notexists":
member_db.new_instance().add_user(username)
# 判断调用来源如果是非stream调用则同步处理
if interact.data.get("stream", False):
MyThread(target=self.__process_interact, args=[interact]).start()
else:
return self.__process_interact(interact)
MyThread(target=self.__process_interact, args=[interact]).start()
else:
return self.__process_interact(interact)
@@ -296,7 +301,7 @@ class FeiFei:
try:
user_for_stop = interact.data.get("user", "User")
conv_id_for_stop = interact.data.get("conversation_id")
if stream_manager.new_instance().should_stop_generation(user_for_stop, conversation_id=conv_id_for_stop):
if not is_end and stream_manager.new_instance().should_stop_generation(user_for_stop, conversation_id=conv_id_for_stop):
return None
except Exception:
pass
@@ -339,7 +344,7 @@ class FeiFei:
try:
user_for_stop = interact.data.get("user", "User")
conv_id_for_stop = interact.data.get("conversation_id")
if not stream_manager.new_instance().should_stop_generation(user_for_stop, conversation_id=conv_id_for_stop):
if is_end or not stream_manager.new_instance().should_stop_generation(user_for_stop, conversation_id=conv_id_for_stop):
self.__process_text_output(text, interact.data.get('user'), uid, content_id, type)
except Exception:
self.__process_text_output(text, interact.data.get('user'), uid, content_id, type)
@@ -425,7 +430,7 @@ class FeiFei:
wsa_server.get_web_instance().add_cmd({"panelMsg": "", 'Username' : interact.data.get('user'), 'robot': f'{cfg.fay_url}/robot/Normal.jpg'})
if result is not None or is_first or is_end:
if is_end:#如果结束标记则延迟1秒处理,免得is end比前面的音频tts要快
if is_end:#TODO 临时方案:如果结束标记则延迟1秒处理,免得is end比前面的音频tts要快
time.sleep(1)
MyThread(target=self.__process_output_audio, args=[result, interact, text]).start()
return result
@@ -610,12 +615,6 @@ class FeiFei:
config_util.load_config()
if config_util.config["interact"]["playSound"]:
# 检查是否需要停止音频播放(按会话)
if stream_manager.new_instance().should_stop_generation(
interact.data.get("user", "User"),
conversation_id=interact.data.get("conversation_id")
):
util.printInfo(1, interact.data.get('user'), '音频播放被打断,跳过加入播放队列')
return
self.sound_query.put((file_url, audio_length, interact))
else:
if wsa_server.get_web_instance().is_connected(interact.data.get('user')):
@@ -648,9 +647,6 @@ class FeiFei:
#启动核心服务
def start(self):
if cfg.ltp_mode == "cemotion":
from cemotion import Cemotion
self.cemotion = Cemotion()
MyThread(target=self.__play_sound).start()
#停止核心服务

View File

@@ -135,12 +135,6 @@ class StreamManager:
# 检查是否包含_<isfirst>标记(可能在句子中间)
if '_<isfirst>' in sentence:
# 直接使用stream_lock清除文本流
with self.stream_lock:
self._clear_Stream_internal(username)
# 清空音频队列Queue本身线程安全不需要锁
self._clear_audio_queue(username)
# 收到新处理的第一个句子,重置停止标志,允许后续处理
with self.control_lock:
self.stop_generation_flags[username] = False
@@ -154,7 +148,8 @@ class StreamManager:
tag_cid = conversation_id if conversation_id is not None else current_cid
tagged_sentence = f"{sentence}__<cid={tag_cid}>__" if tag_cid else sentence
success = Stream.write(tagged_sentence)
nlp_success = nlp_Stream.write(sentence)
# 让 NLP 流也携带隐藏的会话ID便于前端按会话过滤
nlp_success = nlp_Stream.write(tagged_sentence)
return success and nlp_success
except Exception as e:
print(f"写入句子时出错: {e}")
@@ -170,6 +165,21 @@ class StreamManager:
if username in self.nlp_streams:
self.nlp_streams[username].clear()
# 清除后写入一条结束标记分别通知主流与NLP流结束
try:
# 确保流存在(监听线程也会在首次创建时启动)
stream, nlp_stream = self._get_Stream_internal(username)
cid = self.conversation_ids.get(username, "")
end_marker = "_<isend>"
# 主流带会话ID隐藏标签供下游按会话拦截
tagged = f"{end_marker}__<cid={cid}>__" if cid else end_marker
stream.write(tagged)
# NLP 流也写入带会话ID的结束标记前端会按会话过滤
nlp_stream.write(tagged)
except Exception:
# 忽略写入哨兵失败
pass
def set_stop_generation(self, username, stop=True):
"""
设置指定用户的停止生成标志
@@ -241,15 +251,6 @@ class StreamManager:
# 只清理特定用户的音频项,保留其他用户的音频
self._clear_user_specific_audio(username, fay_core.sound_query)
def clear_Stream(self, username):
"""
清除指定用户ID的文本流数据外部调用接口仅清除文本流
:param username: 用户名
"""
# 直接使用stream_lock不再需要clear_lock
with self.stream_lock:
self._clear_Stream_internal(username)
def clear_Stream_with_audio(self, username):
"""
清除指定用户ID的文本流数据和音频队列完全清除
@@ -262,13 +263,15 @@ class StreamManager:
# 第二步:设置停止标志(独立操作)
with self.control_lock:
self.stop_generation_flags[username] = True
# 第三步清除音频队列Queue线程安全不需要锁
self._clear_audio_queue(username)
# 第步:清除文本流(独立操作)
# 第步:清除文本流(独立操作)
with self.stream_lock:
self._clear_Stream_internal(username)
# 第四步清除音频队列Queue线程安全不需要锁
self._clear_audio_queue(username)
def listen(self, username, stream, nlp_stream):
while self.running:

View File

@@ -246,7 +246,7 @@ def api_start_live():
# 启动
try:
fay_booter.start()
time.sleep(1)
gsleep(1)
wsa_server.get_web_instance().add_cmd({"liveState": 1})
return '{"result":"successful"}'
except Exception as e:
@@ -257,7 +257,7 @@ def api_stop_live():
# 停止
try:
fay_booter.stop()
time.sleep(1)
gsleep(1)
wsa_server.get_web_instance().add_cmd({"liveState": 0})
return '{"result":"successful"}'
except Exception as e:
@@ -276,12 +276,7 @@ def api_send():
if not username or not msg:
return jsonify({'result': 'error', 'message': '用户名和消息内容不能为空'})
msg = msg.strip()
# 新消息到达,立即中断该用户之前的所有处理(文本流+音频队列)
util.printInfo(1, username, f'[API中断] 新消息到达,完整中断用户 {username} 之前的所有处理')
stream_manager.new_instance().clear_Stream_with_audio(username)
util.printInfo(1, username, f'[API中断] 用户 {username} 的文本流和音频队列已清空,准备处理新消息')
interact = Interact("text", 1, {'user': username, 'msg': msg})
util.printInfo(1, username, '[文字发送按钮]{}'.format(interact.data["msg"]), time.time())
fay_booter.feiFei.on_interact(interact)
@@ -343,12 +338,8 @@ def api_send_v1_chat_completions():
model = data.get('model', 'fay')
observation = data.get('observation', '')
# 检查请求中是否指定了流式传输
stream_requested = data.get('stream', False)
# 优先使用请求中的stream参数如果没有指定则使用配置中的设置
if stream_requested or model == 'fay-streaming':
interact = Interact("text", 1, {'user': username, 'msg': last_content, 'observation': str(observation), 'stream':True})
util.printInfo(1, username, '[文字沟通接口(流式)]{}'.format(interact.data["msg"]), time.time())
@@ -416,7 +407,6 @@ def adopt_msg():
def gpt_stream_response(last_content, username):
sm = stream_manager.new_instance()
_, nlp_Stream = sm.get_Stream(username)
conversation_id = sm.get_conversation_id(username)
def generate():
while True:
sentence = nlp_Stream.read()
@@ -424,10 +414,16 @@ def gpt_stream_response(last_content, username):
gsleep(0.01)
continue
# 处理特殊标记
# 解析并移除隐藏会话ID标签
try:
m = re.search(r"__<cid=([^>]+)>__", sentence)
if m:
sentence = sentence.replace(m.group(0), "")
except Exception:
pass
is_first = "_<isfirst>" in sentence
is_end = "_<isend>" in sentence
content = sentence.replace("_<isfirst>", "").replace("_<isend>", "")
content = sentence.replace("_<isfirst>", "").replace("_<isend>", "").replace("_<isqa>", "")
if content or is_first or is_end: # 只有当有实际内容时才发送
message = {
"id": "faystreaming-" + str(uuid.uuid4()),
@@ -451,9 +447,6 @@ def gpt_stream_response(last_content, username):
},
"system_fingerprint": ""
}
if is_end:
if username in fay_booter.feiFei.nlp_streams:
stream_manager.new_instance().clear_Stream(username)
yield f"data: {json.dumps(message)}\n\n"
if is_end:
break
@@ -466,7 +459,6 @@ def gpt_stream_response(last_content, username):
def non_streaming_response(last_content, username):
sm = stream_manager.new_instance()
_, nlp_Stream = sm.get_Stream(username)
conversation_id = sm.get_conversation_id(username)
text = ""
while True:
sentence = nlp_Stream.read()
@@ -475,12 +467,16 @@ def non_streaming_response(last_content, username):
continue
# 处理特殊标记
try:
m = re.search(r"__<cid=([^>]+)>__", sentence)
if m:
sentence = sentence.replace(m.group(0), "")
except Exception:
pass
is_first = "_<isfirst>" in sentence
is_end = "_<isend>" in sentence
text += sentence.replace("_<isfirst>", "").replace("_<isend>", "")
text += sentence.replace("_<isfirst>", "").replace("_<isend>", "").replace("_<isqa>", "")
if is_end:
if username in fay_booter.feiFei.nlp_streams:
stream_manager.new_instance().clear_Stream(username)
break
return jsonify({
"id": "fay-" + str(uuid.uuid4()),
@@ -581,10 +577,10 @@ def to_stop_talking():
try:
data = request.get_json()
username = data.get('username', 'User')
stream_manager.new_instance().clear_Stream_with_audio(username)
observation = data.get('observation', '')
util.printInfo(1, username, f"开始执行打断操作,清空用户 {username} 的处理队列")
stream_manager.new_instance().clear_Stream_with_audio(username)
util.printInfo(1, username, f"打断操作完成,用户 {username} 的所有队列已清空")
result = "interrupted" # 简单的结果标识
@@ -617,7 +613,6 @@ def transparent_pass():
if response_text or audio_url:
# 新消息到达,立即中断该用户之前的所有处理(文本流+音频队列)
util.printInfo(1, username, f'[API中断] 新消息到达,完整中断用户 {username} 之前的所有处理')
stream_manager.new_instance().clear_Stream_with_audio(username)
util.printInfo(1, username, f'[API中断] 用户 {username} 的文本流和音频队列已清空,准备处理新消息')
interact = Interact('transparent_pass', 2, {'user': username, 'text': response_text, 'audio': audio_url, 'isend':True, 'isfirst':True})
util.printInfo(1, username, '透传播放:{}{}'.format(response_text, audio_url), time.time())
@@ -755,7 +750,7 @@ def api_start_genagents():
def monitor_shutdown():
try:
while not is_shutdown_requested():
time.sleep(1)
gsleep(1)
util.log(1, f"检测到关闭请求,正在关闭决策分析服务...")
genagents_server.shutdown()
except Exception as e: