Spaces:
Sleeping
Sleeping
| """ | |
| Agent 核心逻辑 - 实现 ReAct 模式和 Memory 机制 | |
| """ | |
| import json | |
| import os | |
| import re | |
| from typing import List, Dict, Optional, Any, Callable | |
| from dataclasses import dataclass, field | |
| from datetime import datetime | |
| from src.searcher import CodeSearcher | |
| from src.api_key_manager import ApiKeyManager | |
| from prompts import get_system_prompt, get_react_format_prompt | |
@dataclass
class Memory:
    """Structured summary ("memory") the agent keeps for one file it has read.

    The class is a dataclass so that it can be built with keyword arguments
    (as ``ReadAgent.ask`` does) and so that each instance gets its own
    ``key_definitions`` / ``dependencies`` lists via ``default_factory``.

    Attributes:
        file_path: Path of the file this memory describes.
        overview: Short description of the file.
        key_definitions: Notable definitions found in the file.
        core_logic: Summary of the file's main logic.
        dependencies: Things the file depends on, rendered as a chain.
        needed_info: Open points that still have to be verified.
    """

    file_path: str
    overview: str = ""
    key_definitions: List[str] = field(default_factory=list)
    core_logic: str = ""
    dependencies: List[str] = field(default_factory=list)
    needed_info: str = ""

    def to_dict(self) -> Dict:
        """Return the memory as a plain dict (``file_path`` is exported as ``"file"``)."""
        return {
            "file": self.file_path,
            "overview": self.overview,
            "key_definitions": self.key_definitions,
            "core_logic": self.core_logic,
            "dependencies": self.dependencies,
            "needed_info": self.needed_info,
        }

    def to_string(self) -> str:
        """Render the memory as multi-line text, skipping empty fields."""
        parts = [f"📄 {self.file_path}"]
        if self.overview:
            parts.append(f"概述: {self.overview}")
        if self.key_definitions:
            parts.append(f"关键定义: {'; '.join(self.key_definitions)}")
        if self.core_logic:
            parts.append(f"核心逻辑: {self.core_logic}")
        if self.dependencies:
            parts.append(f"依赖: {' -> '.join(self.dependencies)}")
        if self.needed_info:
            parts.append(f"待验证: {self.needed_info}")
        return "\n".join(parts)
