mirror of
https://github.com/xszyou/Fay.git
synced 2026-03-12 17:51:28 +08:00
fay自然进化
1. 增加清除记忆功能; 2. 增加克隆性格功能; 3. 增加认知模型(专属的记忆逻辑、反思逻辑); 4. 修复自动播报bug; 5. fay_url配置响修正; 6. 修复流式输出前置换行问题; 7. 修复没有用户聊天记录前端反复添加默认用户问题; 8. 更新dockerfile; 9. 重构util.py代码。 1. Fay ai编程指南:https://qqk9ntwbcit.feishu.cn/wiki/FKFywXWaeiBH28k4Q67c3eF7njC 2.Fay认知模型:https://qqk9ntwbcit.feishu.cn/wiki/BSW3wSsMdikiHUkiCJYcSp2lnio
This commit is contained in:
18
simulation_engine/example-settings.py
Normal file
18
simulation_engine/example-settings.py
Normal file
@@ -0,0 +1,18 @@
|
||||
from pathlib import Path

# WARNING(security): a live-looking API key is committed in this example
# settings file. Example configs should ship a placeholder; if this key was
# ever real it should be rotated.
OPENAI_API_KEY = "sk-hAuN7OLqKJTdyDjNFdEfF4B0E53642E4B2BbCa248594Cd29"

OPENAI_API_BASE = "https://api.zyai.online/v1"  # may be changed to your own custom base URL

KEY_OWNER = "xszyou"

# When True, gpt_structure.chat_safe_generate prints every prompt run.
DEBUG = False

# Presumably the number of questions batched per LLM request — TODO confirm
# against callers; not referenced in this file.
MAX_CHUNK_SIZE = 4

# Default chat model identifier.
LLM_VERS = "gpt-4o-mini"

# Repository root (the parent of the simulation_engine package).
BASE_DIR = f"{Path(__file__).resolve().parent.parent}"

## To do: Are the following needed in the new structure? Ideally
## POPULATIONS_DIR is for the user to define.
POPULATIONS_DIR = f"{BASE_DIR}/agent_bank/populations"
LLM_PROMPT_DIR = f"{BASE_DIR}/simulation_engine/prompt_template"
|
||||
387
simulation_engine/global_methods.py
Normal file
387
simulation_engine/global_methods.py
Normal file
@@ -0,0 +1,387 @@
|
||||
import random
|
||||
import json
|
||||
import string
|
||||
import csv
|
||||
import datetime as dt
|
||||
import os
|
||||
import numpy
|
||||
import math
|
||||
import shutil, errno
|
||||
|
||||
from os import listdir
|
||||
|
||||
|
||||
def create_folder_if_not_there(curr_path):
  """
  Ensure that the folder for curr_path exists, creating it if needed.

  If curr_path designates a file location (its last "/"-component contains a
  "."), the containing folder is created; if it designates a folder, that
  folder itself is created. A path with no "/" separator is left untouched.

  Args:
    curr_path: a "/"-separated file or folder path.

  RETURNS:
    True: if a new folder is created
    False: if a new folder is not created
  """
  parts = curr_path.split("/")
  if len(parts) != 1:
    # Heuristic: a "." in the last component means curr_path is a file, so
    # operate on its parent folder instead.
    if "." in parts[-1]:
      parts = parts[:-1]

    folder = "/".join(parts)
    if not os.path.exists(folder):
      os.makedirs(folder)
      return True

  return False
