Compare commits

...

6 Commits

Author SHA1 Message Date
zhayujie
9879878dd0 fix: concurrency issue in session 2026-03-12 17:08:09 +08:00
zhayujie
d78105d57c fix: tool call match 2026-03-12 17:05:27 +08:00
zhayujie
153c9e3565 fix(memory): remove useless prompt 2026-03-12 15:29:58 +08:00
zhayujie
c11623596d fix(memory): prevent context memory loss by improving trim strategy 2026-03-12 15:25:46 +08:00
zhayujie
e791a77f77 fix: strengthen bootstrap flow 2026-03-12 12:13:05 +08:00
zhayujie
b641bffb2c fix(feishu): remove bot_name dependency for group chat 2026-03-12 11:30:42 +08:00
8 changed files with 259 additions and 273 deletions

View File

@@ -408,179 +408,6 @@ class MemoryManager:
self._dirty = True
return success
def build_memory_guidance(self, lang: str = "zh", include_context: bool = True) -> str:
    """
    Build natural memory guidance for the agent system prompt.

    Following clawdbot's approach:
    1. Load MEMORY.md as bootstrap context (blends into background)
    2. Load daily files on-demand via memory_search tool
    3. Agent should NOT proactively mention memories unless user asks

    Args:
        lang: Language for guidance ("zh" for Chinese; any other value
            falls back to English).
        include_context: Whether to include bootstrap memory context
            (default: True). MEMORY.md is loaded as background context
            (like clawdbot); daily files are accessed via memory_search tool.

    Returns:
        Memory guidance text (and optionally context) for system prompt.
    """
    # Today's daily-memory filename is interpolated into the prompt so the
    # agent writes same-day notes to the correct file.
    today_file = self.flush_manager.get_today_memory_file().name
    if lang == "zh":
        guidance = f"""## 记忆系统
**背景知识**: 下方包含核心长期记忆,可直接使用。需要查找历史时,用 memory_search 搜索(搜索一次即可,不要重复)。
**主动存储**: 遇到以下情况时,主动用 edit/write 工具存储(无需告知用户):
- 用户要求记住的信息、个人偏好、重要决策
- 对话中产生的重要结论、方案、约定
- 完成复杂任务后的关键步骤和结果
- 长期信息 → MEMORY.md保持精简
- 当天笔记 → memory/{today_file}
**使用原则**: 自然使用记忆,就像你本来就知道。不需要生硬地提起或列举记忆,除非用户提到。"""
    else:
        guidance = f"""## Memory System
**Background Knowledge**: Core long-term memories below - use directly. For history, use memory_search once (don't repeat).
**Proactive Storage**: Store memories silently when:
- User asks to remember something, shares preferences or decisions
- Important conclusions, plans, or agreements emerge in conversation
- Complex tasks are completed (record key steps and results)
- Durable info → MEMORY.md (keep concise)
- Daily notes → memory/{today_file}
**Usage**: Use memories naturally as if you always knew. Don't mention or list unless user explicitly asks."""
    if include_context:
        # Load bootstrap context (MEMORY.md only, like clawdbot).
        # NOTE(review): called without a user_id, so only shared (non
        # per-user) memories are injected here — confirm that is intended.
        bootstrap_context = self.load_bootstrap_memories()
        if bootstrap_context:
            guidance += f"\n\n## Background Context\n\n{bootstrap_context}"
    return guidance