class ToolExecutor:
    """Dispatches named tool calls to the methods of a code searcher.

    Every call to :meth:`execute_tool` returns a dict with the same shape:
    ``{"success": True, "tool": name, "result": ...}`` on success and
    ``{"success": False, "tool": name, "error": message}`` on any failure,
    including an unknown tool name.
    """

    # The annotation is quoted so the class can be defined without the
    # CodeSearcher name being resolvable at definition time.
    def __init__(self, searcher: "CodeSearcher"):
        """Store the searcher and populate the tool registry.

        Args:
            searcher: Object providing ``read_file``, ``find_files``,
                ``search_code``, ``find_by_ext``, ``list_dir`` and
                ``get_file_info``.
        """
        self.searcher = searcher
        self._tool_registry: Dict[str, Callable] = {}
        # Register immediately so the executor is usable even when the caller
        # forgets the separate register_tools() call; calling it again later
        # simply rebuilds the same mapping.
        self.register_tools()

    def register_tools(self):
        """(Re)build the mapping from tool name to bound handler."""
        self._tool_registry = {
            "read_file": self._read_file,
            "find_files": self._find_files,
            "search_code": self._search_code,
            "find_by_ext": self._find_by_ext,
            "list_dir": self._list_dir,
            "get_file_info": self._get_file_info,
        }

    def execute_tool(self, tool_name: str, **kwargs) -> Dict:
        """Run one tool and wrap the outcome in a result dict.

        Args:
            tool_name: Name of a registered tool.
            **kwargs: Arguments forwarded to the tool handler.

        Returns:
            A dict with ``success``, ``tool`` and either ``result`` or
            ``error``. Exceptions raised by the handler (including bad
            argument names) are reported through ``error``, never raised.
        """
        handler = self._tool_registry.get(tool_name)
        if handler is None:
            # Same shape as every other failure so callers can rely on "success".
            return {"success": False, "tool": tool_name, "error": f"未知工具: {tool_name}"}
        try:
            result = handler(**kwargs)
        except Exception as e:
            return {"success": False, "tool": tool_name, "error": str(e)}
        return {"success": True, "tool": tool_name, "result": result}

    def _read_file(self, path: str, max_lines: int = 500, start_line: int = 1) -> Dict:
        """Forward to ``searcher.read_file``."""
        return self.searcher.read_file(path, max_lines, start_line)

    def _find_files(self, pattern: str = "*", path: str = ".", max_results: int = 20) -> List[str]:
        """Forward to ``searcher.find_files``."""
        return self.searcher.find_files(pattern, path, max_results)

    def _search_code(self, keyword: str, extensions: str = "*", max_results: int = 20) -> List[Dict]:
        """Forward to ``searcher.search_code``."""
        return self.searcher.search_code(keyword, extensions, max_results)

    def _find_by_ext(self, extensions: str = "py", max_results: int = 20) -> List[str]:
        """Forward to ``searcher.find_by_ext``."""
        return self.searcher.find_by_ext(extensions, max_results)

    def _list_dir(self, path: str = ".") -> Dict:
        """Forward to ``searcher.list_dir``."""
        return self.searcher.list_dir(path)

    def _get_file_info(self, path: str) -> Dict:
        """Forward to ``searcher.get_file_info``."""
        return self.searcher.get_file_info(path)

    def get_available_tools(self) -> List[Dict]:
        """Return the tool descriptions (name, description, params) shown to the LLM."""
        return [
            {
                "name": "read_file",
                "description": "读取文件内容",
                "params": {
                    "path": {"type": "string", "description": "文件路径"},
                    "max_lines": {"type": "integer", "description": "最大行数", "default": 500},
                    "start_line": {"type": "integer", "description": "起始行号", "default": 1}
                }
            },
            {
                "name": "find_files",
                "description": "按文件名模式查找文件",
                "params": {
                    "pattern": {"type": "string", "description": "文件名模式,如 *.py"},
                    "max_results": {"type": "integer", "description": "最大结果数", "default": 20}
                }
            },
            {
                "name": "search_code",
                "description": "搜索代码内容",
                "params": {
                    "keyword": {"type": "string", "description": "搜索关键词"},
                    "extensions": {"type": "string", "description": "文件扩展名", "default": "*"},
                    "max_results": {"type": "integer", "description": "最大结果数", "default": 20}
                }
            },
            {
                "name": "find_by_ext",
                "description": "按扩展名查找文件",
                "params": {
                    "extensions": {"type": "string", "description": "扩展名,如 py,js"},
                    "max_results": {"type": "integer", "description": "最大结果数", "default": 20}
                }
            },
            {
                "name": "list_dir",
                "description": "列出目录内容",
                "params": {
                    "path": {"type": "string", "description": "目录路径", "default": "."}
                }
            },
            {
                "name": "get_file_info",
                "description": "获取文件信息",
                "params": {
                    "path": {"type": "string", "description": "文件路径"}
                }
            }
        ]
