mirror of
https://github.com/xszyou/Fay.git
synced 2026-03-12 17:51:28 +08:00
自然进化
1、使用conversation_id规划session_id。
This commit is contained in:
@@ -1,4 +1,5 @@
|
||||
#作用是处理交互逻辑,文字输入,语音、文字及情绪的发送、播放及展示输出
|
||||
# -*- coding: utf-8 -*-
|
||||
#作用是处理交互逻辑,文字输入,语音、文字及情绪的发送、播放及展示输出
|
||||
import math
|
||||
from operator import index
|
||||
import os
|
||||
@@ -194,9 +195,11 @@ class FeiFei:
|
||||
# 切换到新会话,令上一会话的流式输出/音频尽快结束
|
||||
try:
|
||||
sm = stream_manager.new_instance()
|
||||
new_version = sm.bump_session(username)
|
||||
# 将当前会话版本附加到交互数据
|
||||
interact.data["session_version"] = new_version
|
||||
import uuid
|
||||
conv_id = "conv_" + str(uuid.uuid4())
|
||||
sm.set_current_conversation(username, conv_id)
|
||||
# 将当前会话ID附加到交互数据
|
||||
interact.data["conversation_id"] = conv_id
|
||||
# 允许新的生成
|
||||
sm.set_stop_generation(username, stop=False)
|
||||
except Exception:
|
||||
@@ -294,13 +297,13 @@ class FeiFei:
|
||||
# 提前进行会话有效性与中断检查,避免产生多余面板/数字人输出
|
||||
try:
|
||||
user_for_stop = interact.data.get("user", "User")
|
||||
session_ver = interact.data.get("session_version")
|
||||
if stream_manager.new_instance().should_stop_generation(user_for_stop, session_version=session_ver):
|
||||
conv_id_for_stop = interact.data.get("conversation_id")
|
||||
if stream_manager.new_instance().should_stop_generation(user_for_stop, conversation_id=conv_id_for_stop):
|
||||
return None
|
||||
except Exception:
|
||||
pass
|
||||
if is_first == True:
|
||||
conv = "conv_" + str(uuid.uuid4())
|
||||
conv = interact.data.get("conversation_id") or ("conv_" + str(uuid.uuid4()))
|
||||
conv_no = 0
|
||||
self.user_conv_map[interact.data.get("user", "User")] = {"conversation_id" : conv, "conversation_msg_no" : conv_no}
|
||||
else:
|
||||
@@ -312,8 +315,8 @@ class FeiFei:
|
||||
# 仅在会话有效时才发送面板消息
|
||||
try:
|
||||
user_for_stop = interact.data.get("user", "User")
|
||||
session_ver = interact.data.get("session_version")
|
||||
if not stream_manager.new_instance().should_stop_generation(user_for_stop, session_version=session_ver):
|
||||
conv_id_for_stop = interact.data.get("conversation_id")
|
||||
if not stream_manager.new_instance().should_stop_generation(user_for_stop, conversation_id=conv_id_for_stop):
|
||||
self.__send_panel_message(text, interact.data.get('user'), uid, 0, type)
|
||||
except Exception:
|
||||
self.__send_panel_message(text, interact.data.get('user'), uid, 0, type)
|
||||
@@ -345,8 +348,8 @@ class FeiFei:
|
||||
# 会话有效时才提示“思考中...”
|
||||
try:
|
||||
user_for_stop = interact.data.get("user", "User")
|
||||
session_ver = interact.data.get("session_version")
|
||||
should_block = stream_manager.new_instance().should_stop_generation(user_for_stop, session_version=session_ver)
|
||||
conv_id_for_stop = interact.data.get("conversation_id")
|
||||
should_block = stream_manager.new_instance().should_stop_generation(user_for_stop, conversation_id=conv_id_for_stop)
|
||||
except Exception:
|
||||
should_block = False
|
||||
if not should_block:
|
||||
@@ -374,7 +377,7 @@ class FeiFei:
|
||||
# 检查是否需要停止TTS处理(按会话)
|
||||
if stream_manager.new_instance().should_stop_generation(
|
||||
interact.data.get("user", "User"),
|
||||
session_version=interact.data.get("session_version")
|
||||
conversation_id=interact.data.get("conversation_id")
|
||||
):
|
||||
util.printInfo(1, interact.data.get('user'), 'TTS处理被打断,跳过音频合成')
|
||||
return None
|
||||
@@ -388,8 +391,8 @@ class FeiFei:
|
||||
# 合成完成后再次检查会话是否仍有效,避免继续输出旧会话结果
|
||||
try:
|
||||
user_for_stop = interact.data.get("user", "User")
|
||||
session_ver = interact.data.get("session_version")
|
||||
if stream_manager.new_instance().should_stop_generation(user_for_stop, session_version=session_ver):
|
||||
conv_id_for_stop = interact.data.get("conversation_id")
|
||||
if stream_manager.new_instance().should_stop_generation(user_for_stop, conversation_id=conv_id_for_stop):
|
||||
return None
|
||||
except Exception:
|
||||
pass
|
||||
@@ -480,6 +483,17 @@ class FeiFei:
|
||||
# 播放过程中计时,直到音频播放完毕
|
||||
length = 0
|
||||
while length < audio_length:
|
||||
try:
|
||||
user_for_stop = interact.data.get("user", "User")
|
||||
conv_id_for_stop = interact.data.get("conversation_id")
|
||||
if stream_manager.new_instance().should_stop_generation(user_for_stop, conversation_id=conv_id_for_stop):
|
||||
try:
|
||||
pygame.mixer.music.stop()
|
||||
except Exception:
|
||||
pass
|
||||
break
|
||||
except Exception:
|
||||
pass
|
||||
length += 0.01
|
||||
time.sleep(0.01)
|
||||
|
||||
@@ -532,8 +546,8 @@ class FeiFei:
|
||||
# 会话有效性与中断检查(最早返回,避免向面板/数字人发送任何旧会话输出)
|
||||
try:
|
||||
user_for_stop = interact.data.get("user", "User")
|
||||
session_ver = interact.data.get("session_version")
|
||||
if stream_manager.new_instance().should_stop_generation(user_for_stop, session_version=session_ver):
|
||||
conv_id_for_stop = interact.data.get("conversation_id")
|
||||
if stream_manager.new_instance().should_stop_generation(user_for_stop, conversation_id=conv_id_for_stop):
|
||||
return
|
||||
except Exception:
|
||||
pass
|
||||
@@ -575,7 +589,7 @@ class FeiFei:
|
||||
# 检查是否需要停止音频播放(按会话)
|
||||
if stream_manager.new_instance().should_stop_generation(
|
||||
interact.data.get("user", "User"),
|
||||
session_version=interact.data.get("session_version")
|
||||
conversation_id=interact.data.get("conversation_id")
|
||||
):
|
||||
util.printInfo(1, interact.data.get('user'), '音频播放被打断,跳过加入播放队列')
|
||||
return
|
||||
@@ -710,3 +724,4 @@ class FeiFei:
|
||||
|
||||
import importlib
|
||||
fay_booter = importlib.import_module('fay_booter')
|
||||
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
import threading
|
||||
import time
|
||||
from utils import stream_sentence
|
||||
@@ -45,7 +46,7 @@ class StreamManager:
|
||||
self._initialized = True # 标记是否已初始化
|
||||
self.msgid = "" # 消息ID
|
||||
self.stop_generation_flags = {} # 存储用户的停止生成标志
|
||||
self.session_versions = {} # 存储每个用户的会话版本(单调递增)
|
||||
self.conversation_ids = {} # 存储每个用户的会话ID(conv_前缀)
|
||||
|
||||
def bump_session(self, username):
|
||||
"""
|
||||
@@ -62,6 +63,16 @@ class StreamManager:
|
||||
with self.control_lock:
|
||||
return self.session_versions.get(username, 0)
|
||||
|
||||
def set_current_conversation(self, username, conversation_id):
|
||||
"""设置当前会话ID(conv_*)"""
|
||||
with self.control_lock:
|
||||
self.conversation_ids[username] = conversation_id
|
||||
|
||||
def get_conversation_id(self, username):
|
||||
"""获取当前会话ID(可能为空字符串)"""
|
||||
with self.control_lock:
|
||||
return self.conversation_ids.get(username, "")
|
||||
|
||||
def is_session_valid(self, username, version):
|
||||
"""检查给定版本是否仍为该用户的当前会话版本。"""
|
||||
with self.control_lock:
|
||||
@@ -98,12 +109,13 @@ class StreamManager:
|
||||
with self.stream_lock:
|
||||
return self._get_Stream_internal(username)
|
||||
|
||||
def write_sentence(self, username, sentence, session_version=None):
|
||||
def write_sentence(self, username, sentence, conversation_id=None, session_version=None):
|
||||
"""
|
||||
写入句子到指定用户的文本流(线程安全)
|
||||
:param username: 用户名
|
||||
:param sentence: 要写入的句子
|
||||
:param session_version: 句子产生时的会话版本(可选)
|
||||
:param conversation_id: 句子产生时的会话ID(可选,优先于版本判断)
|
||||
:param session_version: 句子产生时的会话版本(可选,兼容旧路径)
|
||||
:return: 写入是否成功
|
||||
"""
|
||||
# 检查句子长度,防止过大的句子导致内存问题
|
||||
@@ -113,13 +125,15 @@ class StreamManager:
|
||||
# 若当前处于停止状态且这不是新会话的首句,则丢弃写入,避免残余输出
|
||||
with self.control_lock:
|
||||
stop_flag = self.stop_generation_flags.get(username, False)
|
||||
current_version = self.session_versions.get(username, 0)
|
||||
current_cid = self.conversation_ids.get(username, "")
|
||||
if stop_flag and ('_<isfirst>' not in sentence):
|
||||
return False
|
||||
|
||||
# 如果提供了句子产生时的会话版本,且与当前版本不一致,则丢弃写入
|
||||
if session_version is not None and session_version != current_version:
|
||||
# 优先使用会话ID进行校验
|
||||
if conversation_id is not None and conversation_id != current_cid:
|
||||
return False
|
||||
# 兼容旧逻辑:按版本校验
|
||||
|
||||
|
||||
# 检查是否包含_<isfirst>标记(可能在句子中间)
|
||||
if '_<isfirst>' in sentence:
|
||||
@@ -138,7 +152,10 @@ class StreamManager:
|
||||
try:
|
||||
# 使用内部方法避免重复加锁
|
||||
Stream, nlp_Stream = self._get_Stream_internal(username)
|
||||
success = Stream.write(sentence)
|
||||
# 将会话ID以隐藏标签形式附在主流句子尾部,便于入口解析
|
||||
tag_cid = conversation_id if conversation_id is not None else current_cid
|
||||
tagged_sentence = f"{sentence}__<cid={tag_cid}>__" if tag_cid else sentence
|
||||
success = Stream.write(tagged_sentence)
|
||||
nlp_success = nlp_Stream.write(sentence)
|
||||
return success and nlp_success
|
||||
except Exception as e:
|
||||
@@ -164,7 +181,7 @@ class StreamManager:
|
||||
with self.control_lock:
|
||||
self.stop_generation_flags[username] = stop
|
||||
|
||||
def should_stop_generation(self, username, session_version=None):
|
||||
def should_stop_generation(self, username, conversation_id=None, session_version=None):
|
||||
"""
|
||||
检查指定用户是否应该停止生成
|
||||
:param username: 用户名
|
||||
@@ -174,9 +191,11 @@ class StreamManager:
|
||||
flag = self.stop_generation_flags.get(username, False)
|
||||
if flag:
|
||||
return True
|
||||
if session_version is not None:
|
||||
if session_version != self.session_versions.get(username, 0):
|
||||
return True
|
||||
# 优先按会话ID判断
|
||||
current_cid = self.conversation_ids.get(username, "")
|
||||
if conversation_id is not None and conversation_id != current_cid:
|
||||
return True
|
||||
# 兼容旧逻辑:按版本判断
|
||||
return False
|
||||
|
||||
# 内部方法已移除,直接使用带锁的公共方法
|
||||
@@ -267,6 +286,17 @@ class StreamManager:
|
||||
:param username: 用户名
|
||||
:param sentence: 要处理的句子
|
||||
"""
|
||||
# 从句子尾部解析隐藏的会话ID标签
|
||||
producer_cid = None
|
||||
try:
|
||||
import re as _re
|
||||
m = _re.search(r"__<cid=([^>]+)>__", sentence)
|
||||
if m:
|
||||
producer_cid = m.group(1)
|
||||
sentence = sentence.replace(m.group(0), "")
|
||||
except Exception:
|
||||
producer_cid = None
|
||||
|
||||
# 检查停止标志(使用control_lock)
|
||||
with self.control_lock:
|
||||
should_stop = self.stop_generation_flags.get(username, False)
|
||||
@@ -274,10 +304,11 @@ class StreamManager:
|
||||
if should_stop:
|
||||
return
|
||||
|
||||
# 进一步进行基于会话版本的快速拦截(避免进入下游 say)
|
||||
# 进一步进行基于会话ID/版本的快速拦截(避免进入下游 say)
|
||||
try:
|
||||
current_version = self.get_session_version(username)
|
||||
if self.should_stop_generation(username, session_version=current_version):
|
||||
current_cid = getattr(self, 'conversation_ids', {}).get(username, "")
|
||||
check_cid = producer_cid if producer_cid is not None else current_cid
|
||||
if self.should_stop_generation(username, conversation_id=check_cid):
|
||||
return
|
||||
except Exception:
|
||||
pass
|
||||
@@ -290,8 +321,12 @@ class StreamManager:
|
||||
# 执行实际处理(无锁,避免死锁)
|
||||
if sentence or is_first or is_end:
|
||||
fay_core = fay_booter.feiFei
|
||||
# 附带当前会话版本,方便下游按会话控制输出
|
||||
session_version = self.get_session_version(username)
|
||||
interact = Interact("stream", 1, {"user": username, "msg": sentence, "isfirst": is_first, "isend": is_end, "session_version": session_version})
|
||||
# 附带当前会话ID,方便下游按会话控制输出
|
||||
effective_cid = producer_cid if producer_cid is not None else getattr(self, 'conversation_ids', {}).get(username, "")
|
||||
interact = Interact("stream", 1, {"user": username, "msg": sentence, "isfirst": is_first, "isend": is_end, "conversation_id": effective_cid})
|
||||
fay_core.say(interact, sentence) # 调用核心处理模块进行响应
|
||||
time.sleep(0.01) # 短暂休眠以控制处理频率
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
import importlib
|
||||
import json
|
||||
import time
|
||||
@@ -411,11 +412,11 @@ def adopt_msg():
|
||||
def gpt_stream_response(last_content, username):
|
||||
sm = stream_manager.new_instance()
|
||||
_, nlp_Stream = sm.get_Stream(username)
|
||||
session_version = sm.get_session_version(username)
|
||||
conversation_id = sm.get_conversation_id(username)
|
||||
def generate():
|
||||
while True:
|
||||
# If interrupted or session switched, end the SSE stream promptly
|
||||
if sm.should_stop_generation(username, session_version=session_version):
|
||||
if sm.should_stop_generation(username, conversation_id=conversation_id):
|
||||
yield 'data: [DONE]\n\n'
|
||||
break
|
||||
sentence = nlp_Stream.read()
|
||||
@@ -465,11 +466,11 @@ def gpt_stream_response(last_content, username):
|
||||
def non_streaming_response(last_content, username):
|
||||
sm = stream_manager.new_instance()
|
||||
_, nlp_Stream = sm.get_Stream(username)
|
||||
session_version = sm.get_session_version(username)
|
||||
conversation_id = sm.get_conversation_id(username)
|
||||
text = ""
|
||||
while True:
|
||||
# If interrupted or session switched, stop waiting and return what we have
|
||||
if sm.should_stop_generation(username, session_version=session_version):
|
||||
if sm.should_stop_generation(username, conversation_id=conversation_id):
|
||||
break
|
||||
sentence = nlp_Stream.read()
|
||||
if sentence is None:
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
import os
|
||||
import json
|
||||
import time
|
||||
@@ -726,7 +727,7 @@ def question(content, username, observation=None):
|
||||
# 记录当前会话版本,用于精准中断
|
||||
from core import stream_manager
|
||||
sm = stream_manager.new_instance()
|
||||
session_version = sm.get_session_version(username)
|
||||
conversation_id = sm.get_conversation_id(username)
|
||||
|
||||
# 创建代理
|
||||
agent = create_agent(username)
|
||||
@@ -836,7 +837,7 @@ def question(content, username, observation=None):
|
||||
{"messages": messages}, {"configurable": {"thread_id": "tid{}".format(username)}}
|
||||
):
|
||||
# 检查是否需要停止生成
|
||||
if sm.should_stop_generation(username, session_version=session_version):
|
||||
if sm.should_stop_generation(username, conversation_id=conversation_id):
|
||||
util.log(1, f"检测到停止标志,中断React Agent文本生成: {username}")
|
||||
break
|
||||
|
||||
@@ -858,16 +859,16 @@ def question(content, username, observation=None):
|
||||
else:
|
||||
content_temp = react_response_text
|
||||
|
||||
stream_manager.new_instance().write_sentence(username, content_temp, session_version=session_version)
|
||||
stream_manager.new_instance().write_sentence(username, content_temp, conversation_id=conversation_id)
|
||||
except (KeyError, IndexError, AttributeError) as e:
|
||||
# 如果提取失败,使用通用提示
|
||||
react_response_text = f"正在调用MCP工具。\n"
|
||||
stream_manager.new_instance().write_sentence(username, react_response_text, session_version=session_version)
|
||||
stream_manager.new_instance().write_sentence(username, react_response_text, conversation_id=conversation_id)
|
||||
|
||||
# 消息类型2:检测工具执行结果
|
||||
elif "tools" in chunk and current_tool_name:
|
||||
react_response_text = f"{current_tool_name}工具已经执行成功。\n"
|
||||
stream_manager.new_instance().write_sentence(username, react_response_text, session_version=session_version)
|
||||
stream_manager.new_instance().write_sentence(username, react_response_text, conversation_id=conversation_id)
|
||||
|
||||
# 消息类型3:检测最终回复
|
||||
else:
|
||||
@@ -902,7 +903,7 @@ def question(content, username, observation=None):
|
||||
sentence_text = accumulated_text[:last_punct_pos + 1]
|
||||
# 使用状态管理器准备句子
|
||||
marked_text, _, _ = state_manager.prepare_sentence(username, sentence_text)
|
||||
stream_manager.new_instance().write_sentence(username, marked_text, session_version=session_version)
|
||||
stream_manager.new_instance().write_sentence(username, marked_text, conversation_id=conversation_id)
|
||||
accumulated_text = accumulated_text[last_punct_pos + 1:].lstrip()
|
||||
|
||||
except (KeyError, IndexError, AttributeError):
|
||||
@@ -910,7 +911,7 @@ def question(content, username, observation=None):
|
||||
if is_first_sentence:
|
||||
react_response_text = "_<isfirst>" + react_response_text
|
||||
is_first_sentence = False
|
||||
stream_manager.new_instance().write_sentence(username, react_response_text, session_version=session_version)
|
||||
stream_manager.new_instance().write_sentence(username, react_response_text, conversation_id=conversation_id)
|
||||
|
||||
full_response_text += react_response_text
|
||||
|
||||
@@ -918,18 +919,18 @@ def question(content, username, observation=None):
|
||||
from utils.stream_state_manager import get_state_manager
|
||||
state_manager = get_state_manager()
|
||||
|
||||
if not sm.should_stop_generation(username, session_version=session_version):
|
||||
if not sm.should_stop_generation(username, conversation_id=conversation_id):
|
||||
if accumulated_text:
|
||||
# 使用状态管理器准备最后的文本,强制标记为结束
|
||||
marked_text, _, _ = state_manager.prepare_sentence(username, accumulated_text, force_end=True)
|
||||
stream_manager.new_instance().write_sentence(username, marked_text, session_version=session_version)
|
||||
stream_manager.new_instance().write_sentence(username, marked_text, conversation_id=conversation_id)
|
||||
else:
|
||||
# 如果没有剩余文本,检查是否需要发送结束标记
|
||||
session_info = state_manager.get_session_info(username)
|
||||
if session_info and not session_info.get('is_end_sent', False):
|
||||
# 发送一个空的结束标记
|
||||
marked_text, _, _ = state_manager.prepare_sentence(username, "", force_end=True)
|
||||
stream_manager.new_instance().write_sentence(username, marked_text, session_version=session_version)
|
||||
stream_manager.new_instance().write_sentence(username, marked_text, conversation_id=conversation_id)
|
||||
|
||||
|
||||
else:
|
||||
@@ -937,7 +938,7 @@ def question(content, username, observation=None):
|
||||
# 2.2 使用全局定义的llm对象进行流式请求
|
||||
for chunk in llm.stream(messages):
|
||||
# 检查是否需要停止生成
|
||||
if sm.should_stop_generation(username, session_version=session_version):
|
||||
if sm.should_stop_generation(username, conversation_id=conversation_id):
|
||||
util.log(1, f"检测到停止标志,中断LLM文本生成: {username}")
|
||||
break
|
||||
|
||||
@@ -969,7 +970,7 @@ def question(content, username, observation=None):
|
||||
sentence_text = accumulated_text[:last_punct_pos + 1]
|
||||
# 使用状态管理器准备句子
|
||||
marked_text, _, _ = state_manager.prepare_sentence(username, sentence_text)
|
||||
stream_manager.new_instance().write_sentence(username, marked_text, session_version=session_version)
|
||||
stream_manager.new_instance().write_sentence(username, marked_text, conversation_id=conversation_id)
|
||||
accumulated_text = accumulated_text[last_punct_pos + 1:].lstrip()
|
||||
|
||||
full_response_text += flush_text
|
||||
@@ -977,26 +978,26 @@ def question(content, username, observation=None):
|
||||
from utils.stream_state_manager import get_state_manager
|
||||
state_manager = get_state_manager()
|
||||
|
||||
if not sm.should_stop_generation(username, session_version=session_version):
|
||||
if not sm.should_stop_generation(username, conversation_id=conversation_id):
|
||||
if accumulated_text:
|
||||
# 使用状态管理器准备最后的文本,强制标记为结束
|
||||
marked_text, _, _ = state_manager.prepare_sentence(username, accumulated_text, force_end=True)
|
||||
stream_manager.new_instance().write_sentence(username, marked_text, session_version=session_version)
|
||||
stream_manager.new_instance().write_sentence(username, marked_text, conversation_id=conversation_id)
|
||||
else:
|
||||
# 如果没有剩余文本,检查是否需要发送结束标记
|
||||
session_info = state_manager.get_session_info(username)
|
||||
if session_info and not session_info.get('is_end_sent', False):
|
||||
# 发送一个空的结束标记
|
||||
marked_text, _, _ = state_manager.prepare_sentence(username, "", force_end=True)
|
||||
stream_manager.new_instance().write_sentence(username, marked_text, session_version=session_version)
|
||||
stream_manager.new_instance().write_sentence(username, marked_text, conversation_id=conversation_id)
|
||||
|
||||
|
||||
except requests.exceptions.RequestException as e:
|
||||
util.log(1, f"请求失败: {e}")
|
||||
error_message = "抱歉,我现在太忙了,休息一会,请稍后再试。"
|
||||
# 会话未被取消时才发送错误提示
|
||||
if not sm.should_stop_generation(username, session_version=session_version):
|
||||
stream_manager.new_instance().write_sentence(username, "_<isfirst>" + error_message + "_<isend>", session_version=session_version)
|
||||
if not sm.should_stop_generation(username, conversation_id=conversation_id):
|
||||
stream_manager.new_instance().write_sentence(username, "_<isfirst>" + error_message + "_<isend>", conversation_id=conversation_id)
|
||||
full_response_text = error_message
|
||||
|
||||
# 结束会话(不再需要发送额外的结束标记)
|
||||
|
||||
@@ -1,13 +1,16 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
import time
|
||||
from utils import util
|
||||
from core import stream_manager
|
||||
from utils.stream_state_manager import get_state_manager
|
||||
|
||||
|
||||
class StreamTextProcessor:
|
||||
"""
|
||||
安全的流式文本处理器,防止死循环和性能问题
|
||||
安全的流式文本处理器,负责将长文本按句子切分并逐句写入流,
|
||||
同时具备超时、迭代上限、缓存上限等保护,避免死循环与性能问题。
|
||||
"""
|
||||
|
||||
|
||||
def __init__(self, min_length=10, max_iterations=100, timeout_seconds=30, max_cache_size=10240):
|
||||
"""
|
||||
初始化流式文本处理器
|
||||
@@ -22,16 +25,17 @@ class StreamTextProcessor:
|
||||
self.max_iterations = max_iterations
|
||||
self.timeout_seconds = timeout_seconds
|
||||
self.max_cache_size = max_cache_size
|
||||
self.punctuation_marks = [",", ",", "。", "、", "!", "?", ".", "!", "?", "\n"]
|
||||
|
||||
# 常用中英文分句标点(UTF-8)
|
||||
self.punctuation_marks = [",", "。", ";", ":", "、", "!", "?", ".", "!", "?", "\n"]
|
||||
|
||||
def process_stream_text(self, text, username, is_qa=False, session_type="stream"):
|
||||
"""
|
||||
安全地处理流式文本分割和发送
|
||||
安全地处理流式文本分割与发送
|
||||
|
||||
参数:
|
||||
text: 要处理的文本
|
||||
username: 用户名
|
||||
is_qa: 是否为Q&A模式
|
||||
is_qa: 是否为 Q&A 模式
|
||||
session_type: 会话类型
|
||||
|
||||
返回:
|
||||
@@ -40,25 +44,24 @@ class StreamTextProcessor:
|
||||
if not text or not text.strip():
|
||||
return True
|
||||
|
||||
# 获取状态管理器并开始新会话
|
||||
# 获取状态管理器并开始新会话(若未开始)
|
||||
state_manager = get_state_manager()
|
||||
if not state_manager.is_session_active(username):
|
||||
state_manager.start_new_session(username, session_type)
|
||||
|
||||
# 捕获本次流式处理对应的会话版本(用于精确隔离)
|
||||
from core import stream_manager
|
||||
# 捕获本次流式处理对应的会话ID(用于精确隔离)
|
||||
sm = stream_manager.new_instance()
|
||||
session_version = sm.get_session_version(username)
|
||||
conversation_id = sm.get_conversation_id(username)
|
||||
|
||||
try:
|
||||
return self._safe_process_text(text, username, is_qa, state_manager, session_version)
|
||||
return self._safe_process_text(text, username, is_qa, state_manager, conversation_id)
|
||||
except Exception as e:
|
||||
util.log(1, f"流式文本处理出错: {str(e)}")
|
||||
# 发生异常时,直接发送完整文本作为备用方案
|
||||
self._send_fallback_text(text, username, state_manager, session_version)
|
||||
self._send_fallback_text(text, username, state_manager, conversation_id)
|
||||
return False
|
||||
|
||||
def _safe_process_text(self, text, username, is_qa, state_manager, session_version):
|
||||
|
||||
def _safe_process_text(self, text, username, is_qa, state_manager, conversation_id):
|
||||
"""
|
||||
安全的文本处理核心逻辑,包含缓存溢出保护
|
||||
"""
|
||||
@@ -103,12 +106,15 @@ class StreamTextProcessor:
|
||||
if len(sentence_text) >= self.min_length:
|
||||
# 使用状态管理器准备句子
|
||||
marked_text, is_first, is_end = state_manager.prepare_sentence(
|
||||
username, sentence_text,
|
||||
force_first=(not first_sentence_sent), # 第一句=True,其他=False
|
||||
force_end=False
|
||||
username,
|
||||
sentence_text,
|
||||
force_first=(not first_sentence_sent), # 第一段 True,其它 False
|
||||
force_end=False,
|
||||
)
|
||||
|
||||
success = stream_manager.new_instance().write_sentence(username, marked_text, session_version=session_version)
|
||||
success = stream_manager.new_instance().write_sentence(
|
||||
username, marked_text, conversation_id=conversation_id
|
||||
)
|
||||
if success:
|
||||
accumulated_text = accumulated_text[punct_index + 1:].lstrip()
|
||||
first_sentence_sent = True # 标记已发送第一个句子
|
||||
@@ -124,26 +130,33 @@ class StreamTextProcessor:
|
||||
# 发送剩余文本,如果是最后的文本则标记为结束
|
||||
if accumulated_text:
|
||||
marked_text, _, _ = state_manager.prepare_sentence(
|
||||
username, accumulated_text,
|
||||
force_first=(not first_sentence_sent), # 如果还没发送过句子,这是第一个
|
||||
force_end=True
|
||||
username,
|
||||
accumulated_text,
|
||||
force_first=(not first_sentence_sent), # 如果还没发送过句子,这是第一段
|
||||
force_end=True,
|
||||
)
|
||||
stream_manager.new_instance().write_sentence(
|
||||
username, marked_text, conversation_id=conversation_id
|
||||
)
|
||||
stream_manager.new_instance().write_sentence(username, marked_text, session_version=session_version)
|
||||
first_sentence_sent = True
|
||||
elif not first_sentence_sent:
|
||||
# 如果整个文本都没有找到合适的分割点,作为完整句子发送
|
||||
marked_text, _, _ = state_manager.prepare_sentence(
|
||||
username, text, force_first=True, force_end=True
|
||||
)
|
||||
stream_manager.new_instance().write_sentence(username, marked_text, session_version=session_version)
|
||||
stream_manager.new_instance().write_sentence(
|
||||
username, marked_text, conversation_id=conversation_id
|
||||
)
|
||||
else:
|
||||
# 如果没有剩余文本,需要确保最后发送的句子包含结束标记
|
||||
session_info = state_manager.get_session_info(username)
|
||||
if session_info and not session_info.get('is_end_sent', False):
|
||||
if session_info and not session_info.get("is_end_sent", False):
|
||||
marked_text, _, _ = state_manager.prepare_sentence(
|
||||
username, "", force_first=False, force_end=True
|
||||
)
|
||||
stream_manager.new_instance().write_sentence(username, marked_text, session_version=session_version)
|
||||
stream_manager.new_instance().write_sentence(
|
||||
username, marked_text, conversation_id=conversation_id
|
||||
)
|
||||
|
||||
# 结束会话
|
||||
state_manager.end_session(username)
|
||||
@@ -153,7 +166,7 @@ class StreamTextProcessor:
|
||||
util.log(1, f"流式处理达到最大迭代次数限制: {self.max_iterations}")
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def _find_punctuation_indices(self, text):
|
||||
"""
|
||||
安全地查找标点符号位置
|
||||
@@ -168,29 +181,33 @@ class StreamTextProcessor:
|
||||
except Exception as e:
|
||||
util.log(1, f"查找标点符号 '{punct}' 时出错: {str(e)}")
|
||||
continue
|
||||
|
||||
|
||||
return sorted([i for i in indices if i != -1])
|
||||
except Exception as e:
|
||||
util.log(1, f"查找标点符号时出错: {str(e)}")
|
||||
return []
|
||||
|
||||
def _send_fallback_text(self, text, username, state_manager, session_version):
|
||||
|
||||
def _send_fallback_text(self, text, username, state_manager, conversation_id):
|
||||
"""
|
||||
备用发送方案,直接发送完整文本
|
||||
备用发送方案:直接发送完整文本(含首尾标记)
|
||||
"""
|
||||
try:
|
||||
# 使用状态管理器准备完整文本
|
||||
marked_text, _, _ = state_manager.prepare_sentence(
|
||||
username, text, force_first=True, force_end=True
|
||||
)
|
||||
stream_manager.new_instance().write_sentence(username, marked_text, session_version=session_version)
|
||||
stream_manager.new_instance().write_sentence(
|
||||
username, marked_text, conversation_id=conversation_id
|
||||
)
|
||||
util.log(1, "使用备用方案发送完整文本")
|
||||
except Exception as e:
|
||||
util.log(1, f"备用发送方案也失败: {str(e)}")
|
||||
|
||||
|
||||
# 全局单例实例
|
||||
_processor_instance = None
|
||||
|
||||
|
||||
def get_processor():
|
||||
"""
|
||||
获取流式文本处理器单例
|
||||
@@ -199,3 +216,4 @@ def get_processor():
|
||||
if _processor_instance is None:
|
||||
_processor_instance = StreamTextProcessor()
|
||||
return _processor_instance
|
||||
|
||||
|
||||
Reference in New Issue
Block a user