自然进化

进一步优化会话管理。
This commit is contained in:
guo zebin
2025-09-12 15:29:35 +08:00
parent 0e02e03d70
commit 30d342670c
5 changed files with 226 additions and 210 deletions

View File

@@ -1,4 +1,4 @@
# -*- coding: utf-8 -*-
# -*- coding: utf-8 -*-
#作用是处理交互逻辑,文字输入,语音、文字及情绪的发送、播放及展示输出
import math
from operator import index
@@ -140,7 +140,7 @@ class FeiFei:
if success:
# Q&A模式结束会话不再需要发送额外的结束标记
state_manager.end_session(username)
state_manager.end_session(username, conversation_id=stream_manager.new_instance().get_conversation_id(username))
else:
util.log(1, f"Q&A流式处理失败文本长度: {len(text)}")
# 失败时也要确保结束会话
@@ -166,7 +166,7 @@ class FeiFei:
if success:
# 普通模式结束会话
state_manager.end_session(username)
state_manager.end_session(username, conversation_id=stream_manager.new_instance().get_conversation_id(username))
else:
util.log(1, f"type=2流式处理失败文本长度: {len(text)}")
# 失败时也要确保结束会话

View File

@@ -48,36 +48,34 @@ class StreamManager:
self.stop_generation_flags = {} # 存储用户的停止生成标志
self.conversation_ids = {} # 存储每个用户的会话IDconv_前缀
def bump_session(self, username):
    """Switch the user to a new session by incrementing their session version.

    Args:
        username: Key into ``self.session_versions``.

    Returns:
        The new version number (1 for a user with no previous entry).
    """
    with self.control_lock:
        next_version = self.session_versions.get(username, 0) + 1
        self.session_versions[username] = next_version
    return next_version
def get_session_version(self, username):
    """Return the user's current session version, or 0 if none is recorded."""
    with self.control_lock:
        version = self.session_versions.get(username, 0)
    return version
def set_current_conversation(self, username, conversation_id, session_type=None):
    """Record the user's current conversation id and align the state manager.

    Args:
        username: User whose conversation is being switched.
        conversation_id: New conversation id (``conv_*``).
        session_type: Optional session type for a freshly started state-manager
            session. When omitted, the type of the existing state is reused,
            falling back to ``'stream'``.

    The state-manager alignment is best-effort: any failure there is swallowed
    so that it never blocks the main flow.
    """
    # Only the dictionary write needs control_lock.
    with self.control_lock:
        self.conversation_ids[username] = conversation_id

    # Align StreamStateManager AFTER releasing control_lock. The state manager
    # takes its own lock and may call back into get_conversation_id() (which
    # takes control_lock); calling it while still holding control_lock would
    # create an opposite lock order between the two managers.
    try:
        from utils.stream_state_manager import get_state_manager  # lazy import avoids a circular dependency
        smgr = get_state_manager()
        info = smgr.get_session_info(username)
        if (not info) or (info.get('conversation_id') != conversation_id):
            resolved_type = (
                session_type
                or (info.get('session_type') if info else None)
                or 'stream'
            )
            smgr.start_new_session(
                username,
                resolved_type,
                conversation_id=conversation_id,
            )
    except Exception:
        # Deliberate best-effort: a failed alignment must not break the caller.
        pass
def get_conversation_id(self, username):
    """Return the user's current conversation id, or "" if none is recorded."""
    with self.control_lock:
        current_id = self.conversation_ids.get(username, "")
    return current_id
def is_session_valid(self, username, version):
    """Tell whether ``version`` is still the user's current session version.

    A user with no recorded version is treated as being at version 0.
    """
    with self.control_lock:
        current_version = self.session_versions.get(username, 0)
    return version == current_version