|
||||
|
||||
|
||||
def write_list_of_list_to_csv(curr_list_of_list, outfile):
  """
  Writes a list of lists to csv, replacing the whole file in one shot
  (unlike write_list_to_csv_line, which appends a single row).

  ARGS:
    curr_list_of_list: list to write. The list comes in the following form:
               [['key1', 'val1-1', 'val1-2'...],
                ['key2', 'val2-1', 'val2-2'...],]
    outfile: name of the csv file to write
  RETURNS:
    None
  """
  create_folder_if_not_there(outfile)
  # newline="" is required by the csv module; without it, extra blank rows
  # appear on platforms with \r\n line endings.
  with open(outfile, "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerows(curr_list_of_list)
|
||||
|
||||
|
||||
def write_list_to_csv_line(line_list, outfile):
  """
  Appends one row to a csv file, creating the file (and its folder) if they
  do not exist yet.

  ARGS:
    line_list: the row to write, e.g. ['key1', 'val1-1', 'val1-2'...]
           Importantly, this is NOT a list of lists.
    outfile: name of the csv file to write
  RETURNS:
    None
  """
  create_folder_if_not_there(outfile)

  # Context manager guarantees the handle is closed even if the write fails;
  # newline="" avoids extra blank rows on \r\n platforms.
  with open(outfile, 'a', newline="") as curr_file:
    csv.writer(curr_file).writerow(line_list)
|
||||
|
||||
|
||||
def read_file_to_list(curr_file, header=False, strip_trail=True):
  """
  Reads in a csv file to a list of lists. If header is True, it returns a
  tuple with (header row, all other rows).

  ARGS:
    curr_file: path to the current csv file.
    header: when True, return (header_row, data_rows) instead of all rows.
    strip_trail: when True, strip surrounding whitespace from every cell.
  RETURNS:
    List of lists where the component lists are the rows of the file,
    or a (header, rows) tuple when header is True.
  """
  # The original duplicated the whole read loop in both branches; read once
  # and split afterwards instead.
  analysis_list = []
  with open(curr_file) as f_analysis_file:
    data_reader = csv.reader(f_analysis_file, delimiter=",")
    for row in data_reader:
      if strip_trail:
        row = [i.strip() for i in row]
      analysis_list.append(row)
  if header:
    return analysis_list[0], analysis_list[1:]
  return analysis_list
|
||||
|
||||
|
||||
def read_file_to_set(curr_file, col=0):
  """
  Reads a single column of a csv file into a set.

  ARGS:
    curr_file: path to the current csv file.
    col: index of the column to collect.
  RETURNS:
    Set with all items in the chosen column of the csv file.
  """
  with open(curr_file) as fh:
    return {row[col] for row in csv.reader(fh, delimiter=",")}
|
||||
|
||||
|
||||
def get_row_len(curr_file):
  """
  Count the distinct first-column values in a csv file.

  NOTE(review): despite the name, duplicate first-column values are collapsed
  via a set, so this is not the raw row count — confirm callers expect that.

  ARGS:
    curr_file: path to the current csv file.
  RETURNS:
    The number of distinct first-column values.
    False if the file cannot be read.
  """
  try:
    first_cells = set()
    with open(curr_file) as f_analysis_file:
      for row in csv.reader(f_analysis_file, delimiter=","):
        first_cells.add(row[0])
    return len(first_cells)
  # Narrowed from a bare except: OSError covers missing/unreadable files,
  # IndexError covers blank rows (csv yields [] for empty lines).
  except (OSError, IndexError):
    return False
|
||||
|
||||
|
||||
def check_if_file_exists(curr_file):
  """
  Checks if a file exists (i.e. can be opened for reading).

  ARGS:
    curr_file: path to the current csv file.
  RETURNS:
    True if the file exists
    False if the file does not exist (or cannot be opened)
  """
  try:
    with open(curr_file):
      pass
    return True
  # Narrowed from a bare except: only filesystem errors mean "not there";
  # anything else (e.g. a non-string path) should surface as a real bug.
  except OSError:
    return False
|
||||
|
||||
|
||||
def find_filenames(path_to_dir, suffix=".csv"):
  """
  Given a directory, find all files that end with the provided suffix and
  return their paths. Entries containing ".DS_Store" are skipped.

  ARGS:
    path_to_dir: Path to the current directory
    suffix: The target suffix.
  RETURNS:
    A list of "<path_to_dir>/<filename>" paths for the matching files.
  """
  return [f"{path_to_dir}/{name}"
          for name in listdir(path_to_dir)
          if ".DS_Store" not in name and name.endswith(suffix)]
|
||||
|
||||
|
||||
def average(list_of_val):
  """
  Finds the average of the numbers in a list, ignoring NaN entries.

  ARGS:
    list_of_val: values convertible to float (ints, floats, numeric strings).
  RETURNS:
    The average of the non-NaN values, or nan when the input is empty,
    all-NaN, or not numeric.
  """
  try:
    # Convert BEFORE the NaN filter: math.isnan raises TypeError on strings,
    # so the original returned nan for lists of numeric strings.
    vals = [v for v in (float(i) for i in list_of_val) if not math.isnan(v)]
    return sum(vals) / len(vals)
  except (TypeError, ValueError, ZeroDivisionError):
    return float('nan')
|
||||
|
||||
|
||||
def std(list_of_val):
  """
  Finds the (population) standard deviation of the numbers in a list,
  ignoring NaN entries.

  ARGS:
    list_of_val: values convertible to float (ints, floats, numeric strings).
  RETURNS:
    The std of the non-NaN values, or nan when the input is not numeric.
  """
  try:
    # Convert BEFORE the NaN filter: math.isnan raises TypeError on strings,
    # so the original returned nan for lists of numeric strings.
    vals = [v for v in (float(i) for i in list_of_val) if not math.isnan(v)]
    return numpy.std(vals)
  except (TypeError, ValueError):
    return float('nan')
|
||||
|
||||
|
||||
def copyanything(src, dst):
  """
  Copies everything from the src folder to the dst folder; falls back to a
  plain file copy when src is not a directory.

  ARGS:
    src: address of the source folder (or file)
    dst: address of the destination folder (or file)
  RETURNS:
    None
  """
  try:
    shutil.copytree(src, dst)
  except OSError as exc:  # python >2.5
    # ENOTDIR / EINVAL mean src was a regular file, not a folder.
    if exc.errno in (errno.ENOTDIR, errno.EINVAL):
      shutil.copy(src, dst)
    else:
      raise
|
||||
|
||||
|
||||
def generate_alphanumeric_string(length):
  """Return a random string of ``length`` ASCII letters and digits."""
  alphabet = string.ascii_letters + string.digits
  return ''.join(random.choice(alphabet) for _ in range(length))
|
||||
|
||||
|
||||
def extract_first_json_dict(input_str):
  """
  Extract and parse the first JSON dictionary embedded in a string.

  Typographic ("curly") quotes are normalized to the ASCII quotes that JSON
  requires before parsing. (The quote literals were garbled in the previous
  revision, making the replacements no-ops; restored here to match the
  equivalent helper in llm_json_parser.py.)

  Args:
    input_str: string that may contain a JSON object.

  Returns:
    The parsed dict, or None when no well-formed JSON object is found.
  """
  try:
    # Make sure the input is actually a string.
    if not isinstance(input_str, str):
      print("提取JSON错误: 输入必须是字符串类型")
      return None

    # Normalize curly quotes (U+201C/D, U+2018/9) to standard ASCII quotes.
    input_str = (input_str.replace("\u201c", "\"")
                          .replace("\u201d", "\"")
                          .replace("\u2018", "'")
                          .replace("\u2019", "'"))

    # Locate the first '{'.
    try:
      start_index = input_str.index('{')
    except ValueError:
      print("提取JSON错误: 未找到JSON开始标记'{'")
      return None

    # Scan forward tracking brace depth to find the matching '}'.
    count = 1
    end_index = start_index + 1
    while count > 0 and end_index < len(input_str):
      if input_str[end_index] == '{':
        count += 1
      elif input_str[end_index] == '}':
        count -= 1
      end_index += 1

    # No matching '}' was found.
    if count > 0:
      print("提取JSON错误: JSON格式不完整,缺少匹配的'}'")
      return None

    # Extract the JSON substring and parse it.
    json_str = input_str[start_index:end_index]
    try:
      return json.loads(json_str)
    except json.JSONDecodeError as e:
      print(f"解析JSON错误: {str(e)}")
      return None
  except Exception as e:
    # Catch-all so callers never see an exception from this helper.
    print(f"提取JSON时发生错误: {str(e)}")
    return None
|
||||
|
||||
|
||||
def read_file_to_string(file_path):
  """
  Return the UTF-8 text content of a file.

  On a missing file, returns the literal string "The file was not found.";
  on any other failure, returns the stringified exception.
  """
  try:
    with open(file_path, 'r', encoding='utf-8') as fh:
      return fh.read()
  except FileNotFoundError:
    return "The file was not found."
  except Exception as e:
    return str(e)
|
||||
|
||||
|
||||
def write_string_to_file(full_path, text_content):
  """
  Write ``text_content`` to ``full_path`` as UTF-8, creating the containing
  folder when needed.

  Returns:
    A success message string, or the stringified exception on failure.
  """
  create_folder_if_not_there(full_path)
  # (Removed a redundant local "import os": os is imported at module level
  # and was never used in this function anyway.)
  try:
    with open(full_path, 'w', encoding='utf-8') as file:
      file.write(text_content)
    return f"File successfully written to {full_path}"
  except Exception as e:
    return str(e)
|
||||
|
||||
|
||||
def chunk_list(lst, q_chunk_size):
  """
  Splits the given list into consecutive sublists of at most q_chunk_size
  items (the final chunk may be shorter).

  Parameters:
    lst (list): The list to be split into chunks.
    q_chunk_size (int): The size of each chunk.

  Returns:
    list: A list of sublists, each of length up to q_chunk_size.
  """
  return [lst[start:start + q_chunk_size]
          for start in range(0, len(lst), q_chunk_size)]
|
||||
|
||||
|
||||
def write_dict_to_json(data, filename):
  """
  Serialize ``data`` to ``filename`` as pretty-printed UTF-8 JSON.

  The containing directory is created when missing. Failures are reported on
  stdout instead of being raised.

  Parameters:
    data (dict): The dictionary to write to the JSON file.
    filename (str): The name of the file to write the JSON data to.
  """
  try:
    # Create the parent directory if it does not exist yet.
    parent = os.path.dirname(filename)
    if parent and not os.path.exists(parent):
      os.makedirs(parent)

    # Write with UTF-8 encoding and human-readable formatting.
    with open(filename, 'w', encoding='utf-8') as fh:
      json.dump(data, fh, ensure_ascii=False, indent=4)
  except Exception as e:
    print(f"写入JSON文件时出错: {str(e)}")
|
||||
|
||||
|
||||
def read_json_to_dict(file_path):
  """
  Reads a JSON file and converts it to a Python dictionary.

  Parameters:
    file_path (str): The path to the JSON file.

  Returns:
    dict: The content of the JSON file, or None (implicitly) when the file
    is missing or malformed — errors are printed instead of raised.
  """
  try:
    with open(file_path, 'r', encoding='utf-8') as fh:
      return json.load(fh)
  except FileNotFoundError:
    print(f"未找到文件: {file_path}")
  except json.JSONDecodeError:
    print(f"解析JSON文件出错: {file_path}")
  except Exception as e:
    print(f"发生错误: {str(e)}")
||||
281
simulation_engine/gpt_structure.py
Normal file
281
simulation_engine/gpt_structure.py
Normal file
@@ -0,0 +1,281 @@
|
||||
import openai
|
||||
import time
|
||||
import base64
|
||||
from typing import List, Dict, Any, Union, Optional
|
||||
import os
|
||||
from simulation_engine.settings import *
|
||||
from utils import config_util as cfg
|
||||
|
||||
|
||||
# Make sure the application config has been loaded before client setup.
cfg.load_config()

# OpenAI client shared by all request helpers in this module.
client = openai.OpenAI(
    api_key=OPENAI_API_KEY,
    base_url=OPENAI_API_BASE
)

# Also set the module-global key for code using the legacy API surface
# (compatibility).
openai.api_key = OPENAI_API_KEY

# Mirror the settings into environment variables when absent (some libraries
# may read these directly).
if "OPENAI_API_KEY" not in os.environ:
    os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY
if "OPENAI_API_BASE" not in os.environ and OPENAI_API_BASE:
    os.environ["OPENAI_API_BASE"] = OPENAI_API_BASE
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# #######################[SECTION 1: HELPER FUNCTIONS] #######################
|
||||
# ============================================================================
|
||||
|
||||
def print_run_prompts(prompt_input: Union[str, List[str]],
                      prompt: str,
                      output: str) -> None:
    """Dump one generation run (template inputs, rendered prompt, and model
    output) to stdout for debugging."""
    print("=== START =======================================================")
    print("~~~ prompt_input ----------------------------------------------")
    print(prompt_input, "\n")
    print("~~~ prompt ----------------------------------------------------")
    print(prompt, "\n")
    print("~~~ output ----------------------------------------------------")
    print(output, "\n")
    print("=== END ==========================================================")
    print("\n\n\n")
|
||||
|
||||
|
||||
def generate_prompt(prompt_input: Union[str, List[str]],
                    prompt_lib_file: str) -> str:
    """
    Render a prompt by substituting inputs into a template file.

    Each occurrence of ``!<INPUT k>!`` in the template is replaced with the
    k-th input. When the template contains a
    ``<commentblockmarker>###</commentblockmarker>`` marker, only the text
    after the marker is kept.

    Args:
        prompt_input: one string or a list of strings to substitute.
        prompt_lib_file: path to the template file.

    Returns:
        The rendered prompt (stripped), or an "ERROR: ..." string when the
        template cannot be read.
    """
    # Normalize the input(s) to a list of strings.
    inputs = [prompt_input] if isinstance(prompt_input, str) else prompt_input
    inputs = [str(item) for item in inputs]

    try:
        # Read the template with UTF-8 encoding.
        with open(prompt_lib_file, "r", encoding='utf-8') as f:
            prompt = f.read()
    except FileNotFoundError:
        print(f"生成提示错误: 未找到模板文件 {prompt_lib_file}")
        return "ERROR: 模板文件不存在"
    except Exception as e:
        print(f"读取模板文件时出错: {str(e)}")
        return f"ERROR: 读取模板文件时出错 - {str(e)}"

    # Fill in the numbered placeholders.
    for idx, text in enumerate(inputs):
        prompt = prompt.replace(f"!<INPUT {idx}>!", text)

    # Keep only the part after the comment-block marker, when present.
    marker = "<commentblockmarker>###</commentblockmarker>"
    if marker in prompt:
        prompt = prompt.split(marker)[1]

    return prompt.strip()
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# ####################### [SECTION 2: SAFE GENERATE] #########################
|
||||
# ============================================================================
|
||||
|
||||
def gpt_request(prompt: str,
                model: str = "gpt-4o",
                max_tokens: int = 1500) -> str:
    """
    Send a single user-message chat-completion request to the OpenAI client.

    Args:
        prompt: the prompt text.
        model: model name, defaults to "gpt-4o". For "o1-preview" the request
            is sent without max_tokens/temperature, preserving the original
            behavior (that branch never passed them).
        max_tokens: completion token cap for non-o1 models, default 1500.

    Returns:
        The model's reply text, or a "GENERATION ERROR: ..." string on
        failure.
    """
    # The prompt must be a string.
    if not isinstance(prompt, str):
        print("GPT请求错误: 提示文本必须是字符串类型")
        return "GENERATION ERROR: 提示文本必须是字符串类型"

    # The original duplicated the whole request/except logic for the
    # o1-preview branch; build the kwargs once instead.
    kwargs = {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
    }
    if model != "o1-preview":
        kwargs["max_tokens"] = max_tokens
        kwargs["temperature"] = 0.7

    try:
        response = client.chat.completions.create(**kwargs)
        return response.choices[0].message.content
    except Exception as e:
        error_msg = f"GENERATION ERROR: {str(e)}"
        print(error_msg)
        return error_msg
|
||||
|
||||
|
||||
def gpt4_vision(messages: List[dict], max_tokens: int = 1500) -> str:
    """Send a chat request (typically containing image content) to gpt-4o;
    return the reply text, or a "GENERATION ERROR: ..." string on failure."""
    try:
        vision_client = openai.OpenAI(
            api_key=OPENAI_API_KEY,
            base_url=OPENAI_API_BASE,
        )
        completion = vision_client.chat.completions.create(
            model="gpt-4o",
            messages=messages,
            max_tokens=max_tokens,
            temperature=0.7,
        )
        return completion.choices[0].message.content
    except Exception as e:
        return f"GENERATION ERROR: {str(e)}"
|
||||
|
||||
|
||||
def chat_safe_generate(prompt_input: Union[str, List[str]],
                       prompt_lib_file: str,
                       gpt_version: str = "gpt-4o",
                       repeat: int = 1,
                       fail_safe: str = "error",
                       func_clean_up: callable = None,
                       verbose: bool = False,
                       max_tokens: int = 1500,
                       file_attachment: str = None,
                       file_type: str = None) -> tuple:
    """
    Generate a model response with retries and optional file attachments.

    Args:
        prompt_input: input(s) substituted into the prompt template.
        prompt_lib_file: path to the prompt template file.
        gpt_version: chat model name.
        repeat: number of attempts (exponential backoff) for the plain path.
        fail_safe: value used when every attempt fails.
        func_clean_up: optional post-processor, called as f(response, prompt=...).
        verbose: print the run when True (or when DEBUG is set).
        max_tokens: completion token cap.
        file_attachment: optional path to an image/pdf to attach.
        file_type: "image" or "pdf" when file_attachment is given.

    Returns:
        (response, prompt, prompt_input, fail_safe).
    """
    if file_attachment and file_type:
        prompt = generate_prompt(prompt_input, prompt_lib_file)
        messages = [{"role": "user", "content": prompt}]

        if file_type.lower() == 'image':
            # Inline the image as a base64 data URL.
            with open(file_attachment, "rb") as image_file:
                base64_image = base64.b64encode(image_file.read()).decode('utf-8')
            messages.append({
                "role": "user",
                "content": [
                    {"type": "text", "text": "Please refer to the attached image."},
                    {"type": "image_url", "image_url":
                        {"url": f"data:image/jpeg;base64,{base64_image}"}}
                ]
            })
            response = gpt4_vision(messages, max_tokens)

        elif file_type.lower() == 'pdf':
            # Extract the pdf text and prepend it to the task instruction.
            pdf_text = extract_text_from_pdf_file(file_attachment)
            pdf = f"PDF attachment in text-form:\n{pdf_text}\n\n"
            instruction = generate_prompt(prompt_input, prompt_lib_file)
            prompt = f"{pdf}"
            prompt += f"<End of the PDF attachment>\n=\nTask description:\n{instruction}"
            response = gpt_request(prompt, gpt_version, max_tokens)

    else:
        prompt = generate_prompt(prompt_input, prompt_lib_file)
        for attempt in range(repeat):
            response = gpt_request(prompt, model=gpt_version)
            # BUGFIX: gpt_request returns "GENERATION ERROR: <detail>", so the
            # old equality test against "GENERATION ERROR" never detected a
            # failure and the retry/backoff/fail_safe logic was dead code.
            if not response.startswith("GENERATION ERROR"):
                break
            time.sleep(2 ** attempt)
        else:
            # Every attempt failed.
            response = fail_safe

    if func_clean_up:
        response = func_clean_up(response, prompt=prompt)

    if verbose or DEBUG:
        print_run_prompts(prompt_input, prompt, response)

    return response, prompt, prompt_input, fail_safe
|
||||
|
||||
# ============================================================================
|
||||
# #################### [SECTION 3: OTHER API FUNCTIONS] ######################
|
||||
# ============================================================================
|
||||
|
||||
# 添加模拟embedding函数
|
||||
def _create_mock_embedding(dimension=1536):
|
||||
"""创建一个模拟的embedding函数,用于替代真实API"""
|
||||
import random
|
||||
import math
|
||||
import hashlib
|
||||
|
||||
def _get_mock_vector(text):
|
||||
"""生成一个随机但一致的embedding向量"""
|
||||
# 使用文本的哈希值作为随机种子,确保相同文本生成相同向量
|
||||
# 使用hashlib代替hash()函数,确保编码一致性
|
||||
try:
|
||||
# 确保文本是UTF-8编码
|
||||
if isinstance(text, str):
|
||||
text_bytes = text.encode('utf-8')
|
||||
else:
|
||||
text_bytes = str(text).encode('utf-8')
|
||||
|
||||
# 使用SHA256生成哈希值
|
||||
hash_value = int(hashlib.sha256(text_bytes).hexdigest(), 16) % (10 ** 8)
|
||||
random.seed(hash_value)
|
||||
except Exception as e:
|
||||
# 如果出现编码错误,使用一个固定的种子
|
||||
print(f"处理文本哈希时出错: {str(e)}")
|
||||
random.seed(42)
|
||||
|
||||
# 生成随机向量
|
||||
vector = [random.uniform(-1, 1) for _ in range(dimension)]
|
||||
|
||||
# 归一化向量
|
||||
magnitude = math.sqrt(sum(x*x for x in vector))
|
||||
normalized_vector = [x/magnitude for x in vector]
|
||||
|
||||
return normalized_vector
|
||||
|
||||
return _get_mock_vector
|
||||
|
||||
# Shared module-level mock embedding instance (1536 dims — the same size
# get_text_embedding falls back to for invalid input).
_mock_embedding_function = _create_mock_embedding(1536)
|
||||
|
||||
def get_text_embedding(text: str,
                       model: str = "text-embedding-3-small") -> List[float]:
    """Return a (mocked) embedding vector for ``text``.

    Invalid or empty input yields a 1536-dim zero vector. Real API calls are
    replaced by the module's deterministic mock; ``model`` is accepted for
    API compatibility but not used.
    """
    zero_vector = [0.0] * 1536
    try:
        # Only non-empty strings produce a real (mock) embedding.
        if not isinstance(text, str):
            print("Embedding错误: 输入必须是字符串类型")
            return zero_vector
        if not text.strip():
            print("Embedding警告: 输入字符串为空")
            return zero_vector

        # Normalize: collapse newlines into spaces and trim whitespace.
        normalized = text.replace("\n", " ").strip()
        return _mock_embedding_function(normalized)
    except Exception as e:
        # Never let embedding generation crash the caller.
        print(f"生成embedding时出错: {str(e)}")
        return zero_vector
|
||||
57
simulation_engine/llm_json_parser.py
Normal file
57
simulation_engine/llm_json_parser.py
Normal file
@@ -0,0 +1,57 @@
|
||||
import json
|
||||
import re
|
||||
|
||||
|
||||
def extract_first_json_dict(input_str):
    """Parse and return the first {...} JSON object embedded in a string,
    or None when no parsable object is found. Curly ("smart") quotes are
    normalized to ASCII quotes before parsing."""
    try:
        normalized = (input_str.replace("“", "\"")
                               .replace("”", "\"")
                               .replace("‘", "'")
                               .replace("’", "'"))

        # index() raises ValueError when there is no '{' — handled below.
        opening = normalized.index('{')

        # Walk forward, tracking brace nesting, until the opener is closed.
        depth = 1
        cursor = opening + 1
        while depth > 0 and cursor < len(normalized):
            if normalized[cursor] == '{':
                depth += 1
            elif normalized[cursor] == '}':
                depth -= 1
            cursor += 1

        # json.JSONDecodeError is a subclass of ValueError, so the except
        # below covers both a missing '{' and malformed JSON.
        return json.loads(normalized[opening:cursor])
    except ValueError:
        return None
|
||||
|
||||
|
||||
def extract_first_json_dict_categorical(input_str):
    """Pull every "Reasoning" and "Response" string value out of raw model
    output via regex; returns (responses, reasonings)."""
    reasonings = re.findall(r'"Reasoning":\s*"([^"]+)"', input_str)
    responses = re.findall(r'"Response":\s*"([^"]+)"', input_str)
    return responses, reasonings
|
||||
|
||||
|
||||
def extract_first_json_dict_numerical(input_str):
    """Pull every "Reasoning" string and numeric "Response" value (kept as
    strings) out of raw model output via regex; returns
    (responses, reasonings)."""
    reasonings = re.findall(r'"Reasoning":\s*"([^"]+)"', input_str)
    responses = re.findall(r'"Response":\s*(\d+\.?\d*)', input_str)
    return responses, reasonings
|
||||
|
||||
@@ -0,0 +1,44 @@
|
||||
Variables:
|
||||
|
||||
Note: basically main version (ver 3) but with "reasoning" step
|
||||
|
||||
<commentblockmarker>###</commentblockmarker>
|
||||
!<INPUT 0>!
|
||||
|
||||
=====
|
||||
|
||||
Task: What you see above is an interview transcript. Based on the interview transcript, I want you to predict the participant's survey responses. All questions are multiple choice where you must guess from one of the options presented.
|
||||
|
||||
As you answer, I want you to take the following steps:
|
||||
Step 1) Describe in a few sentences the kind of person that would choose each of the response options. ("Option Interpretation")
|
||||
Step 2) For each response option, reason about why the participant might answer with that particular option. ("Option Choice")
|
||||
Step 3) Write a few sentences reasoning about which of the options best predicts the participant's response ("Reasoning")
|
||||
Step 4) Predict how the participant will actually respond in the survey. Predict based on the interview and your thoughts, but ultimately, DON'T over think it. Use your system 1 (fast, intuitive) thinking. ("Response")
|
||||
|
||||
Here are the questions:
|
||||
|
||||
!<INPUT 1>!
|
||||
|
||||
-----
|
||||
|
||||
Output format -- output your response in json, where you provide the following:
|
||||
|
||||
{"1": {"Q": "<repeat the question you are answering>",
|
||||
"Option Interpretation": {
|
||||
"<option 1>": "a few sentences the kind of person that would choose each of the response options",
|
||||
"<option 2>": "..."},
|
||||
"Option Choice": {
|
||||
"<option 1>": "reasoning about why the participant might choose each of the options",
|
||||
"<option 2>": "..."},
|
||||
"Reasoning": "<reasoning on which of the option best predicts the participant's response>",
|
||||
"Response": "<your prediction on how the participant will answer the question>"},
|
||||
"2": {"Q": "<repeat the question you are answering>",
|
||||
"Option Interpretation": {
|
||||
"<option 1>": "a few sentences the kind of person that would choose each of the response options",
|
||||
"<option 2>": "..."},
|
||||
"Option Choice": {
|
||||
"<option 1>": "reasoning about why the participant might choose each of the options",
|
||||
"<option 2>": "..."},
|
||||
"Reasoning": "<reasoning on which of the option best predicts the participant's response>",
|
||||
"Response": "<your prediction on how the participant will answer the question>"},
|
||||
...}
|
||||
@@ -0,0 +1,34 @@
|
||||
Variables:
|
||||
|
||||
Note: basically main version (ver 3) but with "reasoning" step
|
||||
|
||||
<commentblockmarker>###</commentblockmarker>
|
||||
!<INPUT 0>!
|
||||
|
||||
=====
|
||||
|
||||
Task: What you see above is an interview transcript. Based on the interview transcript, I want you to predict the participant's survey responses. The question is a multiple choice where you must guess from one of the options presented.
|
||||
|
||||
As you answer, I want you to take the following steps:
|
||||
Step 1) Describe in a few sentences the kind of person that would choose each of the response options. ("Option Interpretation")
|
||||
Step 2) For each response option, reason about why the participant might answer with that particular option. ("Option Choice")
|
||||
Step 3) Write a few sentences reasoning on which of the option best predicts the participant's response ("Reasoning")
|
||||
Step 4) Predict how the participant will actually respond in the survey. Predict based on the interview and your thoughts, but ultimately, DON'T over think it. Use your system 1 (fast, intuitive) thinking. ("Response")
|
||||
|
||||
Here is the question:
|
||||
|
||||
!<INPUT 1>!
|
||||
|
||||
-----
|
||||
|
||||
Output format -- output your response in json, where you provide the following:
|
||||
|
||||
{"1": {"Q": "<repeat the question you are answering>",
|
||||
"Option Interpretation": {
|
||||
"<option 1>": "a few sentences the kind of person that would choose each of the response options",
|
||||
"<option 2>": "..."},
|
||||
"Option Choice": {
|
||||
"<option 1>": "reasoning about why the participant might choose each of the options",
|
||||
"<option 2>": "..."},
|
||||
"Reasoning": "<reasoning on which of the option best predicts the participant's response>",
|
||||
"Response": "<your prediction on how the participant will answer the question>"}}
|
||||
@@ -0,0 +1,37 @@
|
||||
Variables:
|
||||
|
||||
Note: basically main version (ver 3) but with "reasoning" step
|
||||
|
||||
<commentblockmarker>###</commentblockmarker>
|
||||
!<INPUT 0>!
|
||||
|
||||
=====
|
||||
|
||||
Task: What you see above is an interview transcript. Based on the interview transcript, I want you to predict the participant's survey responses. For all questions, you should output a number that is in the range that was specified for that question.
|
||||
|
||||
As you answer, I want you to take the following steps:
|
||||
Step 1) Describe in a few sentences the kind of person that would choose each end of the range. ("Range Interpretation")
|
||||
Step 2) Write a few sentences reasoning on which of the option best predicts the participant's response ("Reasoning")
|
||||
Step 3) Predict how the participant will actually respond. Predict based on the interview and your thoughts, but ultimately, DON'T over think it. Use your system 1 (fast, intuitive) thinking. ("Response")
|
||||
|
||||
Here are the questions:
|
||||
|
||||
!<INPUT 1>!
|
||||
|
||||
-----
|
||||
|
||||
Output format -- output your response in json, where you provide the following:
|
||||
|
||||
{"1": {"Q": "<repeat the question you are answering>",
|
||||
"Range Interpretation": {
|
||||
"<option 1>": "a few sentences about the kind of person that would choose each end of the range",
|
||||
"<option 2>": "..."},
|
||||
"Reasoning": "<reasoning on which of the option best predicts the participant's response>",
|
||||
"Response": <a single !<INPUT 2>! value that best represents your prediction on how the participant's answer>},
|
||||
"2": {"Q": "<repeat the question you are answering>",
|
||||
"Range Interpretation": {
|
||||
"<option 1>": "a few sentences about the kind of person that would choose each end of the range",
|
||||
"<option 2>": "..."},
|
||||
"Reasoning": "<reasoning on which of the option best predicts the participant's response>",
|
||||
"Response": <your prediction on how the participant will answer the question>},
|
||||
...}
|
||||
@@ -0,0 +1,30 @@
|
||||
Variables:
|
||||
|
||||
Note: basically main version (ver 3) but with "reasoning" step
|
||||
|
||||
<commentblockmarker>###</commentblockmarker>
|
||||
!<INPUT 0>!
|
||||
|
||||
=====
|
||||
|
||||
Task: What you see above is an interview transcript. Based on the interview transcript, I want you to predict the participant's survey response to a question. You should output a number that is in the range that was specified for that question.
|
||||
|
||||
As you answer, I want you to take the following steps:
|
||||
Step 1) Describe in a few sentences the kind of person that would choose each end of the range. ("Range Interpretation")
|
||||
Step 2) Write a few sentences reasoning on which of the option best predicts the participant's response ("Reasoning")
|
||||
Step 3) Predict how the participant will actually respond. Predict based on the interview and your thoughts, but ultimately, DON'T over think it. Use your system 1 (fast, intuitive) thinking. ("Response")
|
||||
|
||||
Here is the question:
|
||||
|
||||
!<INPUT 1>!
|
||||
|
||||
-----
|
||||
|
||||
Output format -- output your response in json, where you provide the following:
|
||||
|
||||
{"1": {"Q": "<repeat the question you are answering>",
|
||||
"Range Interpretation": {
|
||||
"<option 1>": "a few sentences about the kind of person that would choose each end of the range",
|
||||
"<option 2>": "..."},
|
||||
"Reasoning": "<reasoning on which of the option best predicts the participant's response>",
|
||||
"Response": <a single !<INPUT 2>! value that best represents your prediction on how the participant's answer>}}
|
||||
@@ -0,0 +1,19 @@
|
||||
<commentblockmarker>###</commentblockmarker>
|
||||
<Background information about the Character>
|
||||
!<INPUT 0>!
|
||||
|
||||
<End of background information about the Character>
|
||||
=====
|
||||
<Dialogue so far>
|
||||
!<INPUT 1>!
|
||||
|
||||
!<INPUT 2>!
|
||||
|
||||
<End of dialogue so far>
|
||||
|
||||
Task: First, study the background information that I provided you about a fictional subject above. We are writing a dialogue between the subject above and me. Given the dialogue so far, generate the next utterance that the character will speak.
|
||||
|
||||
Important: Please respond in Chinese (Simplified Chinese). If the question is in English, translate your response to Chinese.
|
||||
|
||||
Output format -- output your response in json, where you provide the following:
|
||||
{"utterance": "[...]"}
|
||||
@@ -0,0 +1,12 @@
|
||||
Variables:
|
||||
!<INPUT 0>!: Observations
|
||||
|
||||
<commentblockmarker>###</commentblockmarker>
|
||||
!<INPUT 0>!
|
||||
---
|
||||
Task: Above is a list of observations about a fictional human subject. For each item, rate its importance on a scale from 0 to 100, where 0 represents 'not important' and 100 represents 'very important' for understanding the subject.
|
||||
Output format: Json dictionaries of the following format:
|
||||
{
|
||||
"Item 1": <int importance score (range: 0 to 100) for item 1>,
|
||||
"Item 2": <int importance score (range: 0 to 100) for item 2>, ...
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
Variables:
|
||||
!<INPUT 0>!: Observations
|
||||
|
||||
<commentblockmarker>###</commentblockmarker>
|
||||
!<INPUT 0>!
|
||||
---
|
||||
Task: Above is an observation ("Item 1") about a fictional human subject. Rate its importance on a scale from 0 to 100, where 0 represents 'not important' and 100 represents 'very important' for understanding the subject.
|
||||
Output format: Json dictionaries of the following format:
|
||||
{
|
||||
"Item 1": <int importance score (range: 0 to 100) for item 1>
|
||||
}
|
||||
@@ -0,0 +1,16 @@
|
||||
Variables:
|
||||
!<INPUT 0>!: observations
|
||||
!<INPUT 1>!: reflection count
|
||||
!<INPUT 2>!: anchor
|
||||
<commentblockmarker>###</commentblockmarker>
|
||||
!<INPUT 0>!
|
||||
---
|
||||
Task: Above are observations about a fictional human subject. Write a list of !<INPUT 1>! reflections (in first person voice, from the perspective of the subject) that you can infer from the observations above about the subject on the following anchoring topic/phrase: "!<INPUT 2>!".
|
||||
|
||||
Output format: Json dictionaries of the following format:
|
||||
{
|
||||
"reflection": [
|
||||
"<fill in>",
|
||||
"<fill in>", ...
|
||||
]
|
||||
}
|
||||
@@ -0,0 +1,15 @@
|
||||
Variables:
|
||||
!<INPUT 0>!: observations
|
||||
!<INPUT 1>!: reflection count
|
||||
!<INPUT 2>!: anchor
|
||||
<commentblockmarker>###</commentblockmarker>
|
||||
!<INPUT 0>!
|
||||
---
|
||||
Task: Above are observations about a fictional human subject. Write one reflection (in first person voice, from the perspective of the subject) that you can infer from the observations above about the subject on the following anchoring topic/phrase: "!<INPUT 2>!".
|
||||
|
||||
Output format: Json dictionaries of the following format:
|
||||
{
|
||||
"reflection": [
|
||||
"<fill in>"
|
||||
]
|
||||
}
|
||||
27
simulation_engine/settings.py
Normal file
27
simulation_engine/settings.py
Normal file
@@ -0,0 +1,27 @@
|
||||
from pathlib import Path
|
||||
import sys
|
||||
import os
|
||||
|
||||
# 添加项目根目录到系统路径
|
||||
BASE_DIR = f"{Path(__file__).resolve().parent.parent}"
|
||||
sys.path.append(BASE_DIR)
|
||||
|
||||
# 导入配置工具
|
||||
from utils import config_util as cfg
|
||||
|
||||
# 确保配置已加载
|
||||
cfg.load_config()
|
||||
|
||||
# 从system.conf读取配置
|
||||
OPENAI_API_KEY = cfg.key_gpt_api_key
|
||||
OPENAI_API_BASE = cfg.gpt_base_url
|
||||
DEBUG = False
|
||||
|
||||
MAX_CHUNK_SIZE = 4
|
||||
|
||||
# 使用system.conf中的模型配置
|
||||
LLM_VERS = cfg.gpt_model_engine
|
||||
|
||||
## To do: Are the following needed in the new structure? Ideally Populations_Dir is for the user to define.
|
||||
POPULATIONS_DIR = f"{BASE_DIR}/agent_bank/populations"
|
||||
LLM_PROMPT_DIR = f"{BASE_DIR}/simulation_engine/prompt_template"
|
||||
Reference in New Issue
Block a user