diff --git a/core/fay_core.py b/core/fay_core.py index cd3a79f..9c84723 100644 --- a/core/fay_core.py +++ b/core/fay_core.py @@ -1,843 +1,850 @@ -# -*- coding: utf-8 -*- -#作用是处理交互逻辑,文字输入,语音、文字及情绪的发送、播放及展示输出 -import math -from operator import index -import os -import time -import socket -import requests -from pydub import AudioSegment -from queue import Queue -import re # 添加正则表达式模块用于过滤表情符号 -import uuid - -# 适应模型使用 -import numpy as np -from ai_module import baidu_emotion -from core import wsa_server -from core.interact import Interact -from tts.tts_voice import EnumVoice -from scheduler.thread_manager import MyThread -from tts import tts_voice -from utils import util, config_util -from core import qa_service -from utils import config_util as cfg -from core import content_db -from ai_module import nlp_cemotion -from core import stream_manager - -from core import member_db -import threading - -#加载配置 -cfg.load_config() -if cfg.tts_module =='ali': - from tts.ali_tss import Speech -elif cfg.tts_module == 'gptsovits': - from tts.gptsovits import Speech -elif cfg.tts_module == 'gptsovits_v3': - from tts.gptsovits_v3 import Speech -elif cfg.tts_module == 'volcano': - from tts.volcano_tts import Speech -else: - from tts.ms_tts_sdk import Speech - -#windows运行推送唇形数据 -import platform -if platform.system() == "Windows": - import sys - sys.path.append("test/ovr_lipsync") - from test_olipsync import LipSyncGenerator - - -#可以使用自动播报的标记 -can_auto_play = True -auto_play_lock = threading.RLock() - -class FeiFei: - def __init__(self): - self.lock = threading.Lock() - self.nlp_streams = {} # 存储用户ID到句子缓存的映射 - self.nlp_stream_lock = threading.Lock() # 保护nlp_streams字典的锁 - self.mood = 0.0 # 情绪值 - self.old_mood = 0.0 - self.item_index = 0 - self.X = np.array([1, 0, 0, 0, 0, 0, 0, 0]).reshape(1, -1) # 适应模型变量矩阵 - # self.W = np.array([0.01577594,1.16119452,0.75828,0.207746,1.25017864,0.1044121,0.4294899,0.2770932]).reshape(-1,1) #适应模型变量矩阵 - self.W = np.array([0.0, 0.6, 0.1, 0.7, 0.3, 0.0, 0.0, 0.0]).reshape(-1, 1) # 适应模型变量矩阵 - - self.wsParam = None - self.wss = None - self.sp = Speech() - self.speaking = False #声音是否在播放 - self.__running = True - self.sp.connect() #TODO 预连接 - - self.timer = None - self.sound_query = Queue() - self.think_mode_users = {} # 使用字典存储每个用户的think模式状态 - self.think_time_users = {} #使用字典存储每个用户的think开始时间 - self.user_conv_map = {} #存储用户对话id及句子流序号,key为(username, conversation_id) - self.pending_isfirst = {} # 存储因prestart被过滤而延迟的isfirst标记,key为username - - def __remove_emojis(self, text): - """ - 改进的表情包过滤,避免误删除正常Unicode字符 - """ - # 更精确的emoji范围,避免误删除正常字符 - emoji_pattern = re.compile( - "[" - "\U0001F600-\U0001F64F" # 表情符号 (Emoticons) - "\U0001F300-\U0001F5FF" # 杂项符号和象形文字 (Miscellaneous Symbols and Pictographs) - "\U0001F680-\U0001F6FF" # 交通和地图符号 (Transport and Map Symbols) - "\U0001F1E0-\U0001F1FF" # 区域指示符号 (Regional Indicator Symbols) - "\U0001F900-\U0001F9FF" # 补充符号和象形文字 (Supplemental Symbols and Pictographs) - "\U0001FA70-\U0001FAFF" # 扩展A符号和象形文字 (Symbols and Pictographs Extended-A) - "\U00002600-\U000026FF" # 杂项符号 (Miscellaneous Symbols) - "\U00002700-\U000027BF" # 装饰符号 (Dingbats) - "\U0000FE00-\U0000FE0F" # 变体选择器 (Variation Selectors) - "\U0001F000-\U0001F02F" # 麻将牌 (Mahjong Tiles) - "\U0001F0A0-\U0001F0FF" # 扑克牌 (Playing Cards) - "]+", - flags=re.UNICODE, - ) - - # 保护常用的中文标点符号和特殊字符 - protected_chars = ["。", ",", "!", "?", ":", ";", "、", """, """, "'", "'", "(", ")", "【", "】", "《", "》"] - - # 先保存保护字符的位置 - protected_positions = {} - for i, char in enumerate(text): - if char in protected_chars: - protected_positions[i] = char - - # 执行emoji过滤 - filtered_text = emoji_pattern.sub('', text) - - # 如果过滤后文本长度变化太大,可能误删了正常字符,返回原文本 - if len(filtered_text) < len(text) * 0.5: # 如果删除了超过50%的内容 - return text - - return filtered_text - - def __process_stream_output(self, text, username, session_type="type2_stream", is_qa=False): - """ - 按流式方式分割和发送 type=2 的文本 - 使用安全的流式文本处理器和状态管理器 - """ - if not text or text.strip() == "": - return - - # 使用安全的流式文本处理器 - from utils.stream_text_processor import get_processor - from utils.stream_state_manager import get_state_manager - - processor = get_processor() - state_manager = get_state_manager() - - # 处理流式文本,is_qa=False表示普通模式 - success = processor.process_stream_text(text, username, is_qa=is_qa, session_type=session_type) - - if success: - # 普通模式结束会话 - state_manager.end_session(username, conversation_id=stream_manager.new_instance().get_conversation_id(username)) - else: - util.log(1, f"type=2流式处理失败,文本长度: {len(text)}") - # 失败时也要确保结束会话 - state_manager.force_reset_user_state(username) - - #语音消息处理检查是否命中q&a - def __get_answer(self, interleaver, text): - answer = None - # 全局问答 - answer, type = qa_service.QAService().question('qa',text) - if answer is not None: - return answer, type - else: - return None, None - - - #消息处理 - def __process_interact(self, interact: Interact): - if self.__running: - try: - index = interact.interact_type - username = interact.data.get("user", "User") - uid = member_db.new_instance().find_user(username) - - if index == 1: #语音、文字交互 - - #记录用户问题,方便obs等调用 - self.write_to_file("./logs", "asr_result.txt", interact.data["msg"]) - - #同步用户问题到数字人 - if wsa_server.get_instance().is_connected(username): - content = {'Topic': 'human', 'Data': {'Key': 'question', 'Value': interact.data["msg"]}, 'Username' : interact.data.get("user")} - wsa_server.get_instance().add_cmd(content) - - #记录用户问题 - content_id = content_db.new_instance().add_content('member','speak',interact.data["msg"], username, uid) - if wsa_server.get_web_instance().is_connected(username): - wsa_server.get_web_instance().add_cmd({"panelReply": {"type":"member","content":interact.data["msg"], "username":username, "uid":uid, "id":content_id}, "Username" : username}) - - #确定是否命中q&a - answer, type = self.__get_answer(interact.interleaver, interact.data["msg"]) - - #大语言模型回复 - text = '' - if answer is None or type != "qa": - if wsa_server.get_web_instance().is_connected(username): - wsa_server.get_web_instance().add_cmd({"panelMsg": "思考中...", "Username" : username, 'robot': f'{cfg.fay_url}/robot/Thinking.jpg'}) - if wsa_server.get_instance().is_connected(username): - content = {'Topic': 'human', 'Data': {'Key': 'log', 'Value': "思考中..."}, 'Username' : username, 'robot': f'{cfg.fay_url}/robot/Thinking.jpg'} - wsa_server.get_instance().add_cmd(content) - - # 根据配置动态调用不同的NLP模块 - if cfg.config["memory"].get("use_bionic_memory", False): - from llm import nlp_bionicmemory_stream - text = nlp_bionicmemory_stream.question(interact.data["msg"], username, interact.data.get("observation", None)) - else: - from llm import nlp_cognitive_stream - text = nlp_cognitive_stream.question(interact.data["msg"], username, interact.data.get("observation", None)) - - else: - text = answer - # 使用流式分割处理Q&A答案 - self.__process_stream_output(text, username, session_type="qa", is_qa=True) - - - return text - - elif (index == 2):#透传模式:有音频则仅播音频;仅文本则流式+TTS - audio_url = interact.data.get("audio") - text = interact.data.get("text") - - # 1) 存在音频:忽略文本,仅播放音频 - if audio_url and str(audio_url).strip(): - try: - audio_interact = Interact( - "stream", 1, - {"user": username, "msg": "", "isfirst": True, "isend": True, "audio": audio_url} - ) - self.say(audio_interact, "") - except Exception: - pass - return 'success' - - # 2) 只有文本:执行流式切分并TTS - if text and str(text).strip(): - # 进行流式处理(用于TTS,流式处理中会记录到数据库) - self.__process_stream_output(text, username, f"type2_{interact.interleaver}", is_qa=False) - - # 不再需要额外记录,因为流式处理已经记录了 - # self.__process_text_output(text, username, uid) - - return 'success' - - # 没有有效内容 - return 'success' - - except BaseException as e: - print(e) - return e - else: - return "还没有开始运行" - - #记录问答到log - def write_to_file(self, path, filename, content): - if not os.path.exists(path): - os.makedirs(path) - full_path = os.path.join(path, filename) - with open(full_path, 'w', encoding='utf-8') as file: - file.write(content) - file.flush() - os.fsync(file.fileno()) - - #触发交互 - def on_interact(self, interact: Interact): - #创建用户 - username = interact.data.get("user", "User") - if member_db.new_instance().is_username_exist(username) == "notexists": - member_db.new_instance().add_user(username) - try: - from utils.stream_state_manager import get_state_manager - import uuid - if get_state_manager().is_session_active(username): - stream_manager.new_instance().clear_Stream_with_audio(username) - conv_id = "conv_" + str(uuid.uuid4()) - stream_manager.new_instance().set_current_conversation(username, conv_id) - # 将当前会话ID附加到交互数据 - interact.data["conversation_id"] = conv_id - # 允许新的生成 - stream_manager.new_instance().set_stop_generation(username, stop=False) - except Exception: - util.log(3, "开启新会话失败") - - if interact.interact_type == 1: - MyThread(target=self.__process_interact, args=[interact]).start() - else: - return self.__process_interact(interact) - - #获取不同情绪声音 - def __get_mood_voice(self): - voice = tts_voice.get_voice_of(config_util.config["attribute"]["voice"]) - if voice is None: - voice = EnumVoice.XIAO_XIAO - styleList = voice.value["styleList"] - sayType = styleList["calm"] - return sayType - - # 合成声音 - def say(self, interact, text, type = ""): - try: - uid = member_db.new_instance().find_user(interact.data.get("user")) - is_end = interact.data.get("isend", False) - is_first = interact.data.get("isfirst", False) - username = interact.data.get("user", "User") - - # 提前进行会话有效性与中断检查,避免产生多余面板/数字人输出 - try: - user_for_stop = interact.data.get("user", "User") - conv_id_for_stop = interact.data.get("conversation_id") - if not is_end and stream_manager.new_instance().should_stop_generation(user_for_stop, conversation_id=conv_id_for_stop): - return None - except Exception: - pass - - #无效流式文本提前结束 - if not is_first and not is_end and (text is None or text.strip() == ""): - return None - - # 检查是否是 prestart 内容(不应该影响 thinking 状态) - is_prestart_content = self.__has_prestart(text) - - # 流式文本拼接存库 - content_id = 0 - # 使用 (username, conversation_id) 作为 key,避免并发会话覆盖 - conv = interact.data.get("conversation_id") or "" - conv_map_key = (username, conv) - - if is_first == True: - # reset any leftover think-mode at the start of a new reply - # 但如果是 prestart 内容,不重置 thinking 状态 - try: - if uid is not None and not is_prestart_content: - self.think_mode_users[uid] = False - if uid in self.think_time_users: - del self.think_time_users[uid] - except Exception: - pass - # 如果没有 conversation_id,生成一个新的 - if not conv: - conv = "conv_" + str(uuid.uuid4()) - conv_map_key = (username, conv) - conv_no = 0 - # 创建第一条数据库记录,获得content_id - if text and text.strip(): - content_id = content_db.new_instance().add_content('fay', 'speak', text, username, uid) - else: - content_id = content_db.new_instance().add_content('fay', 'speak', '', username, uid) - - # 保存content_id到会话映射中,使用 (username, conversation_id) 作为 key - self.user_conv_map[conv_map_key] = { - "conversation_id": conv, - "conversation_msg_no": conv_no, - "content_id": content_id - } - util.log(1, f"流式会话开始: key={conv_map_key}, content_id={content_id}") - else: - # 获取之前保存的content_id - conv_info = self.user_conv_map.get(conv_map_key, {}) - content_id = conv_info.get("content_id", 0) - - # 如果 conv_map_key 不存在,尝试使用 username 作为备用查找 - if not conv_info and text and text.strip(): - # 查找所有匹配用户名的会话 - for (u, c), info in list(self.user_conv_map.items()): - if u == username and info.get("content_id", 0) > 0: - content_id = info.get("content_id", 0) - conv_info = info - util.log(1, f"警告:使用备用会话 ({u}, {c}) 的 content_id={content_id},原 key=({username}, {conv})") - break - - if conv_info: - conv_info["conversation_msg_no"] = conv_info.get("conversation_msg_no", 0) + 1 - - # 如果有新内容,更新数据库 - if content_id > 0 and text and text.strip(): - # 获取当前已有内容 - existing_content = content_db.new_instance().get_content_by_id(content_id) - if existing_content: - # 累积内容 - accumulated_text = existing_content[3] + text - content_db.new_instance().update_content(content_id, accumulated_text) - elif content_id == 0 and text and text.strip(): - # content_id 为 0 表示可能会话 key 不匹配,记录警告 - util.log(1, f"警告:content_id=0,无法更新数据库。user={username}, conv={conv}, text片段={text[:50] if len(text) > 50 else text}") - - # 会话结束时清理 user_conv_map 中的对应条目,避免内存泄漏 - if is_end and conv_map_key in self.user_conv_map: - del self.user_conv_map[conv_map_key] - - # 推送给前端和数字人 - try: - user_for_stop = interact.data.get("user", "User") - conv_id_for_stop = interact.data.get("conversation_id") - if is_end or not stream_manager.new_instance().should_stop_generation(user_for_stop, conversation_id=conv_id_for_stop): - self.__process_text_output(text, interact.data.get('user'), uid, content_id, type, is_first, is_end) - except Exception: - self.__process_text_output(text, interact.data.get('user'), uid, content_id, type, is_first, is_end) - - # 处理think标签 - # 第一步:处理结束标记 - if "" in text: - # 设置用户退出思考模式 - self.think_mode_users[uid] = False - - # 分割文本,提取后面的内容 - # 如果有多个,我们只关心最后一个后面的内容 - parts = text.split("") - text = parts[-1].strip() - - # 如果提取出的文本为空,则不需要继续处理 - if text == "": - return None - # 第二步:处理开始标记 - # 注意:这里要检查经过上面处理后的text - if "" in text: - self.think_mode_users[uid] = True - self.think_time_users[uid] = time.time() - - #”思考中“的输出 - if self.think_mode_users.get(uid, False): - try: - user_for_stop = interact.data.get("user", "User") - conv_id_for_stop = interact.data.get("conversation_id") - should_block = stream_manager.new_instance().should_stop_generation(user_for_stop, conversation_id=conv_id_for_stop) - except Exception: - should_block = False - if not should_block: - if wsa_server.get_web_instance().is_connected(interact.data.get('user')): - wsa_server.get_web_instance().add_cmd({"panelMsg": "思考中...", "Username" : interact.data.get('user'), 'robot': f'{cfg.fay_url}/robot/Thinking.jpg'}) - if wsa_server.get_instance().is_connected(interact.data.get("user")): - content = {'Topic': 'human', 'Data': {'Key': 'log', 'Value': "思考中..."}, 'Username' : interact.data.get('user'), 'robot': f'{cfg.fay_url}/robot/Thinking.jpg'} - wsa_server.get_instance().add_cmd(content) - - #”请稍等“的音频输出(不影响文本输出) - if self.think_mode_users.get(uid, False) == True and time.time() - self.think_time_users[uid] >= 5: - self.think_time_users[uid] = time.time() - text = "请稍等..." - elif self.think_mode_users.get(uid, False) == True and "" not in text: - return None - - result = None - audio_url = interact.data.get('audio', None)#透传的音频 - - # 移除 prestart 标签内容,不进行TTS - tts_text = self.__remove_prestart_tags(text) if text else text - - if audio_url is not None:#透传音频下载 - file_name = 'sample-' + str(int(time.time() * 1000)) + audio_url[-4:] - result = self.download_wav(audio_url, './samples/', file_name) - elif config_util.config["interact"]["playSound"] or wsa_server.get_instance().get_client_output(interact.data.get("user")) or self.__is_send_remote_device_audio(interact):#tts - if tts_text != None and tts_text.replace("*", "").strip() != "": - # 检查是否需要停止TTS处理(按会话) - if stream_manager.new_instance().should_stop_generation( - interact.data.get("user", "User"), - conversation_id=interact.data.get("conversation_id") - ): - util.printInfo(1, interact.data.get('user'), 'TTS处理被打断,跳过音频合成') - return None - - # 先过滤表情符号,然后再合成语音 - filtered_text = self.__remove_emojis(tts_text.replace("*", "")) - if filtered_text is not None and filtered_text.strip() != "": - util.printInfo(1, interact.data.get('user'), '合成音频...') - tm = time.time() - result = self.sp.to_sample(filtered_text, self.__get_mood_voice()) - # 合成完成后再次检查会话是否仍有效,避免继续输出旧会话结果 - try: - user_for_stop = interact.data.get("user", "User") - conv_id_for_stop = interact.data.get("conversation_id") - if stream_manager.new_instance().should_stop_generation(user_for_stop, conversation_id=conv_id_for_stop): - return None - except Exception: - pass - util.printInfo(1, interact.data.get("user"), "合成音频完成. 耗时: {} ms 文件:{}".format(math.floor((time.time() - tm) * 1000), result)) - else: - # prestart 内容不应该触发机器人表情重置 - if is_end and not is_prestart_content and wsa_server.get_web_instance().is_connected(interact.data.get('user')): - wsa_server.get_web_instance().add_cmd({"panelMsg": "", 'Username' : interact.data.get('user'), 'robot': f'{cfg.fay_url}/robot/Normal.jpg'}) - - if result is not None or is_first or is_end: - # prestart 内容不需要进入音频处理流程 - if is_prestart_content: - return result - if is_end:#TODO 临时方案:如果结束标记,则延迟1秒处理,免得is end比前面的音频tts要快 - time.sleep(1) - MyThread(target=self.__process_output_audio, args=[result, interact, text]).start() - return result - - except BaseException as e: - print(e) - return None - - #下载wav - def download_wav(self, url, save_directory, filename): - try: - # 发送HTTP GET请求以获取WAV文件内容 - response = requests.get(url, stream=True) - response.raise_for_status() # 检查请求是否成功 - - # 确保保存目录存在 - if not os.path.exists(save_directory): - os.makedirs(save_directory) - - # 构建保存文件的路径 - save_path = os.path.join(save_directory, filename) - - # 将WAV文件内容保存到指定文件 - with open(save_path, 'wb') as f: - for chunk in response.iter_content(chunk_size=1024): - if chunk: - f.write(chunk) - - return save_path - except requests.exceptions.RequestException as e: - print(f"[Error] Failed to download file: {e}") - return None - - - #面板播放声音 - def __play_sound(self): - try: - import pygame - pygame.mixer.init() # 初始化pygame.mixer,只需要在此处初始化一次, 如果初始化失败,则不播放音频 - except Exception as e: - util.printInfo(1, "System", "音频播放初始化失败,本机无法播放音频") - return - - while self.__running: - time.sleep(0.01) - if not self.sound_query.empty(): # 如果队列不为空则播放音频 - file_url, audio_length, interact = self.sound_query.get() - - is_first = interact.data.get('isfirst') is True - is_end = interact.data.get('isend') is True - - - - if file_url is not None: - util.printInfo(1, interact.data.get('user'), '播放音频...') - - if is_first: - self.speaking = True - elif not is_end: - self.speaking = True - - #自动播报关闭 - global auto_play_lock - global can_auto_play - with auto_play_lock: - if self.timer is not None: - self.timer.cancel() - self.timer = None - can_auto_play = False - - if wsa_server.get_web_instance().is_connected(interact.data.get('user')): - wsa_server.get_web_instance().add_cmd({"panelMsg": "播放中 ...", "Username" : interact.data.get('user'), 'robot': f'{cfg.fay_url}/robot/Speaking.jpg'}) - - if file_url is not None: - pygame.mixer.music.load(file_url) - pygame.mixer.music.play() - - # 播放过程中计时,直到音频播放完毕 - length = 0 - while length < audio_length: - try: - user_for_stop = interact.data.get("user", "User") - conv_id_for_stop = interact.data.get("conversation_id") - if stream_manager.new_instance().should_stop_generation(user_for_stop, conversation_id=conv_id_for_stop): - try: - pygame.mixer.music.stop() - except Exception: - pass - break - except Exception: - pass - length += 0.01 - time.sleep(0.01) - - if is_end: - self.play_end(interact) - - if wsa_server.get_web_instance().is_connected(interact.data.get('user')): - wsa_server.get_web_instance().add_cmd({"panelMsg": "", "Username" : interact.data.get('user'), 'robot': f'{cfg.fay_url}/robot/Normal.jpg'}) - # 播放完毕后通知 - if wsa_server.get_web_instance().is_connected(interact.data.get("user")): - wsa_server.get_web_instance().add_cmd({"panelMsg": "", 'Username': interact.data.get('user')}) - - #推送远程音频 - def __send_remote_device_audio(self, file_url, interact): - if file_url is None: - return - delkey = None - for key, value in fay_booter.DeviceInputListenerDict.items(): - if value.username == interact.data.get("user") and value.isOutput: #按username选择推送,booter.devicelistenerdice按用户名记录 - try: - value.deviceConnector.send(b"\x00\x01\x02\x03\x04\x05\x06\x07\x08") # 发送音频开始标志,同时也检查设备是否在线 - wavfile = open(os.path.abspath(file_url), "rb") - data = wavfile.read(102400) - total = 0 - while data: - total += len(data) - value.deviceConnector.send(data) - data = wavfile.read(102400) - time.sleep(0.0001) - value.deviceConnector.send(b'\x08\x07\x06\x05\x04\x03\x02\x01\x00')# 发送音频结束标志 - util.printInfo(1, value.username, "远程音频发送完成:{}".format(total)) - except socket.error as serr: - util.printInfo(1, value.username, "远程音频输入输出设备已经断开:{}".format(key)) - value.stop() - delkey = key - if delkey: - value = fay_booter.DeviceInputListenerDict.pop(delkey) - if wsa_server.get_web_instance().is_connected(interact.data.get('user')): - wsa_server.get_web_instance().add_cmd({"remote_audio_connect": False, "Username" : interact.data.get('user')}) - - def __is_send_remote_device_audio(self, interact): - for key, value in fay_booter.DeviceInputListenerDict.items(): - if value.username == interact.data.get("user") and value.isOutput: - return True - return False - - #输出音频处理 - def __process_output_audio(self, file_url, interact, text): - try: - # 会话有效性与中断检查(最早返回,避免向面板/数字人发送任何旧会话输出) - try: - user_for_stop = interact.data.get("user", "User") - conv_id_for_stop = interact.data.get("conversation_id") - if stream_manager.new_instance().should_stop_generation(user_for_stop, conversation_id=conv_id_for_stop): - return - except Exception: - pass - try: - if file_url is None: - audio_length = 0 - elif file_url.endswith('.wav'): - audio = AudioSegment.from_wav(file_url) - audio_length = len(audio) / 1000.0 # 时长以秒为单位 - elif file_url.endswith('.mp3'): - audio = AudioSegment.from_mp3(file_url) - audio_length = len(audio) / 1000.0 # 时长以秒为单位 - except Exception as e: - audio_length = 3 - - #推送远程音频 - if file_url is not None: - MyThread(target=self.__send_remote_device_audio, args=[file_url, interact]).start() - - #发送音频给数字人接口 - if file_url is not None and wsa_server.get_instance().get_client_output(interact.data.get("user")): - # 使用 (username, conversation_id) 作为 key 获取会话信息 - audio_username = interact.data.get("user", "User") - audio_conv_id = interact.data.get("conversation_id") or "" - audio_conv_info = self.user_conv_map.get((audio_username, audio_conv_id), {}) - content = {'Topic': 'human', 'Data': {'Key': 'audio', 'Value': os.path.abspath(file_url), 'HttpValue': f'{cfg.fay_url}/audio/' + os.path.basename(file_url), 'Text': text, 'Time': audio_length, 'Type': interact.interleaver, 'IsFirst': 1 if interact.data.get("isfirst", False) else 0, 'IsEnd': 1 if interact.data.get("isend", False) else 0, 'CONV_ID' : audio_conv_info.get("conversation_id", ""), 'CONV_MSG_NO' : audio_conv_info.get("conversation_msg_no", 0) }, 'Username' : interact.data.get('user'), 'robot': f'{cfg.fay_url}/robot/Speaking.jpg'} - #计算lips - if platform.system() == "Windows": - try: - lip_sync_generator = LipSyncGenerator() - viseme_list = lip_sync_generator.generate_visemes(os.path.abspath(file_url)) - consolidated_visemes = lip_sync_generator.consolidate_visemes(viseme_list) - content["Data"]["Lips"] = consolidated_visemes - except Exception as e: - print(e) - util.printInfo(1, interact.data.get("user"), "唇型数据生成失败") - wsa_server.get_instance().add_cmd(content) - util.printInfo(1, interact.data.get("user"), "数字人接口发送音频数据成功") - - #面板播放 - config_util.load_config() - # 检查是否是 prestart 内容 - is_prestart = self.__has_prestart(text) - if config_util.config["interact"]["playSound"]: - # prestart 内容不应该进入播放队列,避免触发 Normal 状态 - if not is_prestart: - self.sound_query.put((file_url, audio_length, interact)) - else: - # prestart 内容不应该重置机器人表情 - if not is_prestart and wsa_server.get_web_instance().is_connected(interact.data.get('user')): - wsa_server.get_web_instance().add_cmd({"panelMsg": "", 'Username' : interact.data.get('user'), 'robot': f'{cfg.fay_url}/robot/Normal.jpg'}) - - except Exception as e: - print(e) - - def play_end(self, interact): - self.speaking = False - global can_auto_play - global auto_play_lock - with auto_play_lock: - if self.timer: - self.timer.cancel() - self.timer = None - if interact.interleaver != 'auto_play': #交互后暂停自动播报30秒 - self.timer = threading.Timer(30, self.set_auto_play) - self.timer.start() - else: - can_auto_play = True - - #恢复自动播报(如果有) - def set_auto_play(self): - global auto_play_lock - global can_auto_play - with auto_play_lock: - can_auto_play = True - self.timer = None - - #启动核心服务 - def start(self): - MyThread(target=self.__play_sound).start() - - #停止核心服务 - def stop(self): - self.__running = False - self.speaking = False - self.sp.close() - wsa_server.get_web_instance().add_cmd({"panelMsg": ""}) - content = {'Topic': 'human', 'Data': {'Key': 'log', 'Value': ""}} - wsa_server.get_instance().add_cmd(content) - - def __record_response(self, text, username, uid): - """ - 记录AI的回复内容 - :param text: 回复文本 - :param username: 用户名 - :param uid: 用户ID - :return: content_id - """ - self.write_to_file("./logs", "answer_result.txt", text) - return content_db.new_instance().add_content('fay', 'speak', text, username, uid) - - def __remove_prestart_tags(self, text): - """ - 移除文本中的 prestart 标签及其内容 - :param text: 原始文本 - :return: 移除 prestart 标签后的文本 - """ - if not text: - return text - import re - # 移除 ... 标签及其内容(支持属性) - cleaned = re.sub(r']*>[\s\S]*?', '', text, flags=re.IGNORECASE) - return cleaned.strip() - - def __has_prestart(self, text): - """ - 判断文本中是否包含 prestart 标签(支持属性) - """ - if not text: - return False - return re.search(r']*>[\s\S]*?', text, flags=re.IGNORECASE) is not None - - def __send_panel_message(self, text, username, uid, content_id=None, type=None): - """ - 发送消息到Web面板 - :param text: 消息文本 - :param username: 用户名 - :param uid: 用户ID - :param content_id: 内容ID - :param type: 消息类型 - """ - if not wsa_server.get_web_instance().is_connected(username): - return - - # 检查是否是 prestart 内容,prestart 内容不应该更新日志区消息 - # 因为这会覆盖掉"思考中..."的状态显示 - is_prestart = self.__has_prestart(text) - - # gui日志区消息(prestart 内容跳过,保持"思考中..."状态) - if not is_prestart: - wsa_server.get_web_instance().add_cmd({ - "panelMsg": text, - "Username": username - }) - - # 聊天窗消息 - if content_id is not None: - wsa_server.get_web_instance().add_cmd({ - "panelReply": { - "type": "fay", - "content": text, - "username": username, - "uid": uid, - "id": content_id, - "is_adopted": type == 'qa' - }, - "Username": username - }) - - def __send_digital_human_message(self, text, username, is_first=False, is_end=False): - """ - 发送消息到数字人(语音应该在say方法驱动数字人输出) - :param text: 消息文本 - :param username: 用户名 - :param is_first: 是否是第一段文本 - :param is_end: 是否是最后一段文本 - """ - # 移除 prestart 标签内容,不发送给数字人 - cleaned_text = self.__remove_prestart_tags(text) if text else "" - full_text = self.__remove_emojis(cleaned_text.replace("*", "")) if cleaned_text else "" - - # 如果文本为空且不是结束标记,则不发送,但需保留 is_first - if not full_text and not is_end: - if is_first: - self.pending_isfirst[username] = True - return - - # 检查是否有延迟的 is_first 需要应用 - if self.pending_isfirst.get(username, False): - is_first = True - self.pending_isfirst[username] = False - - if wsa_server.get_instance().is_connected(username): - content = { - 'Topic': 'human', - 'Data': { - 'Key': 'text', - 'Value': full_text, - 'IsFirst': 1 if is_first else 0, - 'IsEnd': 1 if is_end else 0 - }, - 'Username': username - } - wsa_server.get_instance().add_cmd(content) - - def __process_text_output(self, text, username, uid, content_id, type, is_first=False, is_end=False): - """ - 完整文本输出到各个终端 - :param text: 主要回复文本 - :param textlist: 额外回复列表 - :param username: 用户名 - :param uid: 用户ID - :param type: 消息类型 - :param is_first: 是否是第一段文本 - :param is_end: 是否是最后一段文本 - """ - if text: - text = text.strip() - - # 记录主回复 - # content_id = self.__record_response(text, username, uid) - - # 发送主回复到面板和数字人 - self.__send_panel_message(text, username, uid, content_id, type) - self.__send_digital_human_message(text, username, is_first, is_end) - - # 打印日志 - util.printInfo(1, username, '({}) {}'.format("llm", text)) - -import importlib -fay_booter = importlib.import_module('fay_booter') - +# -*- coding: utf-8 -*- +#作用是处理交互逻辑,文字输入,语音、文字及情绪的发送、播放及展示输出 +import math +from operator import index +import os +import time +import socket +import requests +from pydub import AudioSegment +from queue import Queue +import re # 添加正则表达式模块用于过滤表情符号 +import uuid + +# 适应模型使用 +import numpy as np +from ai_module import baidu_emotion +from core import wsa_server +from core.interact import Interact +from tts.tts_voice import EnumVoice +from scheduler.thread_manager import MyThread +from tts import tts_voice +from utils import util, config_util +from core import qa_service +from utils import config_util as cfg +from core import content_db +from ai_module import nlp_cemotion +from core import stream_manager + +from core import member_db +import threading + +#加载配置 +cfg.load_config() +if cfg.tts_module =='ali': + from tts.ali_tss import Speech +elif cfg.tts_module == 'gptsovits': + from tts.gptsovits import Speech +elif cfg.tts_module == 'gptsovits_v3': + from tts.gptsovits_v3 import Speech +elif cfg.tts_module == 'volcano': + from tts.volcano_tts import Speech +else: + from tts.ms_tts_sdk import Speech + +#windows运行推送唇形数据 +import platform +if platform.system() == "Windows": + import sys + sys.path.append("test/ovr_lipsync") + from test_olipsync import LipSyncGenerator + + +#可以使用自动播报的标记 +can_auto_play = True +auto_play_lock = threading.RLock() + +class FeiFei: + def __init__(self): + self.lock = threading.Lock() + self.nlp_streams = {} # 存储用户ID到句子缓存的映射 + self.nlp_stream_lock = threading.Lock() # 保护nlp_streams字典的锁 + self.mood = 0.0 # 情绪值 + self.old_mood = 0.0 + self.item_index = 0 + self.X = np.array([1, 0, 0, 0, 0, 0, 0, 0]).reshape(1, -1) # 适应模型变量矩阵 + # self.W = np.array([0.01577594,1.16119452,0.75828,0.207746,1.25017864,0.1044121,0.4294899,0.2770932]).reshape(-1,1) #适应模型变量矩阵 + self.W = np.array([0.0, 0.6, 0.1, 0.7, 0.3, 0.0, 0.0, 0.0]).reshape(-1, 1) # 适应模型变量矩阵 + + self.wsParam = None + self.wss = None + self.sp = Speech() + self.speaking = False #声音是否在播放 + self.__running = True + self.sp.connect() #TODO 预连接 + + self.timer = None + self.sound_query = Queue() + self.think_mode_users = {} # 使用字典存储每个用户的think模式状态 + self.think_time_users = {} #使用字典存储每个用户的think开始时间 + self.user_conv_map = {} #存储用户对话id及句子流序号,key为(username, conversation_id) + self.pending_isfirst = {} # 存储因prestart被过滤而延迟的isfirst标记,key为username + + def __remove_emojis(self, text): + """ + 改进的表情包过滤,避免误删除正常Unicode字符 + """ + # 更精确的emoji范围,避免误删除正常字符 + emoji_pattern = re.compile( + "[" + "\U0001F600-\U0001F64F" # 表情符号 (Emoticons) + "\U0001F300-\U0001F5FF" # 杂项符号和象形文字 (Miscellaneous Symbols and Pictographs) + "\U0001F680-\U0001F6FF" # 交通和地图符号 (Transport and Map Symbols) + "\U0001F1E0-\U0001F1FF" # 区域指示符号 (Regional Indicator Symbols) + "\U0001F900-\U0001F9FF" # 补充符号和象形文字 (Supplemental Symbols and Pictographs) + "\U0001FA70-\U0001FAFF" # 扩展A符号和象形文字 (Symbols and Pictographs Extended-A) + "\U00002600-\U000026FF" # 杂项符号 (Miscellaneous Symbols) + "\U00002700-\U000027BF" # 装饰符号 (Dingbats) + "\U0000FE00-\U0000FE0F" # 变体选择器 (Variation Selectors) + "\U0001F000-\U0001F02F" # 麻将牌 (Mahjong Tiles) + "\U0001F0A0-\U0001F0FF" # 扑克牌 (Playing Cards) + "]+", + flags=re.UNICODE, + ) + + # 保护常用的中文标点符号和特殊字符 + protected_chars = ["。", ",", "!", "?", ":", ";", "、", """, """, "'", "'", "(", ")", "【", "】", "《", "》"] + + # 先保存保护字符的位置 + protected_positions = {} + for i, char in enumerate(text): + if char in protected_chars: + protected_positions[i] = char + + # 执行emoji过滤 + filtered_text = emoji_pattern.sub('', text) + + # 如果过滤后文本长度变化太大,可能误删了正常字符,返回原文本 + if len(filtered_text) < len(text) * 0.5: # 如果删除了超过50%的内容 + return text + + return filtered_text + + def __process_stream_output(self, text, username, session_type="type2_stream", is_qa=False): + """ + 按流式方式分割和发送 type=2 的文本 + 使用安全的流式文本处理器和状态管理器 + """ + if not text or text.strip() == "": + return + + # 使用安全的流式文本处理器 + from utils.stream_text_processor import get_processor + from utils.stream_state_manager import get_state_manager + + processor = get_processor() + state_manager = get_state_manager() + + # 处理流式文本,is_qa=False表示普通模式 + success = processor.process_stream_text(text, username, is_qa=is_qa, session_type=session_type) + + if success: + # 普通模式结束会话 + state_manager.end_session(username, conversation_id=stream_manager.new_instance().get_conversation_id(username)) + else: + util.log(1, f"type=2流式处理失败,文本长度: {len(text)}") + # 失败时也要确保结束会话 + state_manager.force_reset_user_state(username) + + #语音消息处理检查是否命中q&a + def __get_answer(self, interleaver, text): + answer = None + # 全局问答 + answer, type = qa_service.QAService().question('qa',text) + if answer is not None: + return answer, type + else: + return None, None + + + #消息处理 + def __process_interact(self, interact: Interact): + if self.__running: + try: + index = interact.interact_type + username = interact.data.get("user", "User") + uid = member_db.new_instance().find_user(username) no_reply = interact.data.get("no_reply", False) if isinstance(no_reply, str): no_reply = no_reply.strip().lower() in ("1", "true", "yes", "y", "on") else: no_reply = bool(no_reply) + + if index == 1: #语音、文字交互 + + #记录用户问题,方便obs等调用 + self.write_to_file("./logs", "asr_result.txt", interact.data["msg"]) + + #同步用户问题到数字人 + if wsa_server.get_instance().is_connected(username): + content = {'Topic': 'human', 'Data': {'Key': 'question', 'Value': interact.data["msg"]}, 'Username' : interact.data.get("user")} + wsa_server.get_instance().add_cmd(content) + + #记录用户问题 + content_id = content_db.new_instance().add_content('member','speak',interact.data["msg"], username, uid) + if wsa_server.get_web_instance().is_connected(username): + wsa_server.get_web_instance().add_cmd({"panelReply": {"type":"member","content":interact.data["msg"], "username":username, "uid":uid, "id":content_id}, "Username" : username}) + + if no_reply: return "" #确定是否命中q&a + answer, type = self.__get_answer(interact.interleaver, interact.data["msg"]) + + #大语言模型回复 + text = '' + if answer is None or type != "qa": + if wsa_server.get_web_instance().is_connected(username): + wsa_server.get_web_instance().add_cmd({"panelMsg": "思考中...", "Username" : username, 'robot': f'{cfg.fay_url}/robot/Thinking.jpg'}) + if wsa_server.get_instance().is_connected(username): + content = {'Topic': 'human', 'Data': {'Key': 'log', 'Value': "思考中..."}, 'Username' : username, 'robot': f'{cfg.fay_url}/robot/Thinking.jpg'} + wsa_server.get_instance().add_cmd(content) + + # 根据配置动态调用不同的NLP模块 + if cfg.config["memory"].get("use_bionic_memory", False): + from llm import nlp_bionicmemory_stream + text = nlp_bionicmemory_stream.question(interact.data["msg"], username, interact.data.get("observation", None)) + else: + from llm import nlp_cognitive_stream + text = nlp_cognitive_stream.question(interact.data["msg"], username, interact.data.get("observation", None)) + + else: + text = answer + # 使用流式分割处理Q&A答案 + self.__process_stream_output(text, username, session_type="qa", is_qa=True) + + + return text + + elif (index == 2):#透传模式:有音频则仅播音频;仅文本则流式+TTS + audio_url = interact.data.get("audio") + text = interact.data.get("text") + + # 1) 存在音频:忽略文本,仅播放音频 + if audio_url and str(audio_url).strip(): + try: + audio_interact = Interact( + "stream", 1, + {"user": username, "msg": "", "isfirst": True, "isend": True, "audio": audio_url} + ) + self.say(audio_interact, "") + except Exception: + pass + return 'success' + + # 2) 只有文本:执行流式切分并TTS + if text and str(text).strip(): + # 进行流式处理(用于TTS,流式处理中会记录到数据库) + self.__process_stream_output(text, username, f"type2_{interact.interleaver}", is_qa=False) + + # 不再需要额外记录,因为流式处理已经记录了 + # self.__process_text_output(text, username, uid) + + return 'success' + + # 没有有效内容 + return 'success' + + except BaseException as e: + print(e) + return e + else: + return "还没有开始运行" + + #记录问答到log + def write_to_file(self, path, filename, content): + if not os.path.exists(path): + os.makedirs(path) + full_path = os.path.join(path, filename) + with open(full_path, 'w', encoding='utf-8') as file: + file.write(content) + file.flush() + os.fsync(file.fileno()) + + #触发交互 + def on_interact(self, interact: Interact): + #创建用户 + username = interact.data.get("user", "User") + if member_db.new_instance().is_username_exist(username) == "notexists": + member_db.new_instance().add_user(username) + no_reply = interact.data.get("no_reply", False) + if isinstance(no_reply, str): + no_reply = no_reply.strip().lower() in ("1", "true", "yes", "y", "on") + else: + no_reply = bool(no_reply) + + if not no_reply: + try: + from utils.stream_state_manager import get_state_manager + import uuid + if get_state_manager().is_session_active(username): + stream_manager.new_instance().clear_Stream_with_audio(username) + conv_id = "conv_" + str(uuid.uuid4()) + stream_manager.new_instance().set_current_conversation(username, conv_id) + # 将当前会话ID附加到交互数据 + interact.data["conversation_id"] = conv_id + # 允许新的生成 + stream_manager.new_instance().set_stop_generation(username, stop=False) + except Exception: + util.log(3, "开启新会话失败") + + if interact.interact_type == 1: + MyThread(target=self.__process_interact, args=[interact]).start() + else: + return self.__process_interact(interact) + + #获取不同情绪声音 + def __get_mood_voice(self): + voice = tts_voice.get_voice_of(config_util.config["attribute"]["voice"]) + if voice is None: + voice = EnumVoice.XIAO_XIAO + styleList = voice.value["styleList"] + sayType = styleList["calm"] + return sayType + + # 合成声音 + def say(self, interact, text, type = ""): + try: + uid = member_db.new_instance().find_user(interact.data.get("user")) + is_end = interact.data.get("isend", False) + is_first = interact.data.get("isfirst", False) + username = interact.data.get("user", "User") + + # 提前进行会话有效性与中断检查,避免产生多余面板/数字人输出 + try: + user_for_stop = interact.data.get("user", "User") + conv_id_for_stop = interact.data.get("conversation_id") + if not is_end and stream_manager.new_instance().should_stop_generation(user_for_stop, conversation_id=conv_id_for_stop): + return None + except Exception: + pass + + #无效流式文本提前结束 + if not is_first and not is_end and (text is None or text.strip() == ""): + return None + + # 检查是否是 prestart 内容(不应该影响 thinking 状态) + is_prestart_content = self.__has_prestart(text) + + # 流式文本拼接存库 + content_id = 0 + # 使用 (username, conversation_id) 作为 key,避免并发会话覆盖 + conv = interact.data.get("conversation_id") or "" + conv_map_key = (username, conv) + + if is_first == True: + # reset any leftover think-mode at the start of a new reply + # 但如果是 prestart 内容,不重置 thinking 状态 + try: + if uid is not None and not is_prestart_content: + self.think_mode_users[uid] = False + if uid in self.think_time_users: + del self.think_time_users[uid] + except Exception: + pass + # 如果没有 conversation_id,生成一个新的 + if not conv: + conv = "conv_" + str(uuid.uuid4()) + conv_map_key = (username, conv) + conv_no = 0 + # 创建第一条数据库记录,获得content_id + if text and text.strip(): + content_id = content_db.new_instance().add_content('fay', 'speak', text, username, uid) + else: + content_id = content_db.new_instance().add_content('fay', 'speak', '', username, uid) + + # 保存content_id到会话映射中,使用 (username, conversation_id) 作为 key + self.user_conv_map[conv_map_key] = { + "conversation_id": conv, + "conversation_msg_no": conv_no, + "content_id": content_id + } + util.log(1, f"流式会话开始: key={conv_map_key}, content_id={content_id}") + else: + # 获取之前保存的content_id + conv_info = self.user_conv_map.get(conv_map_key, {}) + content_id = conv_info.get("content_id", 0) + + # 如果 conv_map_key 不存在,尝试使用 username 作为备用查找 + if not conv_info and text and text.strip(): + # 查找所有匹配用户名的会话 + for (u, c), info in list(self.user_conv_map.items()): + if u == username and info.get("content_id", 0) > 0: + content_id = info.get("content_id", 0) + conv_info = info + util.log(1, f"警告:使用备用会话 ({u}, {c}) 的 content_id={content_id},原 key=({username}, {conv})") + break + + if conv_info: + conv_info["conversation_msg_no"] = conv_info.get("conversation_msg_no", 0) + 1 + + # 如果有新内容,更新数据库 + if content_id > 0 and text and text.strip(): + # 获取当前已有内容 + existing_content = content_db.new_instance().get_content_by_id(content_id) + if existing_content: + # 累积内容 + accumulated_text = existing_content[3] + text + content_db.new_instance().update_content(content_id, accumulated_text) + elif content_id == 0 and text and text.strip(): + # content_id 为 0 表示可能会话 key 不匹配,记录警告 + util.log(1, f"警告:content_id=0,无法更新数据库。user={username}, conv={conv}, text片段={text[:50] if len(text) > 50 else text}") + + # 会话结束时清理 user_conv_map 中的对应条目,避免内存泄漏 + if is_end and conv_map_key in self.user_conv_map: + del self.user_conv_map[conv_map_key] + + # 推送给前端和数字人 + try: + user_for_stop = interact.data.get("user", "User") + conv_id_for_stop = interact.data.get("conversation_id") + if is_end or not stream_manager.new_instance().should_stop_generation(user_for_stop, conversation_id=conv_id_for_stop): + self.__process_text_output(text, interact.data.get('user'), uid, content_id, type, is_first, is_end) + except Exception: + self.__process_text_output(text, interact.data.get('user'), uid, content_id, type, is_first, is_end) + + # 处理think标签 + # 第一步:处理结束标记 + if "" in text: + # 设置用户退出思考模式 + self.think_mode_users[uid] = False + + # 分割文本,提取后面的内容 + # 如果有多个,我们只关心最后一个后面的内容 + parts = text.split("") + text = parts[-1].strip() + + # 如果提取出的文本为空,则不需要继续处理 + if text == "": + return None + # 第二步:处理开始标记 + # 注意:这里要检查经过上面处理后的text + if "" in text: + self.think_mode_users[uid] = True + self.think_time_users[uid] = time.time() + + #”思考中“的输出 + if self.think_mode_users.get(uid, False): + try: + user_for_stop = interact.data.get("user", "User") + conv_id_for_stop = interact.data.get("conversation_id") + should_block = stream_manager.new_instance().should_stop_generation(user_for_stop, conversation_id=conv_id_for_stop) + except Exception: + should_block = False + if not should_block: + if wsa_server.get_web_instance().is_connected(interact.data.get('user')): + wsa_server.get_web_instance().add_cmd({"panelMsg": "思考中...", "Username" : interact.data.get('user'), 'robot': f'{cfg.fay_url}/robot/Thinking.jpg'}) + if wsa_server.get_instance().is_connected(interact.data.get("user")): + content = {'Topic': 'human', 'Data': {'Key': 'log', 'Value': "思考中..."}, 'Username' : interact.data.get('user'), 'robot': f'{cfg.fay_url}/robot/Thinking.jpg'} + wsa_server.get_instance().add_cmd(content) + + #”请稍等“的音频输出(不影响文本输出) + if self.think_mode_users.get(uid, False) == True and time.time() - self.think_time_users[uid] >= 5: + self.think_time_users[uid] = time.time() + text = "请稍等..." + elif self.think_mode_users.get(uid, False) == True and "" not in text: + return None + + result = None + audio_url = interact.data.get('audio', None)#透传的音频 + + # 移除 prestart 标签内容,不进行TTS + tts_text = self.__remove_prestart_tags(text) if text else text + + if audio_url is not None:#透传音频下载 + file_name = 'sample-' + str(int(time.time() * 1000)) + audio_url[-4:] + result = self.download_wav(audio_url, './samples/', file_name) + elif config_util.config["interact"]["playSound"] or wsa_server.get_instance().get_client_output(interact.data.get("user")) or self.__is_send_remote_device_audio(interact):#tts + if tts_text != None and tts_text.replace("*", "").strip() != "": + # 检查是否需要停止TTS处理(按会话) + if stream_manager.new_instance().should_stop_generation( + interact.data.get("user", "User"), + conversation_id=interact.data.get("conversation_id") + ): + util.printInfo(1, interact.data.get('user'), 'TTS处理被打断,跳过音频合成') + return None + + # 先过滤表情符号,然后再合成语音 + filtered_text = self.__remove_emojis(tts_text.replace("*", "")) + if filtered_text is not None and filtered_text.strip() != "": + util.printInfo(1, interact.data.get('user'), '合成音频...') + tm = time.time() + result = self.sp.to_sample(filtered_text, self.__get_mood_voice()) + # 合成完成后再次检查会话是否仍有效,避免继续输出旧会话结果 + try: + user_for_stop = interact.data.get("user", "User") + conv_id_for_stop = interact.data.get("conversation_id") + if stream_manager.new_instance().should_stop_generation(user_for_stop, conversation_id=conv_id_for_stop): + return None + except Exception: + pass + util.printInfo(1, interact.data.get("user"), "合成音频完成. 耗时: {} ms 文件:{}".format(math.floor((time.time() - tm) * 1000), result)) + else: + # prestart 内容不应该触发机器人表情重置 + if is_end and not is_prestart_content and wsa_server.get_web_instance().is_connected(interact.data.get('user')): + wsa_server.get_web_instance().add_cmd({"panelMsg": "", 'Username' : interact.data.get('user'), 'robot': f'{cfg.fay_url}/robot/Normal.jpg'}) + + if result is not None or is_first or is_end: + # prestart 内容不需要进入音频处理流程 + if is_prestart_content: + return result + if is_end:#TODO 临时方案:如果结束标记,则延迟1秒处理,免得is end比前面的音频tts要快 + time.sleep(1) + MyThread(target=self.__process_output_audio, args=[result, interact, text]).start() + return result + + except BaseException as e: + print(e) + return None + + #下载wav + def download_wav(self, url, save_directory, filename): + try: + # 发送HTTP GET请求以获取WAV文件内容 + response = requests.get(url, stream=True) + response.raise_for_status() # 检查请求是否成功 + + # 确保保存目录存在 + if not os.path.exists(save_directory): + os.makedirs(save_directory) + + # 构建保存文件的路径 + save_path = os.path.join(save_directory, filename) + + # 将WAV文件内容保存到指定文件 + with open(save_path, 'wb') as f: + for chunk in response.iter_content(chunk_size=1024): + if chunk: + f.write(chunk) + + return save_path + except requests.exceptions.RequestException as e: + print(f"[Error] Failed to download file: {e}") + return None + + + #面板播放声音 + def __play_sound(self): + try: + import pygame + pygame.mixer.init() # 初始化pygame.mixer,只需要在此处初始化一次, 如果初始化失败,则不播放音频 + except Exception as e: + util.printInfo(1, "System", "音频播放初始化失败,本机无法播放音频") + return + + while self.__running: + time.sleep(0.01) + if not self.sound_query.empty(): # 如果队列不为空则播放音频 + file_url, audio_length, interact = self.sound_query.get() + + is_first = interact.data.get('isfirst') is True + is_end = interact.data.get('isend') is True + + + + if file_url is not None: + util.printInfo(1, interact.data.get('user'), '播放音频...') + + if is_first: + self.speaking = True + elif not is_end: + self.speaking = True + + #自动播报关闭 + global auto_play_lock + global can_auto_play + with auto_play_lock: + if self.timer is not None: + self.timer.cancel() + self.timer = None + can_auto_play = False + + if wsa_server.get_web_instance().is_connected(interact.data.get('user')): + wsa_server.get_web_instance().add_cmd({"panelMsg": "播放中 ...", "Username" : interact.data.get('user'), 'robot': f'{cfg.fay_url}/robot/Speaking.jpg'}) + + if file_url is not None: + pygame.mixer.music.load(file_url) + pygame.mixer.music.play() + + # 播放过程中计时,直到音频播放完毕 + length = 0 + while length < audio_length: + try: + user_for_stop = interact.data.get("user", "User") + conv_id_for_stop = interact.data.get("conversation_id") + if stream_manager.new_instance().should_stop_generation(user_for_stop, conversation_id=conv_id_for_stop): + try: + pygame.mixer.music.stop() + except Exception: + pass + break + except Exception: + pass + length += 0.01 + time.sleep(0.01) + + if is_end: + self.play_end(interact) + + if wsa_server.get_web_instance().is_connected(interact.data.get('user')): + wsa_server.get_web_instance().add_cmd({"panelMsg": "", "Username" : interact.data.get('user'), 'robot': f'{cfg.fay_url}/robot/Normal.jpg'}) + # 播放完毕后通知 + if wsa_server.get_web_instance().is_connected(interact.data.get("user")): + wsa_server.get_web_instance().add_cmd({"panelMsg": "", 'Username': interact.data.get('user')}) + + #推送远程音频 + def __send_remote_device_audio(self, file_url, interact): + if file_url is None: + return + delkey = None + for key, value in fay_booter.DeviceInputListenerDict.items(): + if value.username == interact.data.get("user") and value.isOutput: #按username选择推送,booter.devicelistenerdice按用户名记录 + try: + value.deviceConnector.send(b"\x00\x01\x02\x03\x04\x05\x06\x07\x08") # 发送音频开始标志,同时也检查设备是否在线 + wavfile = open(os.path.abspath(file_url), "rb") + data = wavfile.read(102400) + total = 0 + while data: + total += len(data) + value.deviceConnector.send(data) + data = wavfile.read(102400) + time.sleep(0.0001) + value.deviceConnector.send(b'\x08\x07\x06\x05\x04\x03\x02\x01\x00')# 发送音频结束标志 + util.printInfo(1, value.username, "远程音频发送完成:{}".format(total)) + except socket.error as serr: + util.printInfo(1, value.username, "远程音频输入输出设备已经断开:{}".format(key)) + value.stop() + delkey = key + if delkey: + value = fay_booter.DeviceInputListenerDict.pop(delkey) + if wsa_server.get_web_instance().is_connected(interact.data.get('user')): + wsa_server.get_web_instance().add_cmd({"remote_audio_connect": False, "Username" : interact.data.get('user')}) + + def __is_send_remote_device_audio(self, interact): + for key, value in fay_booter.DeviceInputListenerDict.items(): + if value.username == interact.data.get("user") and value.isOutput: + return True + return False + + #输出音频处理 + def __process_output_audio(self, file_url, interact, text): + try: + # 会话有效性与中断检查(最早返回,避免向面板/数字人发送任何旧会话输出) + try: + user_for_stop = interact.data.get("user", "User") + conv_id_for_stop = interact.data.get("conversation_id") + if stream_manager.new_instance().should_stop_generation(user_for_stop, conversation_id=conv_id_for_stop): + return + except Exception: + pass + try: + if file_url is None: + audio_length = 0 + elif file_url.endswith('.wav'): + audio = AudioSegment.from_wav(file_url) + audio_length = len(audio) / 1000.0 # 时长以秒为单位 + elif file_url.endswith('.mp3'): + audio = AudioSegment.from_mp3(file_url) + audio_length = len(audio) / 1000.0 # 时长以秒为单位 + except Exception as e: + audio_length = 3 + + #推送远程音频 + if file_url is not None: + MyThread(target=self.__send_remote_device_audio, args=[file_url, interact]).start() + + #发送音频给数字人接口 + if file_url is not None and wsa_server.get_instance().get_client_output(interact.data.get("user")): + # 使用 (username, conversation_id) 作为 key 获取会话信息 + audio_username = interact.data.get("user", "User") + audio_conv_id = interact.data.get("conversation_id") or "" + audio_conv_info = self.user_conv_map.get((audio_username, audio_conv_id), {}) + content = {'Topic': 'human', 'Data': {'Key': 'audio', 'Value': os.path.abspath(file_url), 'HttpValue': f'{cfg.fay_url}/audio/' + os.path.basename(file_url), 'Text': text, 'Time': audio_length, 'Type': interact.interleaver, 'IsFirst': 1 if interact.data.get("isfirst", False) else 0, 'IsEnd': 1 if interact.data.get("isend", False) else 0, 'CONV_ID' : audio_conv_info.get("conversation_id", ""), 'CONV_MSG_NO' : audio_conv_info.get("conversation_msg_no", 0) }, 'Username' : interact.data.get('user'), 'robot': f'{cfg.fay_url}/robot/Speaking.jpg'} + #计算lips + if platform.system() == "Windows": + try: + lip_sync_generator = LipSyncGenerator() + viseme_list = lip_sync_generator.generate_visemes(os.path.abspath(file_url)) + consolidated_visemes = lip_sync_generator.consolidate_visemes(viseme_list) + content["Data"]["Lips"] = consolidated_visemes + except Exception as e: + print(e) + util.printInfo(1, interact.data.get("user"), "唇型数据生成失败") + wsa_server.get_instance().add_cmd(content) + util.printInfo(1, interact.data.get("user"), "数字人接口发送音频数据成功") + + #面板播放 + config_util.load_config() + # 检查是否是 prestart 内容 + is_prestart = self.__has_prestart(text) + if config_util.config["interact"]["playSound"]: + # prestart 内容不应该进入播放队列,避免触发 Normal 状态 + if not is_prestart: + self.sound_query.put((file_url, audio_length, interact)) + else: + # prestart 内容不应该重置机器人表情 + if not is_prestart and wsa_server.get_web_instance().is_connected(interact.data.get('user')): + wsa_server.get_web_instance().add_cmd({"panelMsg": "", 'Username' : interact.data.get('user'), 'robot': f'{cfg.fay_url}/robot/Normal.jpg'}) + + except Exception as e: + print(e) + + def play_end(self, interact): + self.speaking = False + global can_auto_play + global auto_play_lock + with auto_play_lock: + if self.timer: + self.timer.cancel() + self.timer = None + if interact.interleaver != 'auto_play': #交互后暂停自动播报30秒 + self.timer = threading.Timer(30, self.set_auto_play) + self.timer.start() + else: + can_auto_play = True + + #恢复自动播报(如果有) + def set_auto_play(self): + global auto_play_lock + global can_auto_play + with auto_play_lock: + can_auto_play = True + self.timer = None + + #启动核心服务 + def start(self): + MyThread(target=self.__play_sound).start() + + #停止核心服务 + def stop(self): + self.__running = False + self.speaking = False + self.sp.close() + wsa_server.get_web_instance().add_cmd({"panelMsg": ""}) + content = {'Topic': 'human', 'Data': {'Key': 'log', 'Value': ""}} + wsa_server.get_instance().add_cmd(content) + + def __record_response(self, text, username, uid): + """ + 记录AI的回复内容 + :param text: 回复文本 + :param username: 用户名 + :param uid: 用户ID + :return: content_id + """ + self.write_to_file("./logs", "answer_result.txt", text) + return content_db.new_instance().add_content('fay', 'speak', text, username, uid) + + def __remove_prestart_tags(self, text): + """ + 移除文本中的 prestart 标签及其内容 + :param text: 原始文本 + :return: 移除 prestart 标签后的文本 + """ + if not text: + return text + import re + # 移除 ... 标签及其内容(支持属性) + cleaned = re.sub(r']*>[\s\S]*?', '', text, flags=re.IGNORECASE) + return cleaned.strip() + + def __has_prestart(self, text): + """ + 判断文本中是否包含 prestart 标签(支持属性) + """ + if not text: + return False + return re.search(r']*>[\s\S]*?', text, flags=re.IGNORECASE) is not None + + def __send_panel_message(self, text, username, uid, content_id=None, type=None): + """ + 发送消息到Web面板 + :param text: 消息文本 + :param username: 用户名 + :param uid: 用户ID + :param content_id: 内容ID + :param type: 消息类型 + """ + if not wsa_server.get_web_instance().is_connected(username): + return + + # 检查是否是 prestart 内容,prestart 内容不应该更新日志区消息 + # 因为这会覆盖掉"思考中..."的状态显示 + is_prestart = self.__has_prestart(text) + + # gui日志区消息(prestart 内容跳过,保持"思考中..."状态) + if not is_prestart: + wsa_server.get_web_instance().add_cmd({ + "panelMsg": text, + "Username": username + }) + + # 聊天窗消息 + if content_id is not None: + wsa_server.get_web_instance().add_cmd({ + "panelReply": { + "type": "fay", + "content": text, + "username": username, + "uid": uid, + "id": content_id, + "is_adopted": type == 'qa' + }, + "Username": username + }) + + def __send_digital_human_message(self, text, username, is_first=False, is_end=False): + """ + 发送消息到数字人(语音应该在say方法驱动数字人输出) + :param text: 消息文本 + :param username: 用户名 + :param is_first: 是否是第一段文本 + :param is_end: 是否是最后一段文本 + """ + # 移除 prestart 标签内容,不发送给数字人 + cleaned_text = self.__remove_prestart_tags(text) if text else "" + full_text = self.__remove_emojis(cleaned_text.replace("*", "")) if cleaned_text else "" + + # 如果文本为空且不是结束标记,则不发送,但需保留 is_first + if not full_text and not is_end: + if is_first: + self.pending_isfirst[username] = True + return + + # 检查是否有延迟的 is_first 需要应用 + if self.pending_isfirst.get(username, False): + is_first = True + self.pending_isfirst[username] = False + + if wsa_server.get_instance().is_connected(username): + content = { + 'Topic': 'human', + 'Data': { + 'Key': 'text', + 'Value': full_text, + 'IsFirst': 1 if is_first else 0, + 'IsEnd': 1 if is_end else 0 + }, + 'Username': username + } + wsa_server.get_instance().add_cmd(content) + + def __process_text_output(self, text, username, uid, content_id, type, is_first=False, is_end=False): + """ + 完整文本输出到各个终端 + :param text: 主要回复文本 + :param textlist: 额外回复列表 + :param username: 用户名 + :param uid: 用户ID + :param type: 消息类型 + :param is_first: 是否是第一段文本 + :param is_end: 是否是最后一段文本 + """ + if text: + text = text.strip() + + # 记录主回复 + # content_id = self.__record_response(text, username, uid) + + # 发送主回复到面板和数字人 + self.__send_panel_message(text, username, uid, content_id, type) + self.__send_digital_human_message(text, username, is_first, is_end) + + # 打印日志 + util.printInfo(1, username, '({}) {}'.format("llm", text)) + +import importlib +fay_booter = importlib.import_module('fay_booter') + diff --git a/gui/flask_server.py b/gui/flask_server.py index e42ffbe..9773b85 100644 --- a/gui/flask_server.py +++ b/gui/flask_server.py @@ -74,9 +74,9 @@ def __get_template(): except Exception as e: return f"Error rendering template: {e}", 500 -def __get_device_list(): - try: - if config_util.start_mode == 'common': +def __get_device_list(): + try: + if config_util.start_mode == 'common': audio = pyaudio.PyAudio() device_list = [] for i in range(audio.get_device_count()): @@ -86,12 +86,33 @@ def __get_device_list(): return list(set(device_list)) else: return [] - except Exception as e: - print(f"Error getting device list: {e}") - return [] - -@__app.route('/api/submit', methods=['post']) -def api_submit(): + except Exception as e: + print(f"Error getting device list: {e}") + return [] + +def _as_bool(value): + if isinstance(value, bool): + return value + if value is None: + return False + if isinstance(value, (int, float)): + return value != 0 + if isinstance(value, str): + return value.strip().lower() in ("1", "true", "yes", "y", "on") + return False + +def _build_llm_url(base_url: str) -> str: + if not base_url: + return "" + url = base_url.rstrip("/") + if url.endswith("/chat/completions"): + return url + if url.endswith("/v1"): + return url + "/chat/completions" + return url + "/v1/chat/completions" + +@__app.route('/api/submit', methods=['post']) +def api_submit(): data = request.values.get('data') if not data: return jsonify({'result': 'error', 'message': '未提供数据'}) @@ -288,23 +309,27 @@ def api_send(): # 获取指定用户的消息记录(支持分页) @__app.route('/api/get-msg', methods=['post']) -def api_get_Msg(): - try: - data = request.form.get('data') - if data is None: - data = request.get_json() - else: - data = json.loads(data) - uid = member_db.new_instance().find_user(data["username"]) - limit = data.get("limit", 30) # 默认每页30条 - offset = data.get("offset", 0) # 默认从0开始 - contentdb = content_db.new_instance() - if uid == 0: - return json.dumps({'list': [], 'total': 0, 'hasMore': False}) - else: - # 获取总数用于判断是否还有更多 - total = contentdb.get_message_count(uid) - list = contentdb.get_list('all', 'desc', limit, uid, offset) +def api_get_Msg(): + try: + data = request.form.get('data') + if data is None: + data = request.get_json(silent=True) or {} + else: + data = json.loads(data) + if not isinstance(data, dict): + data = {} + username = data.get("username") + limit = data.get("limit", 30) # 默认每页30条 + offset = data.get("offset", 0) # 默认从0开始 + contentdb = content_db.new_instance() + uid = 0 + if username: + uid = member_db.new_instance().find_user(username) + if uid == 0: + return json.dumps({'list': [], 'total': 0, 'hasMore': False}) + # 获取总数用于判断是否还有更多 + total = contentdb.get_message_count(uid) + list = contentdb.get_list('all', 'desc', limit, uid, offset) relist = [] i = len(list) - 1 while i >= 0: @@ -329,23 +354,128 @@ def api_send_v1_chat_completions(): data = request.get_json() if not data: return jsonify({'error': '未提供数据'}) - try: - last_content = "" - if 'messages' in data and data['messages']: - last_message = data['messages'][-1] - username = last_message.get('role', 'User') - if username == 'user': - username = 'User' + try: + model = data.get('model', 'fay') + if model == 'llm': + try: + config_util.load_config() + llm_url = _build_llm_url(config_util.gpt_base_url) + api_key = config_util.key_gpt_api_key + model_engine = config_util.gpt_model_engine + except Exception as exc: + return jsonify({'error': f'LLM config load failed: {exc}'}), 500 + + if not llm_url: + return jsonify({'error': 'LLM base_url is not configured'}), 500 + + payload = dict(data) + if payload.get('model') == 'llm' and model_engine: + payload['model'] = model_engine + + stream_requested = _as_bool(payload.get('stream', False)) + headers = {'Content-Type': 'application/json'} + if api_key: + headers['Authorization'] = f'Bearer {api_key}' + + try: + if stream_requested: + resp = requests.post(llm_url, headers=headers, json=payload, stream=True) + + def generate(): + try: + for line in resp.iter_lines(decode_unicode=True): + if line is None: + continue + yield f"{line}\n" + finally: + resp.close() + + return Response( + generate(), + status=resp.status_code, + mimetype=resp.headers.get("Content-Type", "text/event-stream"), + ) + + resp = requests.post(llm_url, headers=headers, json=payload, timeout=60) + return Response( + resp.content, + status=resp.status_code, + content_type=resp.headers.get("Content-Type", "application/json"), + ) + except Exception as exc: + return jsonify({'error': f'LLM request failed: {exc}'}), 500 + + last_content = "" + if 'messages' in data and data['messages']: + last_message = data['messages'][-1] + username = last_message.get('role', 'User') + if username == 'user': + username = 'User' last_content = last_message.get('content', 'No content provided') else: last_content = 'No messages found' username = 'User' - model = data.get('model', 'fay') - observation = data.get('observation', '') + observation = data.get('observation', '') # 检查请求中是否指定了流式传输 - stream_requested = data.get('stream', False) - if stream_requested or model == 'fay-streaming': + stream_requested = data.get('stream', False) + no_reply = _as_bool(data.get('no_reply', data.get('noReply', False))) + if no_reply: + interact = Interact("text", 1, {'user': username, 'msg': last_content, 'observation': str(observation), 'stream': bool(stream_requested), 'no_reply': True}) + util.printInfo(1, username, '[text chat no_reply]{}'.format(interact.data["msg"]), time.time()) + fay_booter.feiFei.on_interact(interact) + if stream_requested or model == 'fay-streaming': + def generate(): + message = { + "id": "faystreaming-" + str(uuid.uuid4()), + "object": "chat.completion.chunk", + "created": int(time.time()), + "model": model, + "choices": [ + { + "delta": { + "content": "" + }, + "index": 0, + "finish_reason": "stop" + } + ], + "usage": { + "prompt_tokens": len(last_content), + "completion_tokens": 0, + "total_tokens": len(last_content) + }, + "system_fingerprint": "", + "no_reply": True + } + yield f"data: {json.dumps(message)}\n\n" + yield 'data: [DONE]\n\n' + return Response(generate(), mimetype='text/event-stream') + return jsonify({ + "id": "fay-" + str(uuid.uuid4()), + "object": "chat.completion", + "created": int(time.time()), + "model": model, + "choices": [ + { + "index": 0, + "message": { + "role": "assistant", + "content": "" + }, + "logprobs": "", + "finish_reason": "stop" + } + ], + "usage": { + "prompt_tokens": len(last_content), + "completion_tokens": 0, + "total_tokens": len(last_content) + }, + "system_fingerprint": "", + "no_reply": True + }) + if stream_requested or model == 'fay-streaming': interact = Interact("text", 1, {'user': username, 'msg': last_content, 'observation': str(observation), 'stream':True}) util.printInfo(1, username, '[文字沟通接口(流式)]{}'.format(interact.data["msg"]), time.time()) fay_booter.feiFei.on_interact(interact) diff --git a/llm/nlp_cognitive_stream.py b/llm/nlp_cognitive_stream.py index 19227a3..4e7f92c 100644 --- a/llm/nlp_cognitive_stream.py +++ b/llm/nlp_cognitive_stream.py @@ -1851,19 +1851,22 @@ def question(content, username, observation=None): or messages_buffer[-1]['content'] != content ): messages_buffer.append({"role": "user", "content": content}) - else: - # 不隔离:按独立消息存储,保留用户名信息 - def append_to_buffer_multi(role: str, text_value: str, msg_username: str = "") -> None: - if not text_value: - return - messages_buffer.append({"role": role, "content": text_value, "username": msg_username}) - if len(messages_buffer) > 60: - del messages_buffer[:-60] - - for record in history_records: - msg_type, msg_text, msg_username = record - if not msg_text: - continue + else: + # 不隔离:按独立消息存储,保留用户名信息 + def append_to_buffer_multi(role: str, text_value: str, msg_username: str = "") -> None: + if not text_value: + return + messages_buffer.append({"role": role, "content": text_value, "username": msg_username}) + if len(messages_buffer) > 60: + del messages_buffer[:-60] + + def append_to_buffer(role: str, text_value: str) -> None: + append_to_buffer_multi(role, text_value, "") + + for record in history_records: + msg_type, msg_text, msg_username = record + if not msg_text: + continue if msg_type and msg_type.lower() in ('member', 'user'): append_to_buffer_multi("user", msg_text, msg_username) else: diff --git a/test/test_fay_gpt_stream.py b/test/test_fay_gpt_stream.py index 00206a5..ec70612 100644 --- a/test/test_fay_gpt_stream.py +++ b/test/test_fay_gpt_stream.py @@ -1,7 +1,7 @@ import requests import json -def test_gpt(prompt): +def test_gpt(prompt, username="张三", observation="", no_reply=False): url = 'http://127.0.0.1:5000/v1/chat/completions' # 替换为您的接口地址 headers = { 'Content-Type': 'application/json', @@ -10,11 +10,19 @@ def test_gpt(prompt): data = { 'model': 'fay-streaming', 'messages': [ - {'role': '张三', 'content': prompt} + {'role': username, 'content': prompt} ], - 'stream': True # 启用流式传输 + 'stream': True, # 启用流式传输 + 'observation': observation, # 观察数据 + 'no_reply': no_reply } + print(f"[用户] {username}: {prompt}") + if observation: + print(f"[观察数据] {observation}") + print("-" * 50) + print("[Fay回复] ", end="") + response = requests.post(url, headers=headers, data=json.dumps(data), stream=True) if response.status_code != 200: @@ -45,8 +53,57 @@ def test_gpt(prompt): else: print(f"\n收到未知格式的数据:{line}") +# 观察数据样本 +OBSERVATION_SAMPLES = { + "张三": """识别到对话的人是张三 +认知状态:正常 +听力:正常 +视力:正常 +兴趣爱好:写代码、音乐、电影 +避免话题:学习成绩""", + + "李奶奶": """识别到对话的人是李奶奶 +认知状态:轻度记忆衰退 +听力:需要大声说话 +视力:正常 +兴趣爱好:养花、看戏曲、聊家常 +避免话题:子女工作压力""", + + "王叔叔": """识别到对话的人是王叔叔 +认知状态:正常 +听力:正常 +视力:老花眼 +兴趣爱好:钓鱼、下棋、看新闻 +避免话题:退休金""", + + "小明": """识别到对话的人是小明 +认知状态:正常 +听力:正常 +视力:正常 +年龄:10岁 +兴趣爱好:玩游戏、看动画片、踢足球 +避免话题:考试分数、作业""", +} + if __name__ == "__main__": - user_input = "你好" - print("GPT 的回复:") - test_gpt(user_input) + # 示例1:带观察数据的对话 + print("=" * 60) + print("示例1:张三的对话(带观察数据)") + print("=" * 60) + test_gpt("你好,今天天气不错啊", username="张三", observation=OBSERVATION_SAMPLES["张三"]) + + print("\n") + + # 示例2:不带观察数据的对话 + # print("=" * 60) + # print("示例2:普通对话(不带观察数据)") + # print("=" * 60) + # test_gpt("你好", username="User", observation="") + + # 示例3:李奶奶的对话 + # print("=" * 60) + # print("示例3:李奶奶的对话") + # print("=" * 60) + # test_gpt("小菲啊,我今天有点闷", username="李奶奶", observation=OBSERVATION_SAMPLES["李奶奶"]) + print("\n请求完成")