def _get_Stream_internal(self, username):
"""
内部方法获取指定用户ID的文本流不加锁调用者必须已持有stream_lock

View File

@@ -902,7 +902,7 @@ def question(content, username, observation=None):
if last_punct_pos > 10: # 确保有足够的内容发送
sentence_text = accumulated_text[:last_punct_pos + 1]
# 使用状态管理器准备句子
marked_text, _, _ = state_manager.prepare_sentence(username, sentence_text)
marked_text, _, _ = state_manager.prepare_sentence(username, sentence_text, conversation_id=conversation_id)
stream_manager.new_instance().write_sentence(username, marked_text, conversation_id=conversation_id)
accumulated_text = accumulated_text[last_punct_pos + 1:].lstrip()
@@ -922,14 +922,14 @@ def question(content, username, observation=None):
if not sm.should_stop_generation(username, conversation_id=conversation_id):
if accumulated_text:
# 使用状态管理器准备最后的文本,强制标记为结束
marked_text, _, _ = state_manager.prepare_sentence(username, accumulated_text, force_end=True)
marked_text, _, _ = state_manager.prepare_sentence(username, accumulated_text, force_end=True, conversation_id=conversation_id)
stream_manager.new_instance().write_sentence(username, marked_text, conversation_id=conversation_id)
else:
# 如果没有剩余文本,检查是否需要发送结束标记
session_info = state_manager.get_session_info(username)
session_info = state_manager.get_session_info(username, conversation_id=conversation_id)
if session_info and not session_info.get('is_end_sent', False):
# 发送一个空的结束标记
marked_text, _, _ = state_manager.prepare_sentence(username, "", force_end=True)
marked_text, _, _ = state_manager.prepare_sentence(username, "", force_end=True, conversation_id=conversation_id)
stream_manager.new_instance().write_sentence(username, marked_text, conversation_id=conversation_id)
@@ -969,26 +969,26 @@ def question(content, username, observation=None):
if last_punct_pos > 10: # 确保有足够的内容发送
sentence_text = accumulated_text[:last_punct_pos + 1]
# 使用状态管理器准备句子
marked_text, _, _ = state_manager.prepare_sentence(username, sentence_text)
marked_text, _, _ = state_manager.prepare_sentence(username, sentence_text, conversation_id=conversation_id)
stream_manager.new_instance().write_sentence(username, marked_text, conversation_id=conversation_id)
accumulated_text = accumulated_text[last_punct_pos + 1:].lstrip()
full_response_text += flush_text
# 确保最后一段文本也被发送,并标记为结束(若会话未被取消)
# 确保最后一段文本及结束标记也被发送(若会话未被取消)
from utils.stream_state_manager import get_state_manager
state_manager = get_state_manager()
if not sm.should_stop_generation(username, conversation_id=conversation_id):
if accumulated_text:
# 使用状态管理器准备最后的文本,强制标记为结束
marked_text, _, _ = state_manager.prepare_sentence(username, accumulated_text, force_end=True)
marked_text, _, _ = state_manager.prepare_sentence(username, accumulated_text, force_end=True, conversation_id=conversation_id)
stream_manager.new_instance().write_sentence(username, marked_text, conversation_id=conversation_id)
else:
# 如果没有剩余文本,检查是否需要发送结束标记
session_info = state_manager.get_session_info(username)
session_info = state_manager.get_session_info(username, conversation_id=conversation_id)
if session_info and not session_info.get('is_end_sent', False):
# 发送一个空的结束标记
marked_text, _, _ = state_manager.prepare_sentence(username, "", force_end=True)
marked_text, _, _ = state_manager.prepare_sentence(username, "", force_end=True, conversation_id=conversation_id)
stream_manager.new_instance().write_sentence(username, marked_text, conversation_id=conversation_id)
@@ -1003,7 +1003,7 @@ def question(content, username, observation=None):
# 结束会话(不再需要发送额外的结束标记)
from utils.stream_state_manager import get_state_manager
state_manager = get_state_manager()
state_manager.end_session(username)
state_manager.end_session(username, conversation_id=conversation_id)
# 在单独线程中记忆对话内容
MyThread(target=remember_conversation_thread, args=(username, content, full_response_text.split("</think>")[-1])).start()

View File

@@ -1,223 +1,242 @@
import threading
import time
from utils import util
from enum import Enum
from utils import util
class StreamState(Enum):
    """In-session state of a sentence stream."""

    IDLE = "idle"                # no sentence sent yet
    FIRST_SENTENCE = "first"     # first sentence of the session
    MIDDLE_SENTENCE = "middle"   # any sentence after the first
    LAST_SENTENCE = "last"       # sentence carrying the end marker
    COMPLETED = "completed"      # session has been ended
class StreamStateManager:
    """Per-user streaming session state; single owner of the isfirst/isend markers.

    Each user has at most one state record, tagged with the conversation id it
    belongs to, so that state from one conversation never leaks into the next.

    Locking rule: ``self.lock`` is never held while calling into StreamManager.
    StreamManager takes its own lock in ``get_conversation_id`` and may call
    back into this class, so holding ``self.lock`` across such a call would
    allow an opposite lock order between the two managers.
    """

    def __init__(self):
        self.lock = threading.RLock()
        # username -> state info dict (keys are set in _reset_state_locked)
        self.user_states = {}

    @staticmethod
    def _resolve_conversation_id(username):
        """Best-effort lookup of the user's current conversation id.

        Call this WITHOUT holding ``self.lock``. Returns None when the
        StreamManager cannot be imported or queried.
        """
        try:
            from core import stream_manager  # lazy import to avoid cycles
            return stream_manager.new_instance().get_conversation_id(username)
        except Exception:
            return None

    def _reset_state_locked(self, username, session_type, conversation_id):
        """Install a fresh state record for the user; caller must hold ``self.lock``."""
        now = time.time()
        state_info = {
            "session_type": session_type,
            "state": StreamState.IDLE,
            "sentence_count": 0,
            "start_time": now,
            "last_update": now,
            "is_first_sent": False,
            "is_end_sent": False,
            "conversation_id": conversation_id,
        }
        self.user_states[username] = state_info
        return state_info

    def start_new_session(self, username, session_type="stream", conversation_id=None):
        """Start (or reset) the streaming session for a user.

        Args:
            username: User name.
            session_type: Session type (stream, qa, auto_play, ...).
            conversation_id: Conversation id to bind the session to. When None,
                the id is looked up from StreamManager.

        Returns:
            The conversation id the session is bound to; may be None if it
            could not be determined.
        """
        # Resolve before taking the lock (see the class-level locking rule).
        if conversation_id is None:
            conversation_id = self._resolve_conversation_id(username)
        with self.lock:
            self._reset_state_locked(username, session_type, conversation_id)
        return conversation_id

    def prepare_sentence(self, username, text, force_first=False, force_end=False, conversation_id=None):
        """Append the first/end markers a sentence needs and update the state.

        Args:
            username: User name.
            text: Sentence text.
            force_first: Force this sentence to be treated as the first one.
            force_end: Force this sentence to be treated as the last one.
            conversation_id: Conversation the sentence belongs to. When None,
                the id is looked up from StreamManager.

        Returns:
            tuple: (marked_text, is_first, is_end)
        """
        # Resolve before taking the lock (see the class-level locking rule).
        current_cid = conversation_id
        if current_cid is None:
            current_cid = self._resolve_conversation_id(username)

        with self.lock:
            state_info = self.user_states.get(username)
            if state_info is None:
                # No session yet: create one on the fly.
                state_info = self._reset_state_locked(username, "stream", current_cid)
            elif state_info.get("conversation_id") != current_cid:
                # Conversation switched: reset state, keeping the session type.
                state_info = self._reset_state_locked(
                    username, state_info.get("session_type", "stream"), current_cid
                )

            state_info["last_update"] = time.time()

            # First-sentence decision.
            is_first = False
            if force_first or (not state_info["is_first_sent"] and state_info["sentence_count"] == 0):
                is_first = True
                state_info["is_first_sent"] = True
                state_info["state"] = StreamState.FIRST_SENTENCE
            elif state_info["sentence_count"] > 0:
                state_info["state"] = StreamState.MIDDLE_SENTENCE

            # Last-sentence decision.
            is_end = bool(force_end)
            if is_end:
                state_info["is_end_sent"] = True
                state_info["state"] = StreamState.LAST_SENTENCE

            state_info["sentence_count"] += 1

            # Append the hidden markers, never duplicating one already present.
            marked_text = text
            if is_first and not marked_text.endswith("_<isfirst>"):
                marked_text += "_<isfirst>"
            if is_end and not marked_text.endswith("_<isend>"):
                marked_text += "_<isend>"

            return marked_text, is_first, is_end

    def end_session(self, username, conversation_id=None):
        """Mark the user's session as completed; no end marker is emitted here.

        Args:
            username: User name.
            conversation_id: When given, the session is only ended if it is
                bound to this conversation id.

        Returns:
            Always the empty string (the end marker is expected to have been
            attached to the last sentence already).
        """
        with self.lock:
            state_info = self.user_states.get(username)
            if state_info is None:
                util.log(1, f"警告尝试结束一个不存在的会话 [{username}]")
                return ""

            if conversation_id is not None and state_info.get("conversation_id") != conversation_id:
                util.log(1, f"警告end_session 会话不一致,当前={state_info.get('conversation_id')},传入={conversation_id}")
                return ""

            if state_info["state"] != StreamState.COMPLETED:
                state_info["state"] = StreamState.COMPLETED
                # Warn once, on the transition, if no end marker was ever sent.
                if not state_info["is_end_sent"]:
                    util.log(1, "警告:本次会话未发送结束标记即已结束(可能存在异常路径)")
            return ""

    def get_session_info(self, username, conversation_id=None):
        """Return a shallow copy of the user's state, or None.

        None is returned when the user has no state, or when
        ``conversation_id`` is given and does not match the stored one.
        """
        with self.lock:
            info = self.user_states.get(username)
            if not info:
                return None
            if conversation_id is not None and info.get("conversation_id") != conversation_id:
                return None
            return info.copy()

    def is_session_active(self, username, conversation_id=None):
        """Return True if the user has a matching session that is not COMPLETED."""
        with self.lock:
            info = self.user_states.get(username)
            if not info:
                return False
            if conversation_id is not None and info.get("conversation_id") != conversation_id:
                return False
            return info["state"] != StreamState.COMPLETED

    def cleanup_expired_sessions(self, timeout_seconds=300):
        """Purge sessions not updated for ``timeout_seconds`` and release their streams.

        Runs in three phases so that ``self.lock`` is never held while calling
        StreamManager: snapshot under lock, clean StreamManager resources
        without the lock, then delete the state under lock.
        """
        # 1) Snapshot the expired entries while holding our own lock.
        with self.lock:
            now = time.time()
            expired = [
                (username, state_info.get("conversation_id") or "")
                for username, state_info in self.user_states.items()
                if now - state_info["last_update"] > timeout_seconds
            ]
        if not expired:
            return

        # 2) Lock released: ask StreamManager to drop stream/audio resources.
        sm = None
        try:
            from core import stream_manager
            sm = stream_manager.new_instance()
        except Exception as e:
            util.log(1, f"清理过期会话:无法获取 StreamManager仅删除状态。错误={e}")

        for username, state_cid in expired:
            try:
                if sm is not None:
                    current_cid = sm.get_conversation_id(username)
                    # Only clear when the ids agree, so a conversation that was
                    # just switched in is not wiped by mistake.
                    if state_cid == (current_cid or ""):
                        sm.clear_Stream_with_audio(username)
                    else:
                        util.log(1, f"跳过流清理会话ID不一致user={username}, state_cid={state_cid}, current_cid={current_cid}")
            except Exception as e:
                util.log(1, f"清理过期会话:清理 StreamManager 资源时出错 user={username}: {e}")

        # 3) Delete state that still points at the same conversation and is
        #    still expired. Both ids are normalised with `or ""`: a stored
        #    None id would otherwise never compare equal and never be purged.
        with self.lock:
            now = time.time()
            for username, state_cid in expired:
                info = self.user_states.get(username)
                if not info:
                    continue
                same_conversation = (info.get("conversation_id") or "") == state_cid
                still_expired = now - info["last_update"] > timeout_seconds
                if same_conversation and still_expired:
                    util.log(1, f"已清理过期会话:{state_cid}")
                    del self.user_states[username]

    def force_reset_user_state(self, username):
        """Forcefully drop a user's state and StreamManager resources (error recovery)."""
        # Read the current conversation id under lock.
        cid = None
        with self.lock:
            if username in self.user_states:
                cid = self.user_states[username].get("conversation_id", "")

        # Clear StreamManager resources without holding our own lock.
        try:
            from core import stream_manager
            sm = stream_manager.new_instance()
            sm.clear_Stream_with_audio(username)
        except Exception as e:
            util.log(1, f"强制重置:清理 StreamManager 资源失败 {username}: {e}")

        # Re-acquire the lock and delete only if the state was not replaced by
        # a different conversation in the meantime (an empty cid always deletes).
        with self.lock:
            if username in self.user_states:
                cur_cid = self.user_states[username].get("conversation_id", "")
                if (not cid) or (cur_cid == cid):
                    del self.user_states[username]
                    util.log(1, f"已强制重置用户状态 {username}, 会话 {cid}")

    def get_all_active_sessions(self):
        """Return {username: shallow copy of state} for every non-COMPLETED session."""
        with self.lock:
            return {
                username: state_info.copy()
                for username, state_info in self.user_states.items()
                if state_info["state"] != StreamState.COMPLETED
            }