def load_bootstrap_memories(self, user_id: Optional[str] = None) -> str:
    """
    Load bootstrap memory files for session start.

    Loads:
    1. MEMORY.md from workspace root (long-term curated memory)
    2. User-specific MEMORY.md if user_id provided
    3. Recent daily memory files (today + yesterday) for continuity

    Returns memory content WITHOUT obvious headers so it blends naturally
    into the context as background knowledge.

    Args:
        user_id: Optional user ID for user-specific memories

    Returns:
        Memory content to inject into system prompt; empty string when no
        memory file yields usable content.
    """
    workspace_dir = self.config.get_workspace()
    memory_dir = self.config.get_memory_dir()
    sections = []
    # 1. Load MEMORY.md from workspace root (long-term curated memory)
    memory_file = Path(workspace_dir) / "MEMORY.md"
    if memory_file.exists():
        try:
            content = memory_file.read_text(encoding='utf-8').strip()
            if content:
                sections.append(content)
        except Exception as e:
            # Best-effort: an unreadable memory file must not block
            # session startup — log and move on.
            from common.log import logger
            logger.warning(f"[MemoryManager] Failed to read MEMORY.md: {e}")
    # 2. Load user-specific MEMORY.md if user_id provided
    if user_id:
        user_memory_dir = memory_dir / "users" / user_id
        user_memory_file = user_memory_dir / "MEMORY.md"
        if user_memory_file.exists():
            try:
                content = user_memory_file.read_text(encoding='utf-8').strip()
                if content:
                    sections.append(content)
            except Exception as e:
                from common.log import logger
                logger.warning(f"[MemoryManager] Failed to read user memory: {e}")
    # 3. Load recent daily memory files (today + yesterday) for context
    #    continuity, capped at roughly 2000 tokens.
    recent_daily = self._load_recent_daily_memories(
        memory_dir, user_id, days=2, max_tokens=2000
    )
    if recent_daily:
        sections.append(recent_daily)
    if not sections:
        return ""
    return "\n\n".join(sections)
def _load_recent_daily_memories(
    self,
    memory_dir: Path,
    user_id: Optional[str],
    days: int = 2,
    max_tokens: int = 2000
) -> str:
    """
    Load recent daily memory files for bootstrap context.

    Loads the most recent N days that have non-empty content. For each
    date the user-specific file (memory/users/<user_id>/<date>.md) takes
    priority over the shared file (memory/<date>.md); at most one file is
    included per date.

    Args:
        memory_dir: Memory directory path
        user_id: Optional user ID
        days: Number of recent days to load
        max_tokens: Approximate max tokens to include (rough char estimate)

    Returns:
        A "### Recent Activity" section string, or "" when no usable
        daily file was found.
    """
    from common.log import logger
    daily_sections = []
    total_chars = 0
    max_chars = max_tokens * 4  # rough token-to-char ratio
    for i in range(days):
        date = (datetime.now() - timedelta(days=i)).strftime("%Y-%m-%d")
        # Check user-specific daily file first, then shared
        candidates = []
        if user_id:
            candidates.append(memory_dir / "users" / user_id / f"{date}.md")
        candidates.append(memory_dir / f"{date}.md")
        for daily_file in candidates:
            if not daily_file.exists():
                continue
            try:
                content = daily_file.read_text(encoding='utf-8').strip()
                # Skip empty or trivially short files (< 30 chars).
                if not content or len(content) < 30:
                    continue
                # Truncate if adding this would exceed limit.
                # NOTE: this `break` only exits the candidates loop; the
                # outer day loop re-checks the budget on the next date.
                remaining = max_chars - total_chars
                if remaining <= 0:
                    break
                if len(content) > remaining:
                    content = content[:remaining] + "\n...(truncated)"
                label = "Today" if i == 0 else "Yesterday" if i == 1 else date
                daily_sections.append(f"### {label} ({date})\n{content}")
                total_chars += len(content)
                break  # only load one file per date (user-specific takes priority)
            except Exception as e:
                logger.warning(f"[MemoryManager] Failed to read daily memory {daily_file}: {e}")
    if not daily_sections:
        return ""
    return "### Recent Activity\n\n" + "\n\n".join(daily_sections)
def get_status(self) -> Dict[str, Any]:
"""Get memory status"""
stats = self.storage.get_stats()

View File

