    def flush_memory(
        self,
        messages: list,
        user_id: Optional[str] = None,
        reason: str = "threshold",
        max_messages: int = 10,
    ) -> bool:
        """
        Flush a conversation summary to the daily memory file.

        Delegates to MemoryFlushManager.flush_from_messages, which dedupes
        the messages synchronously and then summarizes and writes them on a
        background thread.

        Args:
            messages: Conversation message list (OpenAI/Claude format)
            user_id: Optional user ID for user-scoped memory
            reason: Trigger reason. "trim", "overflow" and "daily_summary"
                get dedicated section headers in the daily file; any other
                value falls back to a generic "Session Notes" header.
                # NOTE(review): the default "threshold" is not one of the
                # specially-handled reasons — confirm this is intentional.
            max_messages: Max recent messages to include (0 = all)

        Returns:
            True if a background flush was dispatched. The actual file write
            happens asynchronously, so this does NOT guarantee content was
            written to disk.
        """
        success = self.flush_manager.flush_from_messages(
            messages=messages,
            user_id=user_id,
            reason=reason,
            max_messages=max_messages,
        )
        if success:
            # Mark the index dirty so the next search re-syncs new memories
            self._dirty = True
        return success
Don't mention or list unless user explicitly asks.""" @@ -490,10 +451,10 @@ class MemoryManager: """ Load bootstrap memory files for session start - Following clawdbot's design: - - Only loads MEMORY.md from workspace root (long-term curated memory) - - Daily files (memory/YYYY-MM-DD.md) are accessed via memory_search tool, not bootstrap - - User-specific MEMORY.md is also loaded if user_id provided + Loads: + 1. MEMORY.md from workspace root (long-term curated memory) + 2. User-specific MEMORY.md if user_id provided + 3. Recent daily memory files (today + yesterday) for continuity Returns memory content WITHOUT obvious headers so it blends naturally into the context as background knowledge. @@ -502,7 +463,7 @@ class MemoryManager: user_id: Optional user ID for user-specific memories Returns: - Memory content to inject into system prompt (blends naturally as background context) + Memory content to inject into system prompt """ workspace_dir = self.config.get_workspace() memory_dir = self.config.get_memory_dir() @@ -510,7 +471,6 @@ class MemoryManager: sections = [] # 1. Load MEMORY.md from workspace root (long-term curated memory) - # Following clawdbot: only MEMORY.md is bootstrap, daily files use memory_search memory_file = Path(workspace_dir) / "MEMORY.md" if memory_file.exists(): try: @@ -518,7 +478,8 @@ class MemoryManager: if content: sections.append(content) except Exception as e: - print(f"Warning: Failed to read MEMORY.md: {e}") + from common.log import logger + logger.warning(f"[MemoryManager] Failed to read MEMORY.md: {e}") # 2. Load user-specific MEMORY.md if user_id provided if user_id: @@ -530,15 +491,80 @@ class MemoryManager: if content: sections.append(content) except Exception as e: - print(f"Warning: Failed to read user memory: {e}") + from common.log import logger + logger.warning(f"[MemoryManager] Failed to read user memory: {e}") + + # 3. 
    def _load_recent_daily_memories(
        self,
        memory_dir: Path,
        user_id: Optional[str],
        days: int = 2,
        max_tokens: int = 2000
    ) -> str:
        """
        Load recent daily memory files for bootstrap context.

        Walks backwards from today over `days` dates; for each date loads at
        most one file (the user-specific file wins over the shared one) and
        stops adding content once the approximate token budget is spent.

        Args:
            memory_dir: Memory directory path
            user_id: Optional user ID; if given, memory/users/<id>/<date>.md
                is checked before the shared memory/<date>.md
            days: Number of recent days to consider
            max_tokens: Approximate max tokens to include (rough char estimate)

        Returns:
            A "### Recent Activity" section string, or "" if nothing loaded.
        """
        from common.log import logger

        daily_sections = []
        total_chars = 0
        # Rough token-to-char ratio; budget is enforced in characters
        max_chars = max_tokens * 4

        for i in range(days):
            date = (datetime.now() - timedelta(days=i)).strftime("%Y-%m-%d")

            # Check user-specific daily file first, then shared
            candidates = []
            if user_id:
                candidates.append(memory_dir / "users" / user_id / f"{date}.md")
            candidates.append(memory_dir / f"{date}.md")

            for daily_file in candidates:
                if not daily_file.exists():
                    continue
                try:
                    content = daily_file.read_text(encoding='utf-8').strip()
                    # Skip empty or near-empty files (header-only stubs)
                    if not content or len(content) < 30:
                        continue

                    # Truncate if adding this would exceed the char budget
                    remaining = max_chars - total_chars
                    if remaining <= 0:
                        break
                    if len(content) > remaining:
                        content = content[:remaining] + "\n...(truncated)"

                    label = "Today" if i == 0 else "Yesterday" if i == 1 else date
                    daily_sections.append(f"### {label} ({date})\n{content}")
                    total_chars += len(content)
                    break  # only load one file per date (user-specific takes priority)
                except Exception as e:
                    logger.warning(f"[MemoryManager] Failed to read daily memory {daily_file}: {e}")

        if not daily_sections:
            return ""

        return "### Recent Activity\n\n" + "\n\n".join(daily_sections)
Failed to read daily memory {daily_file}: {e}") + + if not daily_sections: + return "" + + return "### Recent Activity\n\n" + "\n\n".join(daily_sections) + def get_status(self) -> Dict[str, Any]: """Get memory status""" stats = self.storage.get_stats() @@ -568,6 +594,37 @@ class MemoryManager: content = f"{path}:{start_line}:{end_line}" return hashlib.md5(content.encode('utf-8')).hexdigest() + @staticmethod + def _compute_temporal_decay(path: str, half_life_days: float = 30.0) -> float: + """ + Compute temporal decay multiplier for dated memory files. + + Inspired by OpenClaw's temporal-decay: exponential decay based on file date. + MEMORY.md and non-dated files are "evergreen" (no decay, multiplier=1.0). + Daily files like memory/2025-03-01.md decay based on age. + + Formula: multiplier = exp(-ln2/half_life * age_in_days) + """ + import re + import math + + match = re.search(r'(\d{4})-(\d{2})-(\d{2})\.md$', path) + if not match: + return 1.0 # evergreen: MEMORY.md, non-dated files + + try: + file_date = datetime( + int(match.group(1)), int(match.group(2)), int(match.group(3)) + ) + age_days = (datetime.now() - file_date).days + if age_days <= 0: + return 1.0 + + decay_lambda = math.log(2) / half_life_days + return math.exp(-decay_lambda * age_days) + except (ValueError, OverflowError): + return 1.0 + def _merge_results( self, vector_results: List[SearchResult], @@ -575,8 +632,7 @@ class MemoryManager: vector_weight: float, keyword_weight: float ) -> List[SearchResult]: - """Merge vector and keyword search results""" - # Create a map by (path, start_line, end_line) + """Merge vector and keyword search results with temporal decay for dated files""" merged_map = {} for result in vector_results: @@ -598,7 +654,6 @@ class MemoryManager: 'keyword_score': result.score } - # Calculate combined scores merged_results = [] for entry in merged_map.values(): combined_score = ( @@ -606,7 +661,11 @@ class MemoryManager: keyword_weight * entry['keyword_score'] ) + # Apply 
temporal decay for dated memory files result = entry['result'] + decay = self._compute_temporal_decay(result.path) + combined_score *= decay + merged_results.append(SearchResult( path=result.path, start_line=result.start_line, @@ -617,6 +676,5 @@ class MemoryManager: user_id=result.user_id )) - # Sort by score merged_results.sort(key=lambda r: r.score, reverse=True) return merged_results diff --git a/agent/memory/summarizer.py b/agent/memory/summarizer.py index 46b2b59..20900fb 100644 --- a/agent/memory/summarizer.py +++ b/agent/memory/summarizer.py @@ -1,225 +1,324 @@ """ Memory flush manager -Triggers memory flush before context compaction (similar to clawdbot) +Handles memory persistence when conversation context is trimmed or overflows: +- Uses LLM to summarize discarded messages into concise key-information entries +- Writes to daily memory files (lazy creation) +- Deduplicates trim flushes to avoid repeated writes +- Runs summarization asynchronously to avoid blocking normal replies +- Provides daily summary interface for scheduler """ -from typing import Optional, Callable, Any +import threading +from typing import Optional, Callable, Any, List, Dict from pathlib import Path from datetime import datetime +from common.log import logger + + +SUMMARIZE_SYSTEM_PROMPT = """你是一个记忆提取助手。你的任务是从对话记录中提取值得记住的信息,生成简洁的记忆摘要。 + +输出要求: +1. 以事件/关键信息为维度记录,每条一行,用 "- " 开头 +2. 记录有价值的关键信息,例如用户提出的要求及助手的解决方案,对话中涉及的事实信息,用户的偏好、决策或重要结论 +3. 每条摘要需要简明扼要,只保留关键信息 +4. 直接输出摘要内容,不要加任何前缀说明 +5. 当对话没有任何记录价值例如只是简单问候,可回复"无\"""" + +SUMMARIZE_USER_PROMPT = """请从以下对话记录中提取关键信息,生成记忆摘要: + +{conversation}""" class MemoryFlushManager: """ - Manages memory flush operations before context compaction + Manages memory flush operations. 
    def __init__(
        self,
        workspace_dir: Path,
        llm_model: Optional[Any] = None,
    ):
        """
        Initialize the flush manager.

        Args:
            workspace_dir: Workspace root; daily files live under
                <workspace>/memory (created here if missing)
            llm_model: Optional LLM used for summarization; when absent or
                failing, a rule-based fallback summary is used instead
        """
        self.workspace_dir = workspace_dir
        self.llm_model = llm_model
        self.memory_dir = workspace_dir / "memory"
        self.memory_dir.mkdir(parents=True, exist_ok=True)

        # Timestamp of the most recent flush write (None until first flush)
        self.last_flush_timestamp: Optional[datetime] = None
        # Content hashes of messages already flushed, used by
        # flush_from_messages to avoid re-writing the same content
        self._trim_flushed_hashes: set = set()  # Content hashes of already-flushed messages
        # Hash of the full conversation at the last flush; used by
        # create_daily_summary to skip days with no new content
        self._last_flushed_content_hash: str = ""  # Content hash at last flush, for daily dedup