# 全局单例实例
_state_manager_instance = None
_state_manager_lock = threading.Lock()
def get_state_manager():
    """Return the process-wide StreamStateManager, creating it on first use.

    Returns:
        StreamStateManager: the shared instance.
    """
    global _state_manager_instance
    if _state_manager_instance is None:
        with _state_manager_lock:
            # Re-check under the lock: another thread may have created the
            # instance between the unlocked test above and acquiring the lock.
            if _state_manager_instance is None:
                _state_manager_instance = StreamStateManager()
    return _state_manager_instance
# 定时清理过期会话的线程
def start_cleanup_thread(interval_seconds=60):
    """Start a daemon thread that periodically purges timed-out sessions.

    Args:
        interval_seconds: Pause between two cleanup passes (default: 60).

    The worker never exits on its own; any exception raised by a pass is
    logged and the loop continues.
    """
    def cleanup_worker():
        while True:
            try:
                time.sleep(interval_seconds)
                get_state_manager().cleanup_expired_sessions()
            except Exception as e:
                util.log(1, f"清理过期会话时发生错误: {str(e)}")

    cleanup_thread = threading.Thread(target=cleanup_worker, daemon=True)
    cleanup_thread.start()
    util.log(1, "流式状态管理器清理线程已启动")
# 自动启动清理线程
start_cleanup_thread()