@@ -186,16 +186,24 @@ def _is_template_placeholder(content: str) -> bool:
def _is_onboarding_done(workspace_dir: str) -> bool:
"""Check if AGENT.md has been filled in (name field is no longer a placeholder)"""
"""Check if AGENT.md or USER.md has been modified from the original template"""
agent_path = os.path.join(workspace_dir, DEFAULT_AGENT_FILENAME)
if not os.path.exists(agent_path):
return False
try:
with open(agent_path, 'r', encoding='utf-8') as f:
content = f.read()
return "*(在首次对话时填写" not in content
except Exception:
return False
user_path = os.path.join(workspace_dir, DEFAULT_USER_FILENAME)
agent_template = _get_agent_template().strip()
user_template = _get_user_template().strip()
for path, template in [(agent_path, agent_template), (user_path, user_template)]:
if not os.path.exists(path):
continue
try:
with open(path, 'r', encoding='utf-8') as f:
content = f.read().strip()
if content != template:
return True
except Exception:
continue
return False
# ============= 模板内容 =============
@@ -356,15 +364,18 @@ _你刚刚启动这是你的第一次对话。_
**重要**: 如果用户第一句话是具体的任务或提问,先回答他们的问题,然后在回复末尾自然地引导初始化(如:"顺便问一下,你想怎么称呼我?我该怎么叫你?")。
## 确定后
## 信息写入(必须严格执行)
用 `edit` 工具将收集到的信息更新到:
- `AGENT.md` — 你的名字、角色、性格、交流风格
- `USER.md` — 用户的姓名、称呼
每当用户提供了名字、称呼、风格等任何初始化信息时,**必须在当轮回复中立即调用 `edit` 工具写入文件**,不能只口头确认。
## 完成后
- `AGENT.md` — 你的名字、角色、性格、交流风格(每收到一条相关信息就立即更新对应字段)
- `USER.md` — 用户的姓名、称呼、基本信息等
用 bash 执行 `rm BOOTSTRAP.md` 删除此文件。你不再需要引导脚本了——你已经是你了
⚠️ 只说"记住了"而不调用 edit 写入 = 没有完成。信息只有写入文件才会被持久保存
## 全部完成后
当 AGENT.md 和 USER.md 的核心字段都已填写后,用 bash 执行 `rm BOOTSTRAP.md` 删除此文件。你不再需要引导脚本了——你已经是你了。
"""

View File

@@ -8,7 +8,7 @@ import time
from typing import List, Dict, Any, Optional, Callable, Tuple
from agent.protocol.models import LLMRequest, LLMModel
from agent.protocol.message_utils import sanitize_claude_messages
from agent.protocol.message_utils import sanitize_claude_messages, compress_turn_to_text_only
from agent.tools.base_tool import BaseTool, ToolResult
from common.log import logger
@@ -191,6 +191,16 @@ class AgentStreamExecutor:
]
})
# Trim context ONCE before the agent loop starts, not during tool steps.
# This ensures tool_use/tool_result chains created during the current run
# are never stripped mid-execution (which would cause LLM loops).
self._trim_messages()
# Validate after trimming: trimming may leave orphaned tool_use at the
# boundary (e.g. the last kept turn ends with an assistant tool_use whose
# tool_result was in a discarded turn).
self._validate_and_fix_messages()
self._emit_event("agent_start")
final_response = ""
@@ -481,14 +491,10 @@ class AgentStreamExecutor:
Returns:
(response_text, tool_calls)
"""
# Validate and fix message history first
self._validate_and_fix_messages()
# Trim messages if needed (using agent's context management)
self._trim_messages()
# Re-validate after trimming: trimming may produce new orphaned
# tool_result messages when it removes turns at the boundary.
# Validate and fix message history (e.g. orphaned tool_result blocks).
# Context trimming is done once in run_stream() before the loop starts,
# NOT here — trimming mid-execution would strip the current run's
# tool_use/tool_result chains and cause LLM loops.
self._validate_and_fix_messages()
# Prepare messages
@@ -1165,10 +1171,10 @@ class AgentStreamExecutor:
if not turns:
return
# Step 2: 轮次限制 - 超出时裁到 max_turns/2批量 flush 被裁的轮次
# Step 2: 轮次限制 - 超出时移除前一半,保留后一半
if len(turns) > self.max_context_turns:
keep_count = max(1, self.max_context_turns // 2)
removed_count = len(turns) - keep_count
removed_count = len(turns) // 2
keep_count = len(turns) - removed_count
# Flush discarded turns to daily memory
if self.agent.memory_manager:
@@ -1223,9 +1229,47 @@ class AgentStreamExecutor:
logger.info(f" 重建消息列表: {old_count} -> {len(self.messages)} 条消息")
return
# Token limit exceeded - keep the latest half of turns (same strategy as turn limit)
keep_count = max(1, len(turns) // 2)
removed_count = len(turns) - keep_count
# Token limit exceeded — tiered strategy based on turn count:
#
# Few turns (<5): Compress ALL turns to text-only (strip tool chains,
# keep user query + final reply). Never discard turns
# — losing even one is too painful when context is thin.
#
# Many turns (>=5): Directly discard the first half of turns.
# With enough turns the oldest ones are less
# critical, and keeping the recent half intact
# (with full tool chains) is more useful.
COMPRESS_THRESHOLD = 5
if len(turns) < COMPRESS_THRESHOLD:
# --- Few turns: compress ALL turns to text-only, never discard ---
compressed_turns = []
for t in turns:
compressed = compress_turn_to_text_only(t)
if compressed["messages"]:
compressed_turns.append(compressed)
new_messages = []
for turn in compressed_turns:
new_messages.extend(turn["messages"])
new_tokens = sum(self._estimate_turn_tokens(t) for t in compressed_turns)
old_count = len(self.messages)
self.messages = new_messages
logger.info(
f"📦 上下文tokens超限(轮次<{COMPRESS_THRESHOLD}): "
f"~{current_tokens + system_tokens} > {max_tokens}"
f"压缩全部 {len(turns)} 轮为纯文本 "
f"({old_count} -> {len(self.messages)} 条消息,"
f"~{current_tokens + system_tokens} -> ~{new_tokens + system_tokens} tokens)"
)
return
# --- Many turns (>=5): discard the older half, keep the newer half ---
removed_count = len(turns) // 2
keep_count = len(turns) - removed_count
kept_turns = turns[-keep_count:]
kept_tokens = sum(self._estimate_turn_tokens(t) for t in kept_turns)
@@ -1234,7 +1278,6 @@ class AgentStreamExecutor:
f"裁剪至 {keep_count} 轮(移除 {removed_count} 轮)"
)
# Flush discarded turns to daily memory
if self.agent.memory_manager:
discarded_messages = []
for turn in turns[:removed_count]:
@@ -1245,14 +1288,14 @@ class AgentStreamExecutor:
messages=discarded_messages, user_id=user_id,
reason="trim", max_messages=0
)
new_messages = []
for turn in kept_turns:
new_messages.extend(turn['messages'])
old_count = len(self.messages)
self.messages = new_messages
logger.info(
f" 移除了 {removed_count} 轮对话 "
f"({old_count} -> {len(self.messages)} 条消息,"

View File

@@ -70,67 +70,71 @@ def sanitize_claude_messages(messages: List[Dict]) -> int:
else:
break
# 3. Full scan: ensure every tool_result references a known tool_use id
known_ids: Set[str] = set()
i = 0
while i < len(messages):
msg = messages[i]
role = msg.get("role")
content = msg.get("content", [])
# 3. Iteratively remove unmatched tool_use / tool_result until stable.
# Removing one broken message can orphan others (e.g. an assistant msg
# with both matched and unmatched tool_use — deleting it orphans the
# previously-matched tool_result). Loop until clean.
for _ in range(5):
use_ids: Set[str] = set()
result_ids: Set[str] = set()
for msg in messages:
for block in (msg.get("content") or []):
if not isinstance(block, dict):
continue
if block.get("type") == "tool_use" and block.get("id"):
use_ids.add(block["id"])
elif block.get("type") == "tool_result" and block.get("tool_use_id"):
result_ids.add(block["tool_use_id"])
if role == "assistant" and isinstance(content, list):
for block in content:
if isinstance(block, dict) and block.get("type") == "tool_use":
tid = block.get("id", "")
if tid:
known_ids.add(tid)
bad_use = use_ids - result_ids
bad_result = result_ids - use_ids
if not bad_use and not bad_result:
break
elif role == "user" and isinstance(content, list):
if not _has_block_type(content, "tool_result"):
pass_removed = 0
i = 0
while i < len(messages):
msg = messages[i]
role = msg.get("role")
content = msg.get("content", [])
if not isinstance(content, list):
i += 1
continue
orphaned = [
b.get("tool_use_id", "")
for b in content
if isinstance(b, dict)
and b.get("type") == "tool_result"
and b.get("tool_use_id", "")
and b.get("tool_use_id", "") not in known_ids
]
if orphaned:
orphaned_set = set(orphaned)
if not _has_block_type(content, "text"):
logger.warning(
f"⚠️ Removing orphaned tool_result message (tool_ids: {orphaned})"
)
messages.pop(i)
removed += 1
# Also remove a preceding broken assistant tool_use message
if i > 0 and messages[i - 1].get("role") == "assistant":
prev = messages[i - 1].get("content", [])
if isinstance(prev, list) and _has_block_type(prev, "tool_use"):
messages.pop(i - 1)
removed += 1
i -= 1
continue
else:
new_content = [
b for b in content
if not (
isinstance(b, dict)
and b.get("type") == "tool_result"
and b.get("tool_use_id", "") in orphaned_set
)
]
delta = len(content) - len(new_content)
if delta:
logger.warning(
f"⚠️ Stripped {delta} orphaned tool_result block(s) from mixed message"
)
msg["content"] = new_content
removed += delta
i += 1
if role == "assistant" and bad_use and any(
isinstance(b, dict) and b.get("type") == "tool_use"
and b.get("id") in bad_use for b in content
):
logger.warning(f"⚠️ Removing assistant msg with unmatched tool_use")
messages.pop(i)
pass_removed += 1
continue
if role == "user" and bad_result and _has_block_type(content, "tool_result"):
has_bad = any(
isinstance(b, dict) and b.get("type") == "tool_result"
and b.get("tool_use_id") in bad_result for b in content
)
if has_bad:
if not _has_block_type(content, "text"):
logger.warning(f"⚠️ Removing user msg with unmatched tool_result")
messages.pop(i)
pass_removed += 1
continue
else:
before = len(content)
msg["content"] = [
b for b in content
if not (isinstance(b, dict) and b.get("type") == "tool_result"
and b.get("tool_use_id") in bad_result)
]
pass_removed += before - len(msg["content"])
i += 1
removed += pass_removed
if pass_removed == 0:
break
if removed:
logger.info(f"🔧 Message validation: removed {removed} broken message(s)")
@@ -177,3 +181,60 @@ def _has_block_type(content: list, block_type: str) -> bool:
isinstance(b, dict) and b.get("type") == block_type
for b in content
)
def _extract_text_from_content(content) -> str:
"""Extract plain text from a message content field (str or list of blocks)."""
if isinstance(content, str):
return content.strip()
if isinstance(content, list):
parts = [
b.get("text", "")
for b in content
if isinstance(b, dict) and b.get("type") == "text"
]
return "\n".join(p for p in parts if p).strip()
return ""
def compress_turn_to_text_only(turn: Dict) -> Dict:
    """
    Collapse a full turn (including tool_use/tool_result chains) into a
    lightweight text-only turn containing just the first user text and the
    last assistant text.

    This keeps the conversational gist (what was asked, what was concluded)
    while dropping the bulky intermediate tool interactions. The input turn
    is not mutated; a new ``{"messages": [...]}`` dict is returned.
    """
    first_user_text = ""
    final_assistant_text = ""
    for message in turn["messages"]:
        role = message.get("role")
        body = message.get("content", [])
        if role == "assistant":
            # Keep overwriting so the LAST non-empty assistant text wins.
            candidate = _extract_text_from_content(body)
            if candidate:
                final_assistant_text = candidate
        elif role == "user":
            # Ignore tool_result carrier messages; capture only the first
            # genuine user text.
            carries_tool_result = (
                isinstance(body, list) and _has_block_type(body, "tool_result")
            )
            if not carries_tool_result and not first_user_text:
                first_user_text = _extract_text_from_content(body)
    slim_messages = []
    for role, text in (("user", first_user_text), ("assistant", final_assistant_text)):
        if text:
            slim_messages.append({
                "role": role,
                "content": [{"type": "text", "text": text}]
            })
    return {"messages": slim_messages}

