diff --git a/agent/protocol/agent_stream.py b/agent/protocol/agent_stream.py index 117d6f9..71ba384 100644 --- a/agent/protocol/agent_stream.py +++ b/agent/protocol/agent_stream.py @@ -912,22 +912,21 @@ class AgentStreamExecutor: def _validate_and_fix_messages(self): """ - Validate message history and fix incomplete tool_use/tool_result pairs. + Validate message history and fix broken tool_use/tool_result pairs. - All LLM APIs (OpenAI, Claude, Moonshot, DashScope) require: - 1. Each tool_use in an assistant message must have a matching tool_result - in the immediately following user message. - 2. Each tool_result in a user message must reference a tool_use_id that - exists in the preceding assistant message. - - This method performs a full scan and removes any messages that would - cause a 400 error due to broken tool_use/tool_result pairing. + Historical messages restored from DB are text-only (no tool calls), + so this method only needs to handle edge cases in the current session: + - Trailing assistant message with tool_use but no following tool_result + (e.g. process was interrupted mid-execution) + - Orphaned tool_result at the start of messages (e.g. after context + trimming removed the preceding assistant tool_use) """ if not self.messages: return - # Pass 1: remove trailing incomplete tool_use (assistant with tool_use - # but no following tool_result) + removed = 0 + + # Remove trailing incomplete tool_use assistant messages while self.messages: last_msg = self.messages[-1] if last_msg.get("role") == "assistant": @@ -938,99 +937,27 @@ class AgentStreamExecutor: ): logger.warning("⚠️ Removing trailing incomplete tool_use assistant message") self.messages.pop() + removed += 1 continue break - # Pass 2: full scan for orphaned tool_result and missing tool_result - removed = 0 - i = 0 - while i < len(self.messages): - msg = self.messages[i] - role = msg.get("role") - content = msg.get("content", []) - - if role == "assistant" and isinstance(content, list): - tool_use_ids = { - b.get("id") - for b in content - if isinstance(b, dict) and b.get("type") == "tool_use" and b.get("id") - } - if tool_use_ids: - # There must be a following user message with matching tool_results - next_idx = i + 1 - if next_idx >= len(self.messages): - # No following message at all — remove - logger.warning(f"⚠️ Removing assistant tool_use at index {i} (no following tool_result)") - self.messages.pop(i) - removed += 1 - continue - - next_msg = self.messages[next_idx] - next_content = next_msg.get("content", []) - if next_msg.get("role") != "user" or not isinstance(next_content, list): - # Next message is not a user message with tool_results - logger.warning(f"⚠️ Removing assistant tool_use at index {i} (next message is not tool_result)") - self.messages.pop(i) - removed += 1 - continue - - result_ids = { - b.get("tool_use_id") - for b in next_content - if isinstance(b, dict) and b.get("type") == "tool_result" - } - if not tool_use_ids.issubset(result_ids): - # Some tool_use ids have no matching result — remove both - logger.warning( - f"⚠️ Removing mismatched tool_use/result pair at index {i},{next_idx} " - f"(use_ids={tool_use_ids}, result_ids={result_ids})" - ) - self.messages.pop(next_idx) - self.messages.pop(i) - removed += 2 - continue - - elif role == "user" and isinstance(content, list): - has_tool_results = any( + # Remove leading orphaned tool_result user messages + while self.messages: + first_msg = self.messages[0] + if first_msg.get("role") == "user": + content = first_msg.get("content", []) + if isinstance(content, list) and any( isinstance(b, dict) and b.get("type") == "tool_result" for b in content - ) - if has_tool_results: - # Check that the preceding message is an assistant with matching tool_use - if i == 0: - logger.warning(f"⚠️ Removing orphaned tool_result at index {i} (no preceding assistant)") - self.messages.pop(i) - removed += 1 - continue - - prev_msg = self.messages[i - 1] - prev_content = prev_msg.get("content", []) - if prev_msg.get("role") != "assistant" or not isinstance(prev_content, list): - logger.warning(f"⚠️ Removing orphaned tool_result at index {i} (prev is not assistant)") - self.messages.pop(i) - removed += 1 - continue - - prev_use_ids = { - b.get("id") - for b in prev_content - if isinstance(b, dict) and b.get("type") == "tool_use" and b.get("id") - } - result_ids = { - b.get("tool_use_id") - for b in content - if isinstance(b, dict) and b.get("type") == "tool_result" - } - if not result_ids.issubset(prev_use_ids): - logger.warning( - f"⚠️ Removing orphaned tool_result at index {i} " - f"(result_ids={result_ids} not in prev use_ids={prev_use_ids})" - ) - self.messages.pop(i) - removed += 1 - continue - - i += 1 + ) and not any( + isinstance(b, dict) and b.get("type") == "text" + for b in content + ): + logger.warning("⚠️ Removing leading orphaned tool_result user message") + self.messages.pop(0) + removed += 1 + continue + break if removed > 0: logger.info(f"🔧 Message validation: removed {removed} broken message(s)") diff --git a/bridge/agent_initializer.py b/bridge/agent_initializer.py index 62df5c8..f4c8329 100644 --- a/bridge/agent_initializer.py +++ b/bridge/agent_initializer.py @@ -130,8 +130,14 @@ class AgentInitializer: Load persisted conversation messages from SQLite and inject them into the agent's in-memory message list. - Only runs when conversation persistence is enabled (default: True). - Respects agent_max_context_turns to limit how many turns are loaded. + Only user text and assistant text are restored. Tool call chains + (tool_use / tool_result) are stripped out because: + 1. They are intermediate process, the value is already in the final + assistant text reply. + 2. They consume massive context tokens (often 80%+ of history). + 3. Different models have incompatible tool message formats, so + restoring tool chains across model switches causes 400 errors. + 4. Eliminates the entire class of tool_use/tool_result pairing bugs. """ from config import conf if not conf().get("conversation_persistence", True): @@ -140,25 +146,86 @@ class AgentInitializer: try: from agent.memory import get_conversation_store store = get_conversation_store() - # On restore, load at most min(10, max_turns // 2) turns so that - # a long-running session does not immediately fill the context window - # after a restart. The full max_turns budget is reserved for the - # live conversation that follows. max_turns = conf().get("agent_max_context_turns", 30) - restore_turns = max(4, max_turns // 5) + restore_turns = max(6, max_turns // 3) saved = store.load_messages(session_id, max_turns=restore_turns) if saved: - with agent.messages_lock: - agent.messages = saved - logger.debug( - f"[AgentInitializer] Restored {len(saved)} messages " - f"({restore_turns} turns cap) for session={session_id}" - ) + filtered = self._filter_text_only_messages(saved) + if filtered: + with agent.messages_lock: + agent.messages = filtered + logger.debug( + f"[AgentInitializer] Restored {len(filtered)} text messages " + f"(from {len(saved)} total, {restore_turns} turns cap) " + f"for session={session_id}" + ) except Exception as e: logger.warning( f"[AgentInitializer] Failed to restore conversation history for " f"session={session_id}: {e}" ) + + @staticmethod + def _filter_text_only_messages(messages: list) -> list: + """ + Filter messages to keep only user text and assistant text. + + Strips out: + - assistant messages that only contain tool_use (no text) + - user messages that only contain tool_result (no text) + - internal hint messages injected by the agent loop + + Keeps: + - user messages with actual text content + - assistant messages with text content (tool_use blocks removed, + only the text portion is kept) + """ + filtered = [] + for msg in messages: + role = msg.get("role") + content = msg.get("content") + + if isinstance(content, str): + if content.strip(): + filtered.append(msg) + continue + + if not isinstance(content, list): + continue + + if role == "user": + text_parts = [ + b.get("text", "") + for b in content + if isinstance(b, dict) and b.get("type") == "text" + ] + has_tool_result = any( + isinstance(b, dict) and b.get("type") == "tool_result" + for b in content + ) + if has_tool_result: + continue + text = "\n".join(p for p in text_parts if p).strip() + if text: + filtered.append({ + "role": "user", + "content": [{"type": "text", "text": text}] + }) + + elif role == "assistant": + text_parts = [ + b.get("text", "") + for b in content + if isinstance(b, dict) and b.get("type") == "text" + ] + text = "\n".join(p for p in text_parts if p).strip() + if text: + filtered.append({ + "role": "assistant", + "content": [{"type": "text", "text": text}] + }) + + return filtered def _load_env_file(self): """Load environment variables from .env file"""