class ReadAgent:
    """ReAct-style agent that answers questions about a code directory.

    Each step sends the conversation to an OpenAI-compatible chat endpoint,
    parses the JSON reply for a thought, tool actions, a final answer and an
    optional per-file memory, runs the requested tools, and feeds the
    observations back as user messages until a final answer arrives or
    ``max_steps`` is reached.
    """

    # HTTP status codes for which the request is retried.
    _RETRYABLE_STATUS_CODES = (401, 429, 500, 502, 503, 504)
    # Length caps applied to parsed LLM output.
    _MAX_THOUGHT_CHARS = 5000
    _MAX_ANSWER_CHARS = 10000
    # Maximum characters of one tool result fed back to the model.
    _MAX_OBSERVATION_CHARS = 500

    def __init__(
        self,
        code_dir: str = "./repos",
        api_key: Optional[str] = None,
        base_url: Optional[str] = None,
        model: str = "gpt-4",
        max_steps: int = 10,
        stream_output: bool = True,
        tree_depth: int = 3,
        api_key_manager=None,
        max_retries: Optional[int] = None,
        retry_delays: Optional[list] = None
    ):
        """Create the agent.

        Args:
            code_dir: Root directory handed to ``CodeSearcher``.
            api_key: API key; falls back to ``OPENAI_API_KEY``. Only used when
                no ``api_key_manager`` is supplied.
            base_url: API base URL; falls back to ``OPENAI_BASE_URL``.
            model: Model name; falls back to ``OPENAI_MODEL`` when empty.
            max_steps: Maximum number of think/act iterations per question.
            stream_output: When true, progress and the answer are printed as
                they are produced.
            tree_depth: Depth of the directory tree put into the system
                prompt; ``0`` or less disables it.
            api_key_manager: Optional key manager exposing ``get_key``,
                ``record_success`` and ``record_error``.
            max_retries: Attempts per LLM call; falls back to ``MAX_RETRIES``.
            retry_delays: Seconds to wait before each retry; falls back to the
                comma-separated ``RETRY_DELAYS`` variable.
        """
        self.searcher = CodeSearcher(code_dir, use_index=True, lazy_index=True)
        self.tool_executor = ToolExecutor(self.searcher)
        self.tool_executor.register_tools()

        self.base_url = base_url or os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1")
        self.model = model or os.getenv("OPENAI_MODEL", "gpt-4")
        self.max_steps = max_steps
        self.stream_output = stream_output
        self.tree_depth = tree_depth

        # Retry configuration. At least one attempt is always made, and blank
        # entries in RETRY_DELAYS (e.g. a trailing comma) are ignored.
        self.max_retries = max(1, max_retries or int(os.getenv("MAX_RETRIES", "3")))
        self.retry_delays = retry_delays or [
            float(d.strip())
            for d in os.getenv("RETRY_DELAYS", "1,2,4").split(",")
            if d.strip()
        ]

        # API key handling: prefer the supplied manager; otherwise wrap the
        # single configured key in a manager. ``self.api_key`` is always
        # defined so the fallback in _call_llm can never hit a missing
        # attribute.
        self.api_key: Optional[str] = None
        self.api_key_manager = api_key_manager
        if self.api_key_manager is None:
            key = api_key or os.getenv("OPENAI_API_KEY")
            if key:
                self.api_key_manager = ApiKeyManager(key)

        self.conversation_history: List[Dict] = []
        self.memories: List[Memory] = []
        self.steps: List[Dict] = []

        # The directory tree is generated lazily on first prompt construction.
        self._dir_tree_cached = None

    # ------------------------------------------------------------------
    # Parsing of LLM output
    # ------------------------------------------------------------------

    @staticmethod
    def _coerce_text(value: Any) -> str:
        """Turn a parsed JSON value into text (``None`` becomes ``""``)."""
        if value is None:
            return ""
        if isinstance(value, str):
            return value
        return json.dumps(value, ensure_ascii=False)

    @classmethod
    def _coerce_str_list(cls, value: Any) -> List[str]:
        """Turn a parsed JSON value into a list of strings."""
        if value is None:
            return []
        if isinstance(value, str):
            # A bare string is one entry, not a sequence of characters.
            return [value] if value else []
        if isinstance(value, list):
            return [cls._coerce_text(item) for item in value]
        return [cls._coerce_text(value)]

    def _load_json_object(self, response: str, tag: str) -> Optional[Dict]:
        """Extract the JSON object embedded in ``response``.

        Args:
            response: Raw, non-empty LLM reply.
            tag: Name of the calling method, used as the log prefix.

        Returns:
            The parsed dict, or ``None`` when no JSON object can be parsed.
        """
        import logging
        logger = logging.getLogger(__name__)

        match = re.search(r'\{.*\}', response, re.DOTALL)
        if not match:
            logger.warning(f"[{tag}] 未能找到 JSON 格式")
            return None
        candidate = match.group()
        try:
            data = json.loads(candidate)
        except json.JSONDecodeError as e:
            # The greedy match runs to the LAST "}", so any brace in text
            # after the JSON object breaks json.loads. Retry by decoding only
            # the first complete object at the start of the match.
            try:
                data, _ = json.JSONDecoder().raw_decode(candidate)
            except json.JSONDecodeError:
                logger.warning(f"[{tag}] JSON 解析失败: {e}")
                return None
        return data if isinstance(data, dict) else None

    def _extract_thought_action(self, response: str) -> tuple:
        """Extract the thought and the requested tool calls from a reply.

        Returns:
            ``(thought, actions_list)`` where ``actions_list`` is a list of
            ``(tool_name, args_dict)`` tuples: one entry for a single
            ``"action"``, several for a batch ``"actions"`` list, and empty
            when the reply contains no valid action. Malformed entries
            (non-dict items, unknown tools, non-dict args) are skipped or
            normalised instead of raising.
        """
        import logging
        logger = logging.getLogger(__name__)

        thought = ""
        actions_list = []

        if not response or not response.strip():
            logger.warning("[_extract_thought_action] LLM 返回空响应")
            return thought, actions_list

        data = self._load_json_object(response, "_extract_thought_action")
        if data is None:
            return thought, actions_list

        thought = self._coerce_text(data.get("thought", ""))
        if len(thought) > self._MAX_THOUGHT_CHARS:
            logger.warning(f"[_extract_thought_action] Thought 过长 ({len(thought)} chars),截断到 5000")
            thought = thought[:self._MAX_THOUGHT_CHARS]

        is_batch = isinstance(data.get("actions"), list)
        if is_batch:
            raw_items = data["actions"]
        elif "action" in data:
            raw_items = [data["action"]]
        else:
            raw_items = []

        valid_tools = set(self.tool_executor._tool_registry.keys())
        for item in raw_items:
            if not isinstance(item, dict):
                continue
            tool = item.get("tool")
            if not tool:
                continue
            if not isinstance(tool, str) or tool not in valid_tools:
                logger.warning(f"[_extract_thought_action] 未知的 Action: '{tool}'")
                continue
            args = item.get("args", {})
            # The args are later expanded with ``**``; anything but a dict
            # (e.g. JSON null) would raise there.
            actions_list.append((tool, args if isinstance(args, dict) else {}))

        if is_batch:
            logger.debug(f"[_extract_thought_action] 批量提取了 {len(actions_list)} 个 Actions")
        return thought, actions_list

    def _extract_final_answer(self, response: str) -> tuple:
        """Extract the final answer and the optional memory from a reply.

        Returns:
            ``(answer, memory_data)``. ``answer`` is ``""`` when the reply has
            no ``"final_answer"``; ``memory_data`` is a dict with the keys
            ``file``, ``overview``, ``key_definitions``, ``core_logic``,
            ``dependencies`` and ``needed_info``, or ``None`` when the reply
            has no ``"memory"`` object containing ``"file"``.
        """
        import logging
        logger = logging.getLogger(__name__)

        answer = ""
        memory_data = None

        if not response or not response.strip():
            logger.warning("[_extract_final_answer] 响应为空")
            return answer, memory_data

        data = self._load_json_object(response, "_extract_final_answer")
        if data is None:
            return answer, memory_data

        if "final_answer" in data:
            answer = self._coerce_text(data.get("final_answer"))
            if len(answer) > self._MAX_ANSWER_CHARS:
                logger.warning(f"[_extract_final_answer] Final Answer 过长 ({len(answer)} chars),截断到 10000")
                answer = answer[:self._MAX_ANSWER_CHARS]

        memory = data.get("memory")
        if isinstance(memory, dict) and "file" in memory:
            memory_data = {
                "file": self._coerce_text(memory.get("file", "")),
                "overview": self._coerce_text(memory.get("overview", "")),
                "key_definitions": self._coerce_str_list(memory.get("key_definitions")),
                "core_logic": self._coerce_text(memory.get("core_logic", "")),
                "dependencies": self._coerce_str_list(memory.get("dependencies")),
                "needed_info": self._coerce_text(memory.get("needed_info", "")),
            }
        return answer, memory_data

    # ------------------------------------------------------------------
    # LLM transport
    # ------------------------------------------------------------------

    def _record_error(self, api_key: str, message: str) -> None:
        """Report a failed request to the key manager, if there is one."""
        if self.api_key_manager:
            self.api_key_manager.record_error(api_key, message)

    def _sleep_before_retry(self, reason: str, attempt: int, attempts: int) -> None:
        """Announce a retry (print or log) and wait the configured delay.

        The delay is ``retry_delays[attempt]``; the last entry is reused once
        the attempts outnumber the configured delays.
        """
        import time

        delays = self.retry_delays
        delay = delays[min(attempt, len(delays) - 1)] if delays else 0
        message = f"{reason},{delay} 秒后重试... ({attempt + 1}/{attempts})"
        if self.stream_output:
            print(f"\n\n⏳ {message}", flush=True)
        else:
            import logging
            logging.getLogger(__name__).warning(message)
        time.sleep(delay)

    def _read_stream(self, response) -> str:
        """Consume a server-sent-event stream and return the joined content.

        Args:
            response: Iterable of raw ``bytes`` lines.

        Returns:
            The concatenation of every ``choices[0].delta.content`` fragment
            seen before the ``[DONE]`` marker. Fragments are echoed to stdout
            as they arrive when ``stream_output`` is set.
        """
        pieces = []
        for raw_line in response:
            line = raw_line.decode("utf-8", errors="replace").strip()
            # Accept both "data: {...}" and "data:{...}".
            if not line.startswith("data:"):
                continue
            payload = line[len("data:"):].strip()
            if payload == "[DONE]":
                break
            try:
                chunk = json.loads(payload)
            except json.JSONDecodeError:
                continue
            choices = chunk.get("choices") if isinstance(chunk, dict) else None
            if not choices:
                continue
            delta = choices[0].get("delta") or {}
            content = delta.get("content", "")
            if content:
                if self.stream_output:
                    print(content, end="", flush=True)
                pieces.append(content)
        return "".join(pieces)

    def _call_llm(self, messages: List[Dict]) -> str:
        """Call the chat-completions endpoint with streaming and retries.

        Args:
            messages: Chat messages to send.

        Returns:
            The full streamed reply text.

        Raises:
            Exception: When no API key is configured, when the API returns a
                non-retryable status, or when the last attempt fails.
        """
        import socket
        import urllib.error
        import urllib.request

        attempts = max(1, self.max_retries)
        for attempt in range(attempts):
            # A fresh key is requested for every attempt.
            if self.api_key_manager:
                api_key = self.api_key_manager.get_key()
            else:
                api_key = self.api_key
            if not api_key:
                raise Exception("未配置 API Key")

            headers = {
                "Content-Type": "application/json",
                "Authorization": f"Bearer {api_key}"
            }
            data = {
                "model": self.model,
                "messages": messages,
                "temperature": 0.3,
                "stream": True
            }
            can_retry = attempt < attempts - 1

            try:
                req = urllib.request.Request(
                    f"{self.base_url}/chat/completions",
                    headers=headers,
                    data=json.dumps(data).encode("utf-8"),
                    method="POST"
                )
                with urllib.request.urlopen(req, timeout=60) as response:
                    full_content = self._read_stream(response)
                # Terminate the streamed line.
                if self.stream_output:
                    print()
                if self.api_key_manager:
                    self.api_key_manager.record_success(api_key)
                return full_content

            except urllib.error.HTTPError as e:
                error_body = e.read().decode("utf-8", errors="replace") if e.fp else ""
                if e.code in self._RETRYABLE_STATUS_CODES and can_retry:
                    self._record_error(
                        api_key,
                        f"HTTP {e.code}: {error_body} (重试 {attempt + 1}/{attempts})"
                    )
                    self._sleep_before_retry(f"API 返回 {e.code}", attempt, attempts)
                    continue
                # Last attempt, or a status that is not worth retrying.
                self._record_error(api_key, f"HTTP {e.code}: {error_body}")
                raise Exception(f"API 错误: {e.code} - {error_body}") from e

            except (urllib.error.URLError, socket.timeout, ConnectionError) as e:
                # Network-level failures, including timeouts and resets that
                # happen while the stream is being read, are retryable.
                if can_retry:
                    self._record_error(
                        api_key,
                        f"网络错误: {str(e)} (重试 {attempt + 1}/{attempts})"
                    )
                    self._sleep_before_retry("网络错误", attempt, attempts)
                    continue
                self._record_error(api_key, f"网络错误: {str(e)}")
                raise Exception(f"网络错误: {str(e)}") from e

            except Exception as e:
                # Never echo an error text that appears to contain an API key.
                error_msg = str(e)
                if "sk-" in error_msg:
                    error_msg = "API 配置错误: OPENAI_BASE_URL 设置不正确"
                # Other error types are not retried.
                self._record_error(api_key, error_msg)
                raise Exception(f"请求错误: {error_msg}") from e

    # ------------------------------------------------------------------
    # Prompt construction and formatting
    # ------------------------------------------------------------------

    def _dir_tree_info(self) -> str:
        """Return the directory-tree section of the system prompt.

        The tree is generated on first use and cached. It is included in the
        very first prompt as well as in all later ones. Returns ``""`` when
        ``tree_depth`` is not positive or the tree is empty.
        """
        if self._dir_tree_cached is None and self.tree_depth > 0:
            import logging
            logger = logging.getLogger(__name__)
            logger.info("正在生成目录树...")
            tree = self.searcher.get_dir_tree(self.tree_depth)
            # Cache a non-None value so an empty result is not regenerated.
            self._dir_tree_cached = tree if tree is not None else ""
            logger.info(f"目录树生成完成,共 {len(self._dir_tree_cached)} 个节点")
        if self._dir_tree_cached:
            return f"\n\n代码目录结构({self.tree_depth}层):\n{self._dir_tree_cached}"
        return ""

    def _build_system_prompt(self) -> str:
        """Build the system prompt from tools, memories and the directory tree."""
        memories_info = ""
        if self.memories:
            memories_info = "\n\n已读取文件的 Memory:\n" + "\n".join(
                [m.to_string() for m in self.memories]
            )
        return get_system_prompt(
            tools_info=self.tool_executor.get_available_tools(),
            max_steps=self.max_steps,
            memories=memories_info,
            dir_tree=self._dir_tree_info()
        )

    def _format_step(self, step: Dict) -> str:
        """Render one recorded step (single or batch execution) as text."""
        parts = [f"\n🔄 步骤 {step['step']}"]
        if step.get("thought"):
            parts.append(f"💭 思考: {step['thought']}")
        if step.get("action"):
            parts.append(f"🔧 行动: {step['action']}")

        if step.get("batch_results"):
            parts.append(f"🔧 批量执行完成 ({step['batch_actions']} 个操作)")
            for item in step['batch_results']:
                result = item.get('result', {})
                parts.append(f" [{item.get('index', 0)}] {item.get('action', '')}")
                if not isinstance(result, dict):
                    parts.append(f" 📋 结果: {str(result)[:200]}")
                elif result.get("success"):
                    parts.append(f" ✅ 成功: {str(result.get('result', ''))[:200]}")
                else:
                    parts.append(f" ❌ 错误: {result.get('error', 'Unknown')}")
        elif step.get("observation"):
            obs = step['observation']
            if not isinstance(obs, dict) or obs.get('batch'):
                parts.append(f"📋 结果: {str(obs)[:500]}")
            elif obs.get("success"):
                # default=str keeps formatting from failing on tool results
                # that are not JSON-serialisable.
                rendered = json.dumps(obs.get('result'), ensure_ascii=False, indent=2, default=str)
                parts.append(f"✅ 结果: {rendered[:500]}")
            else:
                parts.append(f"❌ 错误: {obs.get('error')}")
        return "\n".join(parts)

    def _format_output(self, question: str, answer: str) -> str:
        """Render the question, every recorded step and the answer as text."""
        output = [f"\n{'='*60}"]
        output.append(f"🤔 问题: {question}")
        output.append(f"\n📝 分析过程:")
        output.extend(self._format_step(step_info) for step_info in self.steps)
        output.append(f"\n{'='*60}")
        output.append(f"💡 回答:\n{answer}")
        return "\n".join(output)

    # ------------------------------------------------------------------
    # Main loop
    # ------------------------------------------------------------------

    def _think_and_act(self, user_question: str) -> str:
        """Send the system prompt, the history and the question to the LLM."""
        messages = [{"role": "system", "content": self._build_system_prompt()}]
        messages.extend(self.conversation_history)
        messages.append({
            "role": "user",
            "content": f"用户问题:{user_question}\n\n{get_react_format_prompt()}"
        })
        return self._call_llm(messages)

    def _remember(self, memory_data: Dict) -> None:
        """Store a parsed memory, replacing any earlier one for the same file."""
        path = memory_data.get("file", "")
        if not path:
            return
        self.memories = [m for m in self.memories if m.file_path != path]
        self.memories.append(Memory(
            file_path=path,
            overview=memory_data.get("overview", ""),
            key_definitions=memory_data.get("key_definitions", []),
            core_logic=memory_data.get("core_logic", ""),
            dependencies=memory_data.get("dependencies", []),
            needed_info=memory_data.get("needed_info", "")
        ))

    def _execute_action(self, action: str, action_args: Dict) -> Dict:
        """Run one tool call and always return a result dict.

        ``execute_tool`` reports handler errors itself, but expanding the
        arguments can fail before it runs (e.g. an argument named like its
        own ``tool_name`` parameter); that case is reported the same way.
        """
        try:
            return self.tool_executor.execute_tool(action, **action_args)
        except TypeError as exc:
            return {"success": False, "tool": action, "error": str(exc)}

    def _observation_text(self, tool_result: Dict) -> str:
        """Render one tool result as the observation text sent to the LLM."""
        if tool_result.get("success"):
            result = tool_result.get("result", "")
            return f"结果: {str(result)[:self._MAX_OBSERVATION_CHARS]}"
        return f"错误: {tool_result.get('error', '未知错误')}"

    def _run_single(self, call: tuple, step_info: Dict) -> str:
        """Execute one action, record it in ``step_info``, return the observation."""
        action, action_args = call
        step_info["action"] = f"{action}({action_args})"
        tool_result = self._execute_action(action, action_args)
        step_info["observation"] = tool_result
        if self.stream_output:
            print(self._format_step(step_info))
        return self._observation_text(tool_result)

    def _run_batch(self, step: int, thought: str, actions_list: list, step_info: Dict) -> str:
        """Execute several actions, record them, return the joined observation.

        The observation contains each action's (truncated) result, not just a
        success marker, so the model can use what the tools returned.
        """
        total = len(actions_list)
        step_info["batch_actions"] = total
        step_info["action"] = f"批量执行 {total} 个操作"

        if self.stream_output:
            print(f"\n🔄 步骤 {step}")
            print(f"💭 思考: {thought}")
            print(f"🔧 批量执行 {total} 个操作:")

        batch_results = []
        for i, (action, action_args) in enumerate(actions_list, 1):
            tool_result = self._execute_action(action, action_args)
            batch_results.append({
                "index": i,
                "action": f"{action}({action_args})",
                "result": tool_result
            })
            if self.stream_output:
                print(f" [{i}/{total}] {action}({action_args})")
                if tool_result.get("success"):
                    print(f" ✅ 完成")
                else:
                    print(f" ❌ 错误: {tool_result.get('error', 'Unknown')}")

        step_info["batch_results"] = batch_results
        step_info["observation"] = {"batch": True, "results": batch_results}
        return "\n".join(
            f"{r['action']}: {self._observation_text(r['result'])}"
            for r in batch_results
        )

    def _finish(self, question: str, answer: str) -> str:
        """Deliver the answer: print it when streaming, else return the report."""
        if self.stream_output:
            print(f"\n{'='*60}")
            print(f"💡 回答:\n{answer}")
            return ""
        return self._format_output(question, answer)

    def ask(self, question: str) -> str:
        """Answer a question about the code base.

        Args:
            question: The user's question.

        Returns:
            With ``stream_output`` disabled, the formatted report (question,
            steps, answer). With ``stream_output`` enabled everything is
            printed as it happens and ``""`` is returned.
        """
        self.steps = []
        self.conversation_history.append({"role": "user", "content": question})

        # Make sure the search index exists before the first tool call.
        self.searcher._ensure_index()

        if self.stream_output:
            print(f"\n{'='*60}")
            print(f"🤔 问题: {question}")
            print(f"\n📝 分析过程:")

        for step in range(1, self.max_steps + 1):
            response = self._think_and_act(question)
            step_info = {"step": step, "raw_response": response}

            thought, actions_list = self._extract_thought_action(response)
            step_info["thought"] = thought

            final_answer, memory_data = self._extract_final_answer(response)
            if memory_data:
                self._remember(memory_data)

            if final_answer:
                step_info["final_answer"] = final_answer
                self.steps.append(step_info)
                self.conversation_history.append({"role": "assistant", "content": final_answer})
                return self._finish(question, final_answer)

            # A reply may carry only a thought; then nothing is executed and
            # the next iteration simply asks again.
            if actions_list:
                if len(actions_list) > 1:
                    obs_text = self._run_batch(step, thought, actions_list, step_info)
                else:
                    obs_text = self._run_single(actions_list[0], step_info)
                self.conversation_history.append({"role": "user", "content": obs_text})

            self.steps.append(step_info)

        # Step budget exhausted without a final answer.
        return self._finish(question, "已达到最大步骤数限制,请尝试更具体的问题。")

    # ------------------------------------------------------------------
    # State management
    # ------------------------------------------------------------------

    def clear_memory(self):
        """Drop all stored file memories."""
        self.memories = []

    def clear_history(self):
        """Drop the conversation history, the memories and the recorded steps."""
        self.conversation_history = []
        self.memories = []
        self.steps = []

    def get_stats(self) -> Dict:
        """Return counters for history, memories and steps plus the code root."""
        return {
            "conversation_length": len(self.conversation_history),
            "memory_count": len(self.memories),
            "total_steps": len(self.steps),
            "code_dir": str(self.searcher.root_dir)
        }