View File

@@ -1,4 +1,4 @@
# -*- coding: utf-8 -*-
# -*- coding: utf-8 -*-
import time
from utils import util
from core import stream_manager
@@ -44,15 +44,16 @@ class StreamTextProcessor:
if not text or not text.strip():
return True
# 获取状态管理器并开始新会话(若未开始)
state_manager = get_state_manager()
if not state_manager.is_session_active(username):
state_manager.start_new_session(username, session_type)
# 捕获本次流式处理对应的会话ID用于精确隔离
sm = stream_manager.new_instance()
conversation_id = sm.get_conversation_id(username)
# 获取状态管理器并开始新会话(若未开始或会话不匹配则对齐)
state_manager = get_state_manager()
session_info = state_manager.get_session_info(username)
if (not session_info) or (session_info.get('conversation_id') != conversation_id):
state_manager.start_new_session(username, session_type, conversation_id=conversation_id)
try:
return self._safe_process_text(text, username, is_qa, state_manager, conversation_id)
except Exception as e:
@@ -110,6 +111,7 @@ class StreamTextProcessor:
sentence_text,
force_first=(not first_sentence_sent), # 第一段 True其它 False
force_end=False,
conversation_id=conversation_id,
)
success = stream_manager.new_instance().write_sentence(
@@ -134,6 +136,7 @@ class StreamTextProcessor:
accumulated_text,
force_first=(not first_sentence_sent), # 如果还没发送过句子,这是第一段
force_end=True,
conversation_id=conversation_id,
)
stream_manager.new_instance().write_sentence(
username, marked_text, conversation_id=conversation_id
@@ -142,7 +145,7 @@ class StreamTextProcessor:
elif not first_sentence_sent:
# 如果整个文本都没有找到合适的分割点,作为完整句子发送
marked_text, _, _ = state_manager.prepare_sentence(
username, text, force_first=True, force_end=True
username, text, force_first=True, force_end=True, conversation_id=conversation_id
)
stream_manager.new_instance().write_sentence(
username, marked_text, conversation_id=conversation_id
@@ -152,14 +155,14 @@ class StreamTextProcessor:
session_info = state_manager.get_session_info(username)
if session_info and not session_info.get("is_end_sent", False):
marked_text, _, _ = state_manager.prepare_sentence(
username, "", force_first=False, force_end=True
username, "", force_first=False, force_end=True, conversation_id=conversation_id
)
stream_manager.new_instance().write_sentence(
username, marked_text, conversation_id=conversation_id
)
# 结束会话
state_manager.end_session(username)
state_manager.end_session(username, conversation_id=conversation_id)
# 记录处理统计
if iteration_count >= self.max_iterations:
@@ -194,7 +197,7 @@ class StreamTextProcessor:
try:
# 使用状态管理器准备完整文本
marked_text, _, _ = state_manager.prepare_sentence(
username, text, force_first=True, force_end=True
username, text, force_first=True, force_end=True, conversation_id=conversation_id
)
stream_manager.new_instance().write_sentence(
username, marked_text, conversation_id=conversation_id
@@ -216,4 +219,3 @@ def get_processor():
if _processor_instance is None:
_processor_instance = StreamTextProcessor()
return _processor_instance