View File

@@ -431,7 +431,7 @@ class ChatChannel(Channel):
if session_id not in self.sessions:
self.sessions[session_id] = [
Dequeue(),
threading.BoundedSemaphore(conf().get("concurrency_in_session", 4)),
threading.BoundedSemaphore(conf().get("concurrency_in_session", 1)),
]
if context.type == ContextType.TEXT and context.content.startswith("#"):
self.sessions[session_id][0].putleft(context) # 优先处理管理命令

View File

@@ -69,6 +69,7 @@ class FeiShuChanel(ChatChannel):
self._http_server = None
self._ws_client = None
self._ws_thread = None
self._bot_open_id = None # cached bot open_id for @-mention matching
logger.debug("[FeiShu] app_id={}, app_secret={}, verification_token={}, event_mode={}".format(
self.feishu_app_id, self.feishu_app_secret, self.feishu_token, self.feishu_event_mode))
# 无需群校验和前缀
@@ -85,11 +86,31 @@ class FeiShuChanel(ChatChannel):
self.feishu_app_secret = conf().get('feishu_app_secret')
self.feishu_token = conf().get('feishu_token')
self.feishu_event_mode = conf().get('feishu_event_mode', 'websocket')
self._fetch_bot_open_id()
if self.feishu_event_mode == 'websocket':
self._startup_websocket()
else:
self._startup_webhook()
def _fetch_bot_open_id(self):
    """Fetch the bot's own open_id via API so we can match @-mentions without feishu_bot_name.

    Best-effort: on any failure (missing access token, non-200 status,
    non-zero API code, or exception) self._bot_open_id is left unchanged
    and only a warning is logged.
    """
    try:
        access_token = self.fetch_access_token()
        if not access_token:
            logger.warning("[FeiShu] Cannot fetch bot info: no access_token")
            return
        headers = {"Authorization": "Bearer " + access_token}
        # Short timeout: this runs during channel startup and must not hang it.
        resp = requests.get("https://open.feishu.cn/open-apis/bot/v3/info/", headers=headers, timeout=5)
        if resp.status_code == 200:
            data = resp.json()
            if data.get("code") == 0:
                # Cache the open_id for @-mention matching in _is_mention_bot.
                self._bot_open_id = data.get("bot", {}).get("open_id")
                logger.info(f"[FeiShu] Bot open_id fetched: {self._bot_open_id}")
            else:
                logger.warning(f"[FeiShu] Fetch bot info failed: code={data.get('code')}, msg={data.get('msg')}")
    except Exception as e:
        logger.warning(f"[FeiShu] Fetch bot open_id error: {e}")