self.last_flush_token_count + 5000: - return False - return True - - # 检查轮次阈值 - if self.turn_count >= turn_threshold: - return True - - return False - - def get_today_memory_file(self, user_id: Optional[str] = None) -> Path: - """ - Get today's memory file path: memory/YYYY-MM-DD.md - - Args: - user_id: Optional user ID for user-specific memory - - Returns: - Path to today's memory file - """ + def get_today_memory_file(self, user_id: Optional[str] = None, ensure_exists: bool = False) -> Path: + """Get today's memory file path: memory/YYYY-MM-DD.md""" today = datetime.now().strftime("%Y-%m-%d") if user_id: user_dir = self.memory_dir / "users" / user_id - user_dir.mkdir(parents=True, exist_ok=True) - return user_dir / f"{today}.md" + if ensure_exists: + user_dir.mkdir(parents=True, exist_ok=True) + today_file = user_dir / f"{today}.md" else: - return self.memory_dir / f"{today}.md" + today_file = self.memory_dir / f"{today}.md" + + if ensure_exists and not today_file.exists(): + today_file.parent.mkdir(parents=True, exist_ok=True) + today_file.write_text(f"# Daily Memory: {today}\n\n") + + return today_file def get_main_memory_file(self, user_id: Optional[str] = None) -> Path: - """ - Get main memory file path: MEMORY.md (workspace root) - - Args: - user_id: Optional user ID for user-specific memory - - Returns: - Path to main memory file - """ + """Get main memory file path: MEMORY.md (workspace root)""" if user_id: user_dir = self.memory_dir / "users" / user_id user_dir.mkdir(parents=True, exist_ok=True) return user_dir / "MEMORY.md" else: - # Return workspace root MEMORY.md return Path(self.workspace_dir) / "MEMORY.md" - def create_flush_prompt(self) -> str: - """ - Create prompt for memory flush turn - - Similar to clawdbot's DEFAULT_MEMORY_FLUSH_PROMPT - """ - today = datetime.now().strftime("%Y-%m-%d") - return ( - f"Pre-compaction memory flush. " - f"Store durable memories now (use memory/{today}.md for daily notes; " - f"create memory/ if needed). 
" - f"\n\n" - f"重要提示:\n" - f"- MEMORY.md: 记录最核心、最常用的信息(例如重要规则、偏好、决策、要求等)\n" - f" 如果 MEMORY.md 过长,可以精简或移除不再重要的内容。避免冗长描述,用关键词和要点形式记录\n" - f"- memory/{today}.md: 记录当天发生的事件、关键信息、经验教训、对话过程摘要等,突出重点\n" - f"- 如果没有重要内容需要记录,回复 NO_REPLY\n" - ) - - def create_flush_system_prompt(self) -> str: - """ - Create system prompt for memory flush turn - - Similar to clawdbot's DEFAULT_MEMORY_FLUSH_SYSTEM_PROMPT - """ - return ( - "Pre-compaction memory flush turn. " - "The session is near auto-compaction; capture durable memories to disk. " - "\n\n" - "记忆写入原则:\n" - "1. MEMORY.md 精简原则: 只记录核心信息(<2000 tokens)\n" - " - 记录重要规则、偏好、决策、要求等需要长期记住的关键信息,无需记录过多细节\n" - " - 如果 MEMORY.md 过长,可以根据需要精简或删除过时内容\n" - "\n" - "2. 天级记忆 (memory/YYYY-MM-DD.md):\n" - " - 记录当天的重要事件、关键信息、经验教训、对话过程摘要等,确保核心信息点被完整记录\n" - "\n" - "3. 判断标准:\n" - " - 这个信息未来会经常用到吗?→ MEMORY.md\n" - " - 这是今天的重要事件或决策吗?→ memory/YYYY-MM-DD.md\n" - " - 这是临时性的、不重要的内容吗?→ 不记录\n" - "\n" - "You may reply, but usually NO_REPLY is correct." - ) - - async def execute_flush( - self, - agent_executor: Callable, - current_tokens: int, - user_id: Optional[str] = None, - **executor_kwargs - ) -> bool: - """ - Execute memory flush by running a silent agent turn - - Args: - agent_executor: Function to execute agent with prompt - current_tokens: Current token count - user_id: Optional user ID - **executor_kwargs: Additional kwargs for agent executor - - Returns: - True if flush completed successfully - """ - try: - # Create flush prompts - prompt = self.create_flush_prompt() - system_prompt = self.create_flush_system_prompt() - - # Execute agent turn (silent, no user-visible reply expected) - await agent_executor( - prompt=prompt, - system_prompt=system_prompt, - silent=True, # NO_REPLY expected - **executor_kwargs - ) - - # Track flush - self.last_flush_token_count = current_tokens - self.last_flush_timestamp = datetime.now() - self.turn_count = 0 # 重置轮数计数器 - - return True - - except Exception as e: - print(f"Memory flush failed: {e}") - return False - - def 
increment_turn(self): - """增加对话轮数计数""" - self.turn_count += 1 - def get_status(self) -> dict: - """Get memory flush status""" return { - 'last_flush_tokens': self.last_flush_token_count, 'last_flush_time': self.last_flush_timestamp.isoformat() if self.last_flush_timestamp else None, 'today_file': str(self.get_today_memory_file()), 'main_file': str(self.get_main_memory_file()) } + # ---- Flush execution (called by agent_stream or scheduler) ---- + + def flush_from_messages( + self, + messages: List[Dict], + user_id: Optional[str] = None, + reason: str = "trim", + max_messages: int = 0, + ) -> bool: + """ + Asynchronously summarize and flush messages to daily memory. + + Deduplication runs synchronously, then LLM summarization + file write + run in a background thread so the main reply flow is never blocked. + + Args: + messages: Conversation message list (OpenAI/Claude format) + user_id: Optional user ID for user-scoped memory + reason: Why flush was triggered ("trim" | "overflow" | "daily_summary") + max_messages: Max recent messages to summarize (0 = all) + + Returns: + True if flush was dispatched + """ + try: + import hashlib + deduped = [] + for m in messages: + text = self._extract_text_from_content(m.get("content", "")) + if not text or not text.strip(): + continue + h = hashlib.md5(text.encode("utf-8")).hexdigest() + if h not in self._trim_flushed_hashes: + self._trim_flushed_hashes.add(h) + deduped.append(m) + if not deduped: + return False + + import copy + snapshot = copy.deepcopy(deduped) + thread = threading.Thread( + target=self._flush_worker, + args=(snapshot, user_id, reason, max_messages), + daemon=True, + ) + thread.start() + logger.info(f"[MemoryFlush] Async flush dispatched (reason={reason}, msgs={len(snapshot)})") + return True + + except Exception as e: + logger.warning(f"[MemoryFlush] Failed to dispatch flush (reason={reason}): {e}") + return False + + def _flush_worker( + self, + messages: List[Dict], + user_id: Optional[str], + reason: str, 
+ max_messages: int, + ): + """Background worker: summarize with LLM and write to daily file.""" + try: + summary = self._summarize_messages(messages, max_messages) + if not summary or not summary.strip() or summary.strip() == "无": + logger.info(f"[MemoryFlush] No valuable content to flush (reason={reason})") + return + + daily_file = ensure_daily_memory_file(self.workspace_dir, user_id) + + if reason == "overflow": + header = f"## Context Overflow Recovery ({datetime.now().strftime('%H:%M')})" + note = "The following conversation was trimmed due to context overflow:\n" + elif reason == "trim": + header = f"## Trimmed Context ({datetime.now().strftime('%H:%M')})" + note = "" + elif reason == "daily_summary": + header = f"## Daily Summary ({datetime.now().strftime('%H:%M')})" + note = "" + else: + header = f"## Session Notes ({datetime.now().strftime('%H:%M')})" + note = "" + + flush_entry = f"\n{header}\n\n{note}{summary}\n" + + with open(daily_file, "a", encoding="utf-8") as f: + f.write(flush_entry) + + self.last_flush_timestamp = datetime.now() + + logger.info(f"[MemoryFlush] Wrote to {daily_file.name} (reason={reason}, chars={len(summary)})") + + except Exception as e: + logger.warning(f"[MemoryFlush] Async flush failed (reason={reason}): {e}") + + def create_daily_summary( + self, + messages: List[Dict], + user_id: Optional[str] = None + ) -> bool: + """ + Generate end-of-day summary. Called by daily timer. + Skips if messages haven't changed since last flush. 
+ """ + import hashlib + content = "".join( + self._extract_text_from_content(m.get("content", "")) + for m in messages + ) + content_hash = hashlib.md5(content.encode("utf-8")).hexdigest() + if content_hash == self._last_flushed_content_hash: + logger.debug("[MemoryFlush] Daily summary skipped: no new content since last flush") + return False + self._last_flushed_content_hash = content_hash + return self.flush_from_messages( + messages=messages, + user_id=user_id, + reason="daily_summary", + max_messages=0, + ) + + # ---- Internal helpers ---- + + def _summarize_messages(self, messages: List[Dict], max_messages: int = 0) -> str: + """ + Summarize conversation messages using LLM, with rule-based fallback. + """ + conversation_text = self._format_conversation_for_summary(messages, max_messages) + if not conversation_text.strip(): + return "" + + # Try LLM summarization first + if self.llm_model: + try: + summary = self._call_llm_for_summary(conversation_text) + if summary and summary.strip() and summary.strip() != "无": + return summary.strip() + except Exception as e: + logger.warning(f"[MemoryFlush] LLM summarization failed, using fallback: {e}") + + return self._extract_summary_fallback(messages, max_messages) + + def _format_conversation_for_summary(self, messages: List[Dict], max_messages: int = 0) -> str: + """Format messages into readable conversation text for LLM summarization.""" + msgs = messages if max_messages == 0 else messages[-max_messages * 2:] + lines = [] + for msg in msgs: + role = msg.get("role", "") + text = self._extract_text_from_content(msg.get("content", "")) + if not text or not text.strip(): + continue + text = text.strip() + if role == "user": + lines.append(f"用户: {text[:500]}") + elif role == "assistant": + lines.append(f"助手: {text[:500]}") + return "\n".join(lines) + + def _call_llm_for_summary(self, conversation_text: str) -> str: + """Call LLM to generate a concise summary of the conversation.""" + from agent.protocol.models import 
LLMRequest + + request = LLMRequest( + messages=[{"role": "user", "content": SUMMARIZE_USER_PROMPT.format(conversation=conversation_text)}], + temperature=0, + max_tokens=500, + stream=False, + system=SUMMARIZE_SYSTEM_PROMPT, + ) + + response = self.llm_model.call(request) + + if isinstance(response, dict): + if response.get("error"): + raise RuntimeError(response.get("message", "LLM call failed")) + # OpenAI format + choices = response.get("choices", []) + if choices: + return choices[0].get("message", {}).get("content", "") + + # Handle response object with attribute access (e.g. OpenAI SDK response) + if hasattr(response, "choices") and response.choices: + return response.choices[0].message.content or "" + + return "" + + @staticmethod + def _extract_summary_fallback(messages: List[Dict], max_messages: int = 0) -> str: + """Rule-based fallback when LLM is unavailable.""" + msgs = messages if max_messages == 0 else messages[-max_messages * 2:] + + items = [] + for msg in msgs: + role = msg.get("role", "") + text = MemoryFlushManager._extract_text_from_content(msg.get("content", "")) + if not text or not text.strip(): + continue + text = text.strip() + + if role == "user": + if len(text) <= 5: + continue + items.append(f"- 用户请求: {text[:200]}") + elif role == "assistant": + first_line = text.split("\n")[0].strip() + if len(first_line) > 10: + items.append(f"- 处理结果: {first_line[:200]}") + + return "\n".join(items[:15]) + + @staticmethod + def _extract_text_from_content(content) -> str: + """Extract plain text from message content (string or content blocks).""" + if isinstance(content, str): + return content + if isinstance(content, list): + parts = [] + for block in content: + if isinstance(block, dict) and block.get("type") == "text": + parts.append(block.get("text", "")) + elif isinstance(block, str): + parts.append(block) + return "\n".join(parts) + return "" + def create_memory_files_if_needed(workspace_dir: Path, user_id: Optional[str] = None): """ - Create 
def ensure_daily_memory_file(workspace_dir: Path, user_id: Optional[str] = None) -> Path:
    """
    Ensure today's daily memory file exists, creating it only when actually
    needed. Called lazily before the first write to daily memory.

    Args:
        workspace_dir: Workspace directory
        user_id: Optional user ID for user-specific files (stored under
            memory/users/<user_id>/)

    Returns:
        Path to today's memory file (guaranteed to exist on return)
    """
    memory_dir = workspace_dir / "memory"
    memory_dir.mkdir(parents=True, exist_ok=True)

    today = datetime.now().strftime("%Y-%m-%d")
    if user_id:
        user_dir = memory_dir / "users" / user_id
        user_dir.mkdir(parents=True, exist_ok=True)
        today_memory = user_dir / f"{today}.md"
    else:
        today_memory = memory_dir / f"{today}.md"

    if not today_memory.exists():
        # Explicit encoding for consistency with the package's other
        # utf-8 reads/writes (platform default may differ)
        today_memory.write_text(
            f"# Daily Memory: {today}\n\n",
            encoding="utf-8",
        )

    return today_memory