def stop(self):
import ctypes
logger.info("[FeiShu] stop() called")
@@ -147,11 +168,15 @@ class FeiShuChanel(ChatChannel):
def handle_message_event(data: lark.im.v1.P2ImMessageReceiveV1) -> None:
"""处理接收消息事件 v2.0"""
try:
logger.debug(f"[FeiShu] websocket receive event: {lark.JSON.marshal(data, indent=2)}")
# 转换为标准的event格式
event_dict = json.loads(lark.JSON.marshal(data))
event = event_dict.get("event", {})
msg = event.get("message", {})
# Skip group messages that don't @-mention the bot (reduce log noise)
if msg.get("chat_type") == "group" and not msg.get("mentions") and msg.get("message_type") == "text":
return
logger.debug(f"[FeiShu] websocket receive event: {lark.JSON.marshal(data, indent=2)}")
# 处理消息
self._handle_message_event(event)
@@ -236,6 +261,27 @@ class FeiShuChanel(ChatChannel):
logger.info("[FeiShu] ✅ Websocket thread started, ready to receive messages")
ws_thread.join()
def _is_mention_bot(self, mentions: list) -> bool:
    """Check whether any mention in the list refers to this bot.

    Args:
        mentions: Mention entries from a Feishu message event; each entry
            is expected to carry ``id.open_id`` and/or ``name`` fields.

    Returns:
        True if the bot appears to be among the mentions.

    Priority:
    1. Match by open_id (obtained from /bot/v3/info at startup, no config needed)
    2. Fallback to feishu_bot_name config for backward compatibility
    3. If neither is available, assume the first mention is the bot (Feishu only
       delivers group messages that @-mention the bot, so this is usually correct)
    """
    if self._bot_open_id:
        return any(
            m.get("id", {}).get("open_id") == self._bot_open_id
            for m in mentions
        )
    bot_name = conf().get("feishu_bot_name")
    if bot_name:
        return any(m.get("name") == bot_name for m in mentions)
    # Feishu event subscription only delivers messages that @-mention the bot,
    # so reaching here means the bot was indeed mentioned.
    return True