search 无结果 → 尝试用 `memory_get` 读取MEMORY.md及最近两天记忆文件", "", "**记忆文件结构**:", - "- `MEMORY.md`: 长期记忆(核心信息、偏好、决策等)", - "- `memory/YYYY-MM-DD.md`: 每日记忆,记录当天的事件和对话信息", + f"- `MEMORY.md`: 长期记忆(核心信息、偏好、决策等)", + f"- `memory/YYYY-MM-DD.md`: 每日记忆,今天是 `memory/{today_file}`", "", - "**写入记忆**:", + "### 写入记忆", + "", + "**主动存储**:遇到以下情况时,应主动将信息写入记忆文件(无需告知用户):", + "", + "- 用户明确要求你记住某些信息", + "- 用户分享了重要的个人偏好、习惯、决策", + "- 对话中产生了重要的结论、方案、约定", + "- 完成了复杂任务,值得记录关键步骤和结果", + "- 发现了用户经常遇到的问题或解决方案", + "", + "**存储规则**:", + f"- 长期有效的核心信息 → `MEMORY.md`(文件保持精简,< 2000 tokens)", + f"- 当天的事件、进展、笔记 → `memory/{today_file}`", "- 追加内容 → `edit` 工具,oldText 留空", "- 修改内容 → `edit` 工具,oldText 填写要替换的文本", - "- 新建文件 → `write` 工具", "- **禁止写入敏感信息**:API密钥、令牌等敏感信息严禁写入记忆文件", "", "**使用原则**: 自然使用记忆,就像你本来就知道;不用刻意提起,除非用户问起。", @@ -369,7 +385,7 @@ def _build_workspace_section(workspace_dir: str, language: str, is_first_convers "", "**交流规范**:", "", - "- 在对话中,不要直接输出工作空间中的技术细节,特别是不要输出 AGENT.md、USER.md、MEMORY.md 等文件名称", + "- 在对话中,无需直接输出工作空间中的技术细节,例如 AGENT.md、USER.md、MEMORY.md 等文件名称", "- 例如用自然表达例如「我已记住」而不是「已更新 MEMORY.md」", "", ] diff --git a/agent/protocol/agent.py b/agent/protocol/agent.py index 7d1138b..85e1535 100644 --- a/agent/protocol/agent.py +++ b/agent/protocol/agent.py @@ -544,11 +544,15 @@ class Agent: logger.info("[Agent] Cleared Agent message history after executor recovery") raise - # Append only the NEW messages from this execution (thread-safe) - # This allows concurrent requests to both contribute to history + # Sync executor's messages back to agent (thread-safe). + # If the executor trimmed context, its message list is shorter than + # original_length, so we must replace rather than append. 
with self.messages_lock: - new_messages = executor.messages[original_length:] - self.messages.extend(new_messages) + self.messages = list(executor.messages) + # Track messages added in this run (user query + all assistant/tool messages) + # original_length may exceed executor.messages length after trimming + trim_adjusted_start = min(original_length, len(executor.messages)) + self._last_run_new_messages = list(executor.messages[trim_adjusted_start:]) # Store executor reference for agent_bridge to access files_to_send self.stream_executor = executor diff --git a/agent/protocol/agent_stream.py b/agent/protocol/agent_stream.py index 112260e..0d1a2d0 100644 --- a/agent/protocol/agent_stream.py +++ b/agent/protocol/agent_stream.py @@ -201,26 +201,6 @@ class AgentStreamExecutor: logger.info(f"[Agent] 第 {turn} 轮") self._emit_event("turn_start", {"turn": turn}) - # Check if memory flush is needed (before calling LLM) - # 使用独立的 flush 阈值(50K tokens 或 20 轮) - if self.agent.memory_manager and hasattr(self.agent, 'last_usage'): - usage = self.agent.last_usage - if usage and 'input_tokens' in usage: - current_tokens = usage.get('input_tokens', 0) - - if self.agent.memory_manager.should_flush_memory( - current_tokens=current_tokens - ): - self._emit_event("memory_flush_start", { - "current_tokens": current_tokens, - "turn_count": self.agent.memory_manager.flush_manager.turn_count - }) - - # TODO: Execute memory flush in background - # This would require async support - logger.info( - f"Memory flush recommended: tokens={current_tokens}, turns={self.agent.memory_manager.flush_manager.turn_count}") - # Call LLM (enable retry_on_empty for better reliability) assistant_msg, tool_calls = self._call_llm_stream(retry_on_empty=True) final_response = assistant_msg @@ -473,10 +453,6 @@ class AgentStreamExecutor: logger.info(f"[Agent] 🏁 完成 ({turn}轮)") self._emit_event("agent_end", {"final_response": final_response}) - # 每轮对话结束后增加计数(用户消息+AI回复=1轮) - if self.agent.memory_manager: - 
self.agent.memory_manager.increment_turn() - return final_response def _call_llm_stream(self, retry_on_empty=True, retry_count=0, max_retries=3, @@ -501,7 +477,8 @@ class AgentStreamExecutor: # Prepare messages messages = self._prepare_messages() - logger.info(f"Sending {len(messages)} messages to LLM") + turns = self._identify_complete_turns() + logger.info(f"Sending {len(messages)} messages ({len(turns)} turns) to LLM") # Prepare tool definitions (OpenAI/Claude format) tools_schema = None @@ -655,6 +632,14 @@ class AgentStreamExecutor: error_type = "context overflow" if is_context_overflow else "message format error" logger.error(f"💥 {error_type} detected: {e}") + # Flush memory before trimming to preserve context that will be lost + if is_context_overflow and self.agent.memory_manager: + user_id = getattr(self.agent, '_current_user_id', None) + self.agent.memory_manager.flush_memory( + messages=self.messages, user_id=user_id, + reason="overflow", max_messages=0 + ) + # Strategy: try aggressive trimming first, only clear as last resort if is_context_overflow and not _overflow_retry: trimmed = self._aggressive_trim_for_overflow() @@ -1204,14 +1189,28 @@ class AgentStreamExecutor: if not turns: return - # Step 2: 轮次限制 - 保留最近 N 轮 + # Step 2: 轮次限制 - 超出时裁到 max_turns/2,批量 flush 被裁的轮次 if len(turns) > self.max_context_turns: - removed_turns = len(turns) - self.max_context_turns - turns = turns[-self.max_context_turns:] # 保留最近的轮次 + keep_count = max(1, self.max_context_turns // 2) + removed_count = len(turns) - keep_count + + # Flush discarded turns to daily memory + if self.agent.memory_manager: + discarded_messages = [] + for turn in turns[:removed_count]: + discarded_messages.extend(turn["messages"]) + if discarded_messages: + user_id = getattr(self.agent, '_current_user_id', None) + self.agent.memory_manager.flush_memory( + messages=discarded_messages, user_id=user_id, + reason="trim", max_messages=0 + ) + + turns = turns[-keep_count:] logger.info( - f"💾 上下文轮次超限: 
{len(turns) + removed_turns} > {self.max_context_turns}," - f"移除最早的 {removed_turns} 轮完整对话" + f"💾 上下文轮次超限: {keep_count + removed_count} > {self.max_context_turns}," + f"裁剪至 {keep_count} 轮(移除 {removed_count} 轮)" ) # Step 3: Token 限制 - 保留完整轮次 @@ -1248,56 +1247,41 @@ class AgentStreamExecutor: logger.info(f" 重建消息列表: {old_count} -> {len(self.messages)} 条消息") return - # Token limit exceeded - keep complete turns from newest + # Token limit exceeded - keep the latest half of turns (same strategy as turn limit) + keep_count = max(1, len(turns) // 2) + removed_count = len(turns) - keep_count + kept_turns = turns[-keep_count:] + kept_tokens = sum(self._estimate_turn_tokens(t) for t in kept_turns) + logger.info( f"🔄 上下文tokens超限: ~{current_tokens + system_tokens} > {max_tokens}," - f"将按完整轮次移除最早的对话" + f"裁剪至 {keep_count} 轮(移除 {removed_count} 轮)" ) - # 从最新轮次开始,反向累加(保持完整轮次) - kept_turns = [] - accumulated_tokens = 0 - min_turns = 3 # 尽量保留至少 3 轮,但不强制(避免超出 token 限制) + # Flush discarded turns to daily memory + if self.agent.memory_manager: + discarded_messages = [] + for turn in turns[:removed_count]: + discarded_messages.extend(turn["messages"]) + if discarded_messages: + user_id = getattr(self.agent, '_current_user_id', None) + self.agent.memory_manager.flush_memory( + messages=discarded_messages, user_id=user_id, + reason="trim", max_messages=0 + ) - for i, turn in enumerate(reversed(turns)): - turn_tokens = self._estimate_turn_tokens(turn) - turns_from_end = i + 1 - - # 检查是否超出限制 - if accumulated_tokens + turn_tokens <= available_tokens: - kept_turns.insert(0, turn) - accumulated_tokens += turn_tokens - else: - # 超出限制 - # 如果还没有保留足够的轮次,且这是最后的机会,尝试保留 - if len(kept_turns) < min_turns and turns_from_end <= min_turns: - # 检查是否严重超出(超出 20% 以上则放弃) - overflow_ratio = (accumulated_tokens + turn_tokens - available_tokens) / available_tokens - if overflow_ratio < 0.2: # 允许最多超出 20% - kept_turns.insert(0, turn) - accumulated_tokens += turn_tokens - logger.debug(f" 为保留最少轮次,允许超出 
{overflow_ratio*100:.1f}%") - continue - # 停止保留更早的轮次 - break - - # 重建消息列表 new_messages = [] for turn in kept_turns: new_messages.extend(turn['messages']) old_count = len(self.messages) - old_turn_count = len(turns) self.messages = new_messages - new_count = len(self.messages) - new_turn_count = len(kept_turns) - if old_count > new_count: - logger.info( - f" 移除了 {old_turn_count - new_turn_count} 轮对话 " - f"({old_count} -> {new_count} 条消息," - f"~{current_tokens + system_tokens} -> ~{accumulated_tokens + system_tokens} tokens)" - ) + logger.info( + f" 移除了 {removed_count} 轮对话 " + f"({old_count} -> {len(self.messages)} 条消息," + f"~{current_tokens + system_tokens} -> ~{kept_tokens + system_tokens} tokens)" + ) def _clear_session_db(self): """ diff --git a/bridge/agent_bridge.py b/bridge/agent_bridge.py index a233852..cf27326 100644 --- a/bridge/agent_bridge.py +++ b/bridge/agent_bridge.py @@ -375,10 +375,6 @@ class AgentBridge: # Store session_id on agent so executor can clear DB on fatal errors agent._current_session_id = session_id - # Record message count before execution so we can diff new messages - with agent.messages_lock: - pre_run_len = len(agent.messages) - try: # Use agent's run_stream method with event handler response = agent.run_stream( @@ -397,19 +393,19 @@ class AgentBridge: # Persist new messages generated during this run if session_id: channel_type = (context.get("channel_type") or "") if context else "" - with agent.messages_lock: - new_messages = agent.messages[pre_run_len:] + new_messages = getattr(agent, '_last_run_new_messages', []) if new_messages: self._persist_messages(session_id, list(new_messages), channel_type) - elif pre_run_len > 0 and len(agent.messages) == 0: - # Agent cleared its messages (recovery from format error / overflow) - # Also clear the DB to prevent reloading dirty data - try: - from agent.memory import get_conversation_store - get_conversation_store().clear_session(session_id) - logger.info(f"[AgentBridge] Cleared DB for 
recovered session: {session_id}")
-                    except Exception as e:
-                        logger.warning(f"[AgentBridge] Failed to clear DB after recovery: {e}")
+                else:
+                    with agent.messages_lock:
+                        msg_count = len(agent.messages)
+                    if msg_count == 0:
+                        try:
+                            from agent.memory import get_conversation_store
+                            get_conversation_store().clear_session(session_id)
+                            logger.info(f"[AgentBridge] Cleared DB for recovered session: {session_id}")
+                        except Exception as e:
+                            logger.warning(f"[AgentBridge] Failed to clear DB after recovery: {e}")
 
         # Check if there are files to send (from read tool)
         if hasattr(agent, 'stream_executor') and hasattr(agent.stream_executor, 'files_to_send'):
diff --git a/bridge/agent_initializer.py b/bridge/agent_initializer.py
index 32ff84c..b866f20 100644
--- a/bridge/agent_initializer.py
+++ b/bridge/agent_initializer.py
@@ -115,14 +115,19 @@ class AgentInitializer:
             runtime_info=runtime_info  # Pass runtime_info for dynamic time updates
         )
 