def _handle_message_event(self, event: dict):
"""
处理消息事件的核心逻辑
@@ -270,10 +316,9 @@ class FeiShuChanel(ChatChannel):
if not msg.get("mentions") and msg.get("message_type") == "text":
# 群聊中未@不响应
return
if msg.get("mentions") and msg.get("mentions")[0].get("name") != conf().get("feishu_bot_name") and msg.get(
"message_type") == "text":
# 不是@机器人,不响应
return
if msg.get("mentions") and msg.get("message_type") == "text":
if not self._is_mention_bot(msg.get("mentions")):
return
# 群聊
is_group = True
receive_id_type = "chat_id"

View File

@@ -20,7 +20,6 @@
"use_linkai": false,
"linkai_api_key": "",
"linkai_app_code": "",
"feishu_bot_name": "",
"feishu_app_id": "",
"feishu_app_secret": "",
"dingtalk_client_id": "",

View File

@@ -37,7 +37,7 @@ available_setting = {
"group_name_white_list": ["ChatGPT测试群", "ChatGPT测试群2"], # 开启自动回复的群名称列表
"group_name_keyword_white_list": [], # 开启自动回复的群名称关键词列表
"group_chat_in_one_session": ["ChatGPT测试群"], # 支持会话上下文共享的群名称
"group_shared_session": True, # 群聊是否共享会话上下文(所有成员共享)默认为True。False时每个用户在群内有独立会话
"group_shared_session": False, # 群聊是否共享会话上下文所有成员共享。False时每个用户在群内有独立会话
"nick_name_black_list": [], # 用户昵称黑名单
"group_welcome_msg": "", # 配置新人进群固定欢迎语,不配置则使用随机风格欢迎
"trigger_by_self": False, # 是否允许机器人触发