-        # Attach memory manager
+        # Attach memory manager and share LLM model for summarization
         if memory_manager:
             agent.memory_manager = memory_manager
+            if hasattr(agent, 'model') and agent.model:
+                memory_manager.flush_manager.llm_model = agent.model
 
         # Restore persisted conversation history for this session
         if session_id:
             self._restore_conversation_history(agent, session_id)
 
+        # Start daily memory flush timer (once, on first agent init regardless of session)
+        self._start_daily_flush_timer()
+
         return agent
 
     def _restore_conversation_history(self, agent, session_id: str) -> None:
@@ -514,3 +519,59 @@
                 logger.info(f"[AgentInitializer] Migrated {len(keys_to_migrate)} API keys to .env: {list(keys_to_migrate.keys())}")
         except Exception as e:
             logger.warning(f"[AgentInitializer] Failed to migrate API keys: {e}")
+
+    def _start_daily_flush_timer(self):
+        """Start a background thread that flushes all agents' memory daily at 23:55."""
+        if getattr(self.agent_bridge, '_daily_flush_started', False):
+            return
+        self.agent_bridge._daily_flush_started = True
+
+        import threading
+
+        def _daily_flush_loop():
+            while True:
+                try:
+                    now = datetime.datetime.now()
+                    target = now.replace(hour=23, minute=55, second=0, microsecond=0)
+                    if target <= now:
+                        target += datetime.timedelta(days=1)
+                    wait_seconds = (target - now).total_seconds()
+                    logger.info(f"[DailyFlush] Next flush at {target.strftime('%Y-%m-%d %H:%M')} (in {wait_seconds/3600:.1f}h)")
+                    time.sleep(wait_seconds)
+
+                    self._flush_all_agents()
+                except Exception as e:
+                    logger.warning(f"[DailyFlush] Error in daily flush loop: {e}")
+                    time.sleep(3600)
+
+        t = threading.Thread(target=_daily_flush_loop, daemon=True)
+        t.start()
+
+    def _flush_all_agents(self):
+        """Flush memory for all active agent sessions."""
+        agents = []
+        if self.agent_bridge.default_agent:
+            agents.append(("default", self.agent_bridge.default_agent))
+        for sid, agent in self.agent_bridge.agents.items():
+            agents.append((sid, agent))
+
+        if not agents:
+            return
+
+        flushed = 0
+        for label, agent in agents:
+            try:
+                if not agent.memory_manager:
+                    continue
+                with agent.messages_lock:
+                    messages = list(agent.messages)
+                if not messages:
+                    continue
+                result = agent.memory_manager.flush_manager.create_daily_summary(messages)
+                if result:
+                    flushed += 1
+            except Exception as e:
+                logger.warning(f"[DailyFlush] Failed for session {label}: {e}")
+
+        if flushed:
+            logger.info(f"[DailyFlush] Flushed {flushed}/{len(agents)} agent